This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch griffin-2.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/griffin-2.0.0-dev by this push:
     new 46789ff5 Griffin 2.0.0 dev (#672)
46789ff5 is described below

commit 46789ff5f56af7c43f8cee3c65192a3aea904b3f
Author: William Guo <[email protected]>
AuthorDate: Thu Jan 2 10:39:08 2025 +0800

    Griffin 2.0.0 dev (#672)
    
    * add metric test
    
    * init create sql
    
    * metrics persistence service
    
    * remove relativePath
    
    * local dqc app execution engine
    
    * remove hardcoded version
---
 griffin-bom/pom.xml                                |  2 +-
 griffin-dqc/pom.xml                                | 38 +++++++++
 .../org/apache/griffin/dqc/DQCFlowFactory.java     | 71 +++++++++++++++++
 .../java/org/apache/griffin/dqc/DQCLocalApp.java   | 13 ++++
 .../org/apache/griffin/dqc/DQCTriggerService.java  | 39 ++++++++++
 .../main/java/org/apache/griffin/dqc/IDQCFlow.java |  4 +
 .../main/java/org/apache/griffin/dqc/SACFlow.java  | 91 ++++++++++++++++++++++
 .../src/main/resources/application.properties      | 11 +++
 8 files changed, 268 insertions(+), 1 deletion(-)

diff --git a/griffin-bom/pom.xml b/griffin-bom/pom.xml
index 2ed73a84..dcec850c 100644
--- a/griffin-bom/pom.xml
+++ b/griffin-bom/pom.xml
@@ -36,7 +36,7 @@ under the License.
     <url>https://griffin.apache.org</url>
 
     <properties>
-        <spring-boot.version>2.7.3</spring-boot.version>
+        <spring-boot.version>3.3.4</spring-boot.version>
         <mybatis-plus.version>3.5.2</mybatis-plus.version>
         <commons-codec.version>1.11</commons-codec.version>
         <commons-logging.version>1.1.1</commons-logging.version>
diff --git a/griffin-dqc/pom.xml b/griffin-dqc/pom.xml
index 7d503e5c..26dfd3dd 100644
--- a/griffin-dqc/pom.xml
+++ b/griffin-dqc/pom.xml
@@ -33,5 +33,43 @@ under the License.
     <name>${project.artifactId}</name>
     <url>https://griffin.apache.org</url>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.griffin</groupId>
+                <artifactId>griffin-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-batch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.batch</groupId>
+            <artifactId>spring-batch-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java 
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java
new file mode 100644
index 00000000..a9a357b6
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java
@@ -0,0 +1,71 @@
+package org.apache.griffin.dqc;
+import org.springframework.batch.core.Step;
+import 
org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.job.builder.FlowBuilder;
+import org.springframework.batch.core.job.flow.Flow;
+import org.springframework.batch.core.job.flow.support.SimpleFlow;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.job.builder.JobBuilder;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.transaction.PlatformTransactionManager;
+
+
+/**
+ * Spring configuration that wires the step beans into the batch job
+ * named {@code CompareTwoAssetsJob}.
+ *
+ * <p>NOTE(review): under Spring Boot 3.x, declaring
+ * {@code @EnableBatchProcessing} is documented to make Boot's batch
+ * auto-configuration back off -- confirm that this is intended here.
+ */
+@Configuration
+@EnableBatchProcessing
+public class DQCFlowFactory {
+    Logger logger = LoggerFactory.getLogger(DQCFlowFactory.class);
+
+    @Autowired
+    public JobRepository jobRepository;
+
+    @Autowired
+    public PlatformTransactionManager transactionManager;
+
+    /**
+     * Builds the split-apply-combine job: one setup step, two scan steps run
+     * as parallel branches, then one check step.
+     * <pre>
+     *            -> scanSource
+     *          /               \
+     * setup -->                  --> check
+     *          \               /
+     *            -> scanTarget
+     * </pre>
+     *
+     * @param jobRepository repository handed to the {@link JobBuilder}
+     * @param setup         first step of the sequential flow
+     * @param scanSource    step forming the "source" branch of the split
+     * @param scanTarget    step forming the "target" branch of the split
+     * @param check         last step, executed after the split
+     * @return the assembled job
+     */
+    @Bean
+    public Job Split_Apply_Combine(JobRepository jobRepository, Step setup, Step scanSource, Step scanTarget, Step check) {
+        // Each scan step is wrapped in its own single-step flow so it can become a split branch.
+        Flow sourceBranch = new FlowBuilder<SimpleFlow>("source").start(scanSource).build();
+        Flow targetBranch = new FlowBuilder<SimpleFlow>("target").start(scanTarget).build();
+
+        // The two branches are executed through a SimpleAsyncTaskExecutor.
+        Flow parallelScan = new FlowBuilder<SimpleFlow>("splitFlow")
+                .split(new SimpleAsyncTaskExecutor())
+                .add(sourceBranch, targetBranch)
+                .build();
+
+        // setup -> (scanSource | scanTarget) -> check
+        Flow pipeline = new FlowBuilder<SimpleFlow>("sequentialFlow")
+                .start(setup)
+                .next(parallelScan)
+                .next(check)
+                .build();
+
+        JobBuilder jobBuilder = new JobBuilder("CompareTwoAssetsJob", jobRepository);
+        return jobBuilder.start(pipeline).end().build();
+    }
+
+}
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java 
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java
new file mode 100644
index 00000000..34c9d9d0
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class DQCLocalApp {
+
+    public static void main(String[] args) {
+        SpringApplication.run(DQCLocalApp.class, args);
+    }
+
+}
diff --git 
a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java 
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java
new file mode 100644
index 00000000..3e311249
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java
@@ -0,0 +1,39 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST controller exposing an endpoint that launches the injected batch job.
+ */
+@RestController
+public class DQCTriggerService {
+
+    @Autowired
+    private JobLauncher jobLauncher;
+
+    @Autowired
+    private Job dqcFlow;
+
+    /**
+     * Launches the job with a single string job parameter named {@code requestParam}.
+     *
+     * @param param optional value for the {@code requestParam} job parameter; when
+     *              absent, a value derived from the current time in milliseconds is used
+     * @return 200 with a confirmation message when the launcher call returns normally,
+     *         or 500 carrying the exception message when it throws
+     */
+    @GetMapping("/run")
+    public ResponseEntity<String> runDqcFlow(@RequestParam(required = false) String param) {
+        try {
+            // Fall back to a timestamp-based value when the caller supplies no parameter.
+            String requestParam = param != null ? param : "default_value:" + System.currentTimeMillis();
+            JobParameters jobParameters = new JobParametersBuilder()
+                    .addString("requestParam", requestParam)
+                    .toJobParameters();
+
+            jobLauncher.run(dqcFlow, jobParameters);
+
+            return ResponseEntity.ok("DQC Flow job started successfully.");
+        } catch (Exception e) {
+            // Only Exception is caught: JVM Errors (e.g. OutOfMemoryError) must propagate
+            // instead of being reported as an ordinary launch failure.
+            return ResponseEntity.internalServerError().body("Job failed to start: " + e.getMessage());
+        }
+    }
+}
+
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java 
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java
new file mode 100644
index 00000000..c43e70b7
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.dqc;
+
+/**
+ * Marker type for flow classes such as {@code SACFlow}; it declares no members.
+ */
+public interface IDQCFlow {
+}
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java 
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java
new file mode 100644
index 00000000..37e76fc3
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java
@@ -0,0 +1,91 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.batch.core.Step;
+import 
org.springframework.batch.core.listener.ExecutionContextPromotionListener;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.StepBuilder;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import java.util.Random;
+
+/**
+ * Split-Apply-Combine Flow.
+ *
+ * <p>Declares four tasklet steps as beans: {@code setup}, {@code scanSource},
+ * {@code scanTarget} and {@code check}. The first three write a value into their
+ * step execution context and register an {@link ExecutionContextPromotionListener}
+ * for the keys {@code setup_key}, {@code source_count} and {@code target_count};
+ * later steps read those keys from the job execution context.
+ *
+ * <p>Every tasklet sleeps for a random time below three seconds and the scan
+ * steps store the constant 100.
+ * NOTE(review): these look like placeholders for real scans -- confirm.
+ */
+@Component
+public class SACFlow implements IDQCFlow {
+    private final Random random;
+
+    public SACFlow() {
+        this.random = new Random();
+    }
+
+    /**
+     * Step that stores {@code setup_key = "setup_value"} in its step execution context.
+     *
+     * @param jobRepository      repository passed to the {@link StepBuilder}
+     * @param transactionManager transaction manager used by the tasklet step
+     * @return the {@code setup} step
+     */
+    @Bean
+    public Step setup(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+        return new StepBuilder("setup", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    chunkContext.getStepContext().getStepExecution().getExecutionContext().put("setup_key", "setup_value");
+                    Thread.sleep(random.nextInt(3000));
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /**
+     * Creates a new promotion listener configured with the keys
+     * {@code setup_key}, {@code source_count} and {@code target_count}.
+     *
+     * @return a freshly constructed listener (one per call)
+     */
+    public ExecutionContextPromotionListener executionContextPromotionListener() {
+        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
+        listener.setKeys(new String[]{"setup_key", "source_count", "target_count"});
+
+        return listener;
+    }
+
+    /**
+     * Step that prints the {@code setup_key} found in the job execution context and
+     * stores {@code source_count = 100} in its step execution context.
+     *
+     * @param jobRepository      repository passed to the {@link StepBuilder}
+     * @param transactionManager transaction manager used by the tasklet step
+     * @return the {@code scanSource} step
+     */
+    @Bean
+    public Step scanSource(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+        return new StepBuilder("scanSource", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread() + ": running scanSource");
+                    String setupKey = (String) chunkContext.getStepContext().getJobExecutionContext().get("setup_key");
+                    System.out.println("setup_key from setup: " + setupKey);
+                    chunkContext.getStepContext().getStepExecution().getExecutionContext().put("source_count", 100);
+                    Thread.sleep(random.nextInt(3000));
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /**
+     * Step that prints the {@code setup_key} found in the job execution context and
+     * stores {@code target_count = 100} in its step execution context.
+     *
+     * @param jobRepository      repository passed to the {@link StepBuilder}
+     * @param transactionManager transaction manager used by the tasklet step
+     * @return the {@code scanTarget} step
+     */
+    @Bean
+    public Step scanTarget(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+        return new StepBuilder("scanTarget", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread() + ": running scanTarget");
+                    Thread.sleep(random.nextInt(3000));
+                    System.out.println(chunkContext.getStepContext().getJobExecutionContext().get("setup_key"));
+                    chunkContext.getStepContext().getStepExecution().getExecutionContext().put("target_count", 100);
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /**
+     * Step that reads {@code source_count} and {@code target_count} from the job
+     * execution context and prints whether they are equal.
+     *
+     * <p>The tasklet throws {@link IllegalStateException} when either count is absent,
+     * instead of failing with a message-less {@link NullPointerException} on unboxing.
+     *
+     * @param jobRepository      repository passed to the {@link StepBuilder}
+     * @param transactionManager transaction manager used by the tasklet step
+     * @return the {@code check} step
+     */
+    @Bean
+    public Step check(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+        return new StepBuilder("check", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread() + ": running check");
+                    Thread.sleep(random.nextInt(3000));
+                    System.out.println(chunkContext.getStepContext().getJobExecutionContext().get("setup_key"));
+                    Integer sourceCount = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("source_count");
+                    Integer targetCount = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("target_count");
+                    if (sourceCount == null || targetCount == null) {
+                        throw new IllegalStateException(
+                                "source_count and target_count must both be present in the job execution context before check runs");
+                    }
+                    if (sourceCount.intValue() == targetCount.intValue()) {
+                        System.out.println("source_count == target_count");
+                    } else {
+                        System.out.println("source_count != target_count");
+                    }
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .build();
+    }
+
+}
diff --git a/griffin-dqc/src/main/resources/application.properties 
b/griffin-dqc/src/main/resources/application.properties
new file mode 100644
index 00000000..f40e3980
--- /dev/null
+++ b/griffin-dqc/src/main/resources/application.properties
@@ -0,0 +1,11 @@
+spring.application.name=griffin-dqc
+
+spring.datasource.url=jdbc:h2:mem:testdb
+spring.datasource.driverClassName=org.h2.Driver
+spring.datasource.username=sa
+spring.datasource.password=password
+spring.h2.console.enabled=true
+spring.batch.jdbc.initialize-schema=always
+spring.batch.job.enabled=true
+
+spring.sql.init.schema-locations=classpath:/org/springframework/batch/core/schema-h2.sql
\ No newline at end of file

Reply via email to