yanghua commented on a change in pull request #991: Hudi Test Suite (Refactor)
URL: https://github.com/apache/incubator-hudi/pull/991#discussion_r341897178
##########
File path:
hudi-bench/src/main/java/org/apache/hudi/bench/dag/scheduler/DagScheduler.java
##########
@@ -0,0 +1,113 @@
+package org.apache.hudi.bench.dag.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hudi.bench.dag.WorkflowDag;
+import org.apache.hudi.bench.dag.nodes.BulkInsertNode;
+import org.apache.hudi.bench.dag.nodes.CleanNode;
+import org.apache.hudi.bench.dag.nodes.CompactNode;
+import org.apache.hudi.bench.dag.nodes.DagNode;
+import org.apache.hudi.bench.dag.nodes.HiveQueryNode;
+import org.apache.hudi.bench.dag.nodes.HiveSyncNode;
+import org.apache.hudi.bench.dag.nodes.InsertNode;
+import org.apache.hudi.bench.dag.nodes.RollbackNode;
+import org.apache.hudi.bench.dag.nodes.ScheduleCompactNode;
+import org.apache.hudi.bench.dag.nodes.UpsertNode;
+import org.apache.hudi.bench.dag.nodes.ValidateNode;
+import org.apache.hudi.bench.generator.DeltaGenerator;
+import org.apache.hudi.bench.writer.DeltaWriter;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class DagScheduler {
+
+ private static Logger log = LogManager.getLogger(DagScheduler.class);
+ private WorkflowDag workflowDag;
+ private DeltaGenerator deltaGenerator;
+ private DeltaWriter deltaWriter;
+
/**
 * Creates a scheduler bound to a single workflow DAG.
 *
 * @param workflowDag the DAG whose node list {@link #schedule()} executes
 * @param deltaWriter writer stored on this scheduler (presumably handed to nodes when they run)
 * @param deltaGenerator generator stored on this scheduler (presumably handed to nodes when they run)
 */
public DagScheduler(WorkflowDag workflowDag, DeltaWriter deltaWriter,
    DeltaGenerator deltaGenerator) {
  this.deltaGenerator = deltaGenerator;
  this.deltaWriter = deltaWriter;
  this.workflowDag = workflowDag;
}
+
+ public void schedule() throws Exception {
+ ExecutorService service = Executors.newFixedThreadPool(2);
+ try {
+ execute(service, workflowDag.getNodeList());
+ service.shutdown();
+ } finally {
+ if (!service.isShutdown()) {
+ log.info("Forcing shutdown of executor service, this might kill
running tasks");
+ service.shutdownNow();
+ }
+ }
+ }
+
+ private void execute(ExecutorService service, List<DagNode> nodes) throws
Exception {
+ // Nodes at the same level are executed in parallel
+ Queue<DagNode> queue = new PriorityQueue<>(nodes);
+ log.info("----------- Running workloads ----------");
+ do {
+ List<Future> futures = new ArrayList<>();
+ Set<DagNode> childNodes = new HashSet<>();
+ while (queue.size() > 0) {
+ DagNode nodeToExecute = queue.poll();
+ futures.add(service.submit(() -> executeNode(nodeToExecute)));
+ if (nodeToExecute.getChildNodes().size() > 0) {
+ childNodes.addAll(nodeToExecute.getChildNodes());
+ }
+ }
+ queue.addAll(childNodes);
+ childNodes.clear();
+ for (Future future : futures) {
+ future.get(1, TimeUnit.HOURS);
+ }
+ } while (queue.size() > 0);
+ log.info("----------- Finished workloads ----------");
+ }
+
+ private void executeNode(DagNode node) {
+ if (node.isCompleted()) {
+ throw new RuntimeException("DagNode already completed! Cannot
re-execute");
+ }
+ try {
+ if (node instanceof InsertNode) {
Review comment:
If `DagNode` is refactored to provide an abstract `execute` method, we
could replace this `if/else` chain with a single call like:
```
node.execute(xxx);
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services