guoyuepeng commented on PR #667: URL: https://github.com/apache/griffin/pull/667#issuecomment-2410586185
@toyboxman the following is the poc, based on which i am designing the local workflow. `package demo.workflows; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.stereotype.Component; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.transaction.PlatformTransactionManager; @Component public class DQCompareFlow { @Autowired public JobRepository jobRepository; @Autowired public PlatformTransactionManager transactionManager; Logger logger = LoggerFactory.getLogger(DQCompareFlow.class); @Bean public Job dqcFlow() { return new JobBuilder("DQC_JOB", this.jobRepository) .start(new StepBuilder("setup", this.jobRepository).tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { logger.info("PrepareJob was run"); return RepeatStatus.FINISHED; } }, transactionManager).build()) .split(new SimpleAsyncTaskExecutor()).add( new FlowBuilder<Flow>("fetchSourceCount") .start(new StepBuilder("fetchSourceStep", this.jobRepository).tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { logger.info("FetchSourceStep started"); Thread.sleep(90000); // Sleep for 90 seconds logger.info("FetchSourceStep was run"); return RepeatStatus.FINISHED; } }, transactionManager).build()) .build(), new FlowBuilder<Flow>("fetchTargetCount") .start(new StepBuilder("fetchTargetStep", this.jobRepository).tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { logger.info("FetchTargetStep was run"); return RepeatStatus.FINISHED; } }, transactionManager).build()) .build() ) .next(new StepBuilder("compareResult", this.jobRepository).tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { logger.info("CompareStep was run"); return RepeatStatus.FINISHED; } }, transactionManager).build()) .end() // Add this line to end the job definition .build(); // Add this line to build the Job object } } ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@griffin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org