vinothchandar commented on a change in pull request #991: Hudi Test Suite 
(Refactor) 
URL: https://github.com/apache/incubator-hudi/pull/991#discussion_r344980862
 
 

 ##########
 File path: hudi-bench/src/main/java/org/apache/hudi/bench/dag/DagUtils.java
 ##########
 @@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bench.dag;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.bench.configuration.DeltaConfig.Config;
+import org.apache.hudi.bench.dag.nodes.BulkInsertNode;
+import org.apache.hudi.bench.dag.nodes.CompactNode;
+import org.apache.hudi.bench.dag.nodes.DagNode;
+import org.apache.hudi.bench.dag.nodes.HiveQueryNode;
+import org.apache.hudi.bench.dag.nodes.HiveSyncNode;
+import org.apache.hudi.bench.dag.nodes.InsertNode;
+import org.apache.hudi.bench.dag.nodes.RollbackNode;
+import org.apache.hudi.bench.dag.nodes.ScheduleCompactNode;
+import org.apache.hudi.bench.dag.nodes.SparkSQLQueryNode;
+import org.apache.hudi.bench.dag.nodes.UpsertNode;
+import org.apache.hudi.bench.dag.nodes.ValidateNode;
+import org.apache.hudi.common.util.collection.Pair;
+
/**
 * Utility class to serialize/deserialize (SerDe) a workflow dag to and from YAML.
 */
public class DagUtils {

  // Shared mapper used to build the intermediate JSON tree (see convertDagToYaml).
  // NOTE(review): constant naming convention would be MAPPER, but renaming would
  // touch usages elsewhere in this file — left as-is.
  static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Converts a YAML path to {@link WorkflowDag}
+   */
+  public static WorkflowDag convertYamlPathToDag(FileSystem fs, String path) 
throws IOException {
+    InputStream is = fs.open(new Path(path));
+    return convertYamlToDag(toString(is));
+  }
+
+  /**
+   * Converts a YAML representation to {@link WorkflowDag}
+   */
+  public static WorkflowDag convertYamlToDag(String yaml) throws IOException {
+    Map<String, DagNode> allNodes = new HashMap<>();
+    final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
+    final JsonNode jsonNode = yamlReader.readTree(yaml);
+    Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
+    while (itr.hasNext()) {
+      Entry<String, JsonNode> dagNode = itr.next();
+      allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, 
dagNode.getValue()));
+    }
+    return new WorkflowDag(findRootNodes(allNodes));
+  }
+
+  /**
+   * Converts {@link WorkflowDag} to a YAML representation
+   */
+  public static String convertDagToYaml(WorkflowDag dag) throws IOException {
+    final ObjectMapper yamlWriter = new ObjectMapper(new 
YAMLFactory().disable(Feature.WRITE_DOC_START_MARKER)
+        
.enable(Feature.MINIMIZE_QUOTES).enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES));
+    JsonNode yamlNode = mapper.createObjectNode();
+    convertDagToYaml(yamlNode, dag.getNodeList());
+    return 
yamlWriter.writerWithDefaultPrettyPrinter().writeValueAsString(yamlNode);
+  }
+
+  private static void convertDagToYaml(JsonNode yamlNode, List<DagNode> 
dagNodes) throws IOException {
+    for (DagNode dagNode : dagNodes) {
+      String name = 
dagNode.getConfig().getOtherConfigs().getOrDefault(Config.NODE_NAME, 
dagNode.getName()).toString();
+      ((ObjectNode) yamlNode).put(name, convertDagNodeToJsonNode(dagNode));
+      if (dagNode.getChildNodes().size() > 0) {
+        convertDagToYaml(yamlNode, dagNode.getChildNodes());
+      }
+    }
+  }
+
+  private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, 
JsonNode node) throws IOException {
+    String type = node.get(Config.TYPE).asText();
+    final DagNode retNode = convertJsonToDagNode(node, type);
+    
Arrays.asList(node.get(Config.DEPENDENCIES).textValue().split(",")).stream().forEach(dep
 -> {
+      DagNode parentNode = allNodes.get(dep);
+      if (parentNode != null) {
+        parentNode.addChildNode(retNode);
+      }
+    });
+    return retNode;
+  }
+
+  private static List<DagNode> findRootNodes(Map<String, DagNode> allNodes) {
+    final List<DagNode> rootNodes = new ArrayList<>();
+    allNodes.entrySet().stream().forEach(entry -> {
+      if (entry.getValue().getParentNodes().size() < 1) {
+        rootNodes.add(entry.getValue());
+      }
+    });
+    return rootNodes;
+  }
+
+  private static DagNode convertJsonToDagNode(JsonNode node, String type) {
 
 Review comment:
   +1. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to