This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch griffin-2.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/griffin-2.0.0-dev by this push:
new 46789ff5 Griffin 2.0.0 dev (#672)
46789ff5 is described below
commit 46789ff5f56af7c43f8cee3c65192a3aea904b3f
Author: William Guo <[email protected]>
AuthorDate: Thu Jan 2 10:39:08 2025 +0800
Griffin 2.0.0 dev (#672)
* add metric test
* init create sql
* metrics persistence service
* remove relativePath
* local dqc app execution engine
* remove hardcoded version
---
griffin-bom/pom.xml | 2 +-
griffin-dqc/pom.xml | 38 +++++++++
.../org/apache/griffin/dqc/DQCFlowFactory.java | 71 +++++++++++++++++
.../java/org/apache/griffin/dqc/DQCLocalApp.java | 13 ++++
.../org/apache/griffin/dqc/DQCTriggerService.java | 39 ++++++++++
.../main/java/org/apache/griffin/dqc/IDQCFlow.java | 4 +
.../main/java/org/apache/griffin/dqc/SACFlow.java | 91 ++++++++++++++++++++++
.../src/main/resources/application.properties | 11 +++
8 files changed, 268 insertions(+), 1 deletion(-)
diff --git a/griffin-bom/pom.xml b/griffin-bom/pom.xml
index 2ed73a84..dcec850c 100644
--- a/griffin-bom/pom.xml
+++ b/griffin-bom/pom.xml
@@ -36,7 +36,7 @@ under the License.
<url>https://griffin.apache.org</url>
<properties>
- <spring-boot.version>2.7.3</spring-boot.version>
+ <spring-boot.version>3.3.4</spring-boot.version>
<mybatis-plus.version>3.5.2</mybatis-plus.version>
<commons-codec.version>1.11</commons-codec.version>
<commons-logging.version>1.1.1</commons-logging.version>
diff --git a/griffin-dqc/pom.xml b/griffin-dqc/pom.xml
index 7d503e5c..26dfd3dd 100644
--- a/griffin-dqc/pom.xml
+++ b/griffin-dqc/pom.xml
@@ -33,5 +33,43 @@ under the License.
<name>${project.artifactId}</name>
<url>https://griffin.apache.org</url>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.griffin</groupId>
+ <artifactId>griffin-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-batch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.batch</groupId>
+ <artifactId>spring-batch-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git
a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java
new file mode 100644
index 00000000..a9a357b6
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCFlowFactory.java
@@ -0,0 +1,71 @@
+package org.apache.griffin.dqc;
+import org.springframework.batch.core.Step;
+import
org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.job.builder.FlowBuilder;
+import org.springframework.batch.core.job.flow.Flow;
+import org.springframework.batch.core.job.flow.support.SimpleFlow;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.job.builder.JobBuilder;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.transaction.PlatformTransactionManager;
+
+
+/**
+ * Spring configuration that assembles the Split-Apply-Combine DQC job.
+ */
+@Configuration
+@EnableBatchProcessing
+public class DQCFlowFactory {
+    private static final Logger logger =
+            LoggerFactory.getLogger(DQCFlowFactory.class);
+
+    // The two injected fields are not referenced inside this class; the
+    // bean method below receives its JobRepository as a parameter.
+    @Autowired
+    public JobRepository jobRepository;
+    @Autowired
+    public PlatformTransactionManager transactionManager;
+
+    /**
+     * Builds the job that runs one typical DQC workflow:
+     * <pre>
+     *            -> scanSource
+     *           /              \
+     * setup -->                  --> check
+     *           \              /
+     *            -> scanTarget
+     * </pre>
+     * The two scan flows are split onto a SimpleAsyncTaskExecutor.
+     *
+     * @param jobRepository repository handed to the JobBuilder
+     * @param setup first step of the job
+     * @param scanSource step of the "source" branch of the split
+     * @param scanTarget step of the "target" branch of the split
+     * @param check step that follows the split
+     * @return the job named "CompareTwoAssetsJob"
+     */
+    @Bean
+    public Job Split_Apply_Combine(JobRepository jobRepository, Step setup,
+            Step scanSource, Step scanTarget, Step check) {
+        Flow sourceFlow = new FlowBuilder<SimpleFlow>("source")
+                .start(scanSource).build();
+        Flow targetFlow = new FlowBuilder<SimpleFlow>("target")
+                .start(scanTarget).build();
+        // Both branches go into one split so they can run in parallel.
+        Flow splitFlow = new FlowBuilder<SimpleFlow>("splitFlow")
+                .split(new SimpleAsyncTaskExecutor())
+                .add(sourceFlow, targetFlow)
+                .build();
+        Flow sequentialFlow = new FlowBuilder<SimpleFlow>("sequentialFlow")
+                .start(setup).next(splitFlow).next(check).build();
+        return new JobBuilder("CompareTwoAssetsJob", jobRepository)
+                .start(sequentialFlow).end().build();
+    }
+}
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java
new file mode 100644
index 00000000..34c9d9d0
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCLocalApp.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/** Spring Boot application class used to run the DQC engine locally. */
+@SpringBootApplication
+public class DQCLocalApp {
+ /** Starts Spring Boot with this class as the primary source. */
+ public static void main(String[] args) {
+ SpringApplication.run(DQCLocalApp.class, args);
+ }
+}
diff --git
a/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java
new file mode 100644
index 00000000..3e311249
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/DQCTriggerService.java
@@ -0,0 +1,39 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/** REST endpoint that launches the DQC batch job on demand. */
+@RestController
+public class DQCTriggerService {
+    @Autowired private JobLauncher jobLauncher;
+    @Autowired private Job dqcFlow;
+    /**
+     * Runs the job. A missing {@code param} is replaced by a timestamped
+     * default value for the "requestParam" job parameter.
+     * @param param optional value for the "requestParam" job parameter
+     * @return 200 when the launcher returns, 500 if the launch throws
+     */
+    @GetMapping("/run")
+    public ResponseEntity<String> runDqcFlow(
+            @RequestParam(required = false) String param) {
+        try {
+            String value = param != null
+                    ? param : "default_value:" + System.currentTimeMillis();
+            JobParameters jobParameters = new JobParametersBuilder()
+                    .addString("requestParam", value).toJobParameters();
+            jobLauncher.run(dqcFlow, jobParameters);
+            return ResponseEntity.ok("DQC Flow job started successfully.");
+        } catch (Exception e) { // not Throwable: JVM Errors must propagate
+            return ResponseEntity.status(500)
+                    .body("Job failed to start: " + e.getMessage());
+        }
+    }
+}
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java
new file mode 100644
index 00000000..c43e70b7
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/IDQCFlow.java
@@ -0,0 +1,4 @@
+package org.apache.griffin.dqc;
+/** Marker interface (declares no members) implemented by SACFlow. */
+public interface IDQCFlow {
+}
diff --git a/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java
b/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java
new file mode 100644
index 00000000..37e76fc3
--- /dev/null
+++ b/griffin-dqc/src/main/java/org/apache/griffin/dqc/SACFlow.java
@@ -0,0 +1,91 @@
+package org.apache.griffin.dqc;
+
+import org.springframework.batch.core.Step;
+import
org.springframework.batch.core.listener.ExecutionContextPromotionListener;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.StepBuilder;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import java.util.Random;
+
+/**
+ * Split-Apply-Combine flow: declares the setup, scanSource, scanTarget
+ * and check steps as beans. Every step sleeps for up to three seconds
+ * and data is handed between steps through the job execution context.
+ */
+@Component
+public class SACFlow implements IDQCFlow {
+    private final Random random = new Random();
+
+    /** Stores setup_key=setup_value in the step context of "setup". */
+    @Bean
+    public Step setup(JobRepository jobRepository,
+            PlatformTransactionManager transactionManager) {
+        return new StepBuilder("setup", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    chunkContext.getStepContext().getStepExecution()
+                            .getExecutionContext()
+                            .put("setup_key", "setup_value");
+                    Thread.sleep(random.nextInt(3000));
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /**
+     * Creates the listener attached to setup, scanSource and scanTarget.
+     * It is configured with the keys setup_key, source_count and
+     * target_count, which it promotes to the job execution context.
+     *
+     * @return a new listener instance on every call
+     */
+    public ExecutionContextPromotionListener
+            executionContextPromotionListener() {
+        ExecutionContextPromotionListener listener =
+                new ExecutionContextPromotionListener();
+        listener.setKeys(
+                new String[] {"setup_key", "source_count", "target_count"});
+        return listener;
+    }
+
+    /** Prints the promoted setup_key and records source_count=100. */
+    @Bean
+    public Step scanSource(JobRepository jobRepository,
+            PlatformTransactionManager transactionManager) {
+        return new StepBuilder("scanSource", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    announce("scanSource");
+                    String setupKey = (String) chunkContext.getStepContext()
+                            .getJobExecutionContext().get("setup_key");
+                    System.out.println("setup_key from setup: " + setupKey);
+                    chunkContext.getStepContext().getStepExecution()
+                            .getExecutionContext().put("source_count", 100);
+                    Thread.sleep(random.nextInt(3000));
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /** Prints the promoted setup_key and records target_count=100. */
+    @Bean
+    public Step scanTarget(JobRepository jobRepository,
+            PlatformTransactionManager transactionManager) {
+        return new StepBuilder("scanTarget", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    announce("scanTarget");
+                    Thread.sleep(random.nextInt(3000));
+                    System.out.println(chunkContext.getStepContext()
+                            .getJobExecutionContext().get("setup_key"));
+                    chunkContext.getStepContext().getStepExecution()
+                            .getExecutionContext().put("target_count", 100);
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .listener(executionContextPromotionListener())
+                .build();
+    }
+
+    /**
+     * Compares the promoted source_count and target_count and prints
+     * whether they are equal. The tasklet throws IllegalStateException
+     * when either count is absent from the job execution context or is
+     * not an Integer, instead of failing on a bare unboxing cast.
+     */
+    @Bean
+    public Step check(JobRepository jobRepository,
+            PlatformTransactionManager transactionManager) {
+        return new StepBuilder("check", jobRepository)
+                .tasklet((contribution, chunkContext) -> {
+                    announce("check");
+                    Thread.sleep(random.nextInt(3000));
+                    java.util.Map<String, Object> jobCtx = chunkContext
+                            .getStepContext().getJobExecutionContext();
+                    System.out.println(jobCtx.get("setup_key"));
+                    int sourceCount = requireCount(jobCtx, "source_count");
+                    int targetCount = requireCount(jobCtx, "target_count");
+                    if (sourceCount == targetCount) {
+                        System.out.println("source_count == target_count");
+                    } else {
+                        System.out.println("source_count != target_count");
+                    }
+                    return RepeatStatus.FINISHED;
+                }, transactionManager)
+                .build();
+    }
+
+    /** Prints the "time : thread: running step" banner of a step. */
+    private static void announce(String stepName) {
+        System.out.println(System.currentTimeMillis() + " : "
+                + Thread.currentThread() + ": running " + stepName);
+    }
+
+    /** Returns the Integer stored under key, or fails naming the key. */
+    private static int requireCount(java.util.Map<String, Object> jobCtx,
+            String key) {
+        Object value = jobCtx.get(key);
+        if (!(value instanceof Integer)) {
+            throw new IllegalStateException("Expected Integer '" + key
+                    + "' in job execution context but found: " + value);
+        }
+        return (Integer) value;
+    }
+}
diff --git a/griffin-dqc/src/main/resources/application.properties
b/griffin-dqc/src/main/resources/application.properties
new file mode 100644
index 00000000..f40e3980
--- /dev/null
+++ b/griffin-dqc/src/main/resources/application.properties
@@ -0,0 +1,11 @@
+spring.application.name=griffin-dqc
+
+spring.datasource.url=jdbc:h2:mem:testdb
+spring.datasource.driverClassName=org.h2.Driver
+spring.datasource.username=sa
+spring.datasource.password=password
+spring.h2.console.enabled=true
+spring.batch.jdbc.initialize-schema=always
+spring.batch.job.enabled=true
+
+spring.sql.init.schema-locations=classpath:/org/springframework/batch/core/schema-h2.sql
\ No newline at end of file