This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7a7e55d8b20b231bbc5cbededc480636d5d916c0
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Aug 2 09:13:06 2022 +0530

    [HUDI-4025] Add Presto and Trino query node to validate queries (#5578)
    
    * Add Presto and Trino query nodes to hudi-integ-test
    * Add yamls for query validation
    * Add presto-jdbc and trino-jdbc to integ-test-bundle
---
 .../test-suite/deltastreamer-hive-sync-presto.yaml |  36 ++++---
 hudi-integ-test/pom.xml                            |  10 ++
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |  23 +++++
 .../integ/testsuite/configuration/DeltaConfig.java |  54 +++++++++-
 .../apache/hudi/integ/testsuite/dag/DagUtils.java  | 114 ++++++++++++++-------
 .../integ/testsuite/dag/nodes/BaseQueryNode.java   |  62 +++++++++++
 .../integ/testsuite/dag/nodes/HiveQueryNode.java   |  46 +++------
 .../integ/testsuite/dag/nodes/PrestoQueryNode.java |  60 +++++++++++
 .../integ/testsuite/dag/nodes/TrinoQueryNode.java  |  60 +++++++++++
 .../src/test/resources/unit-test-cow-dag.yaml      |  22 +++-
 packaging/hudi-integ-test-bundle/pom.xml           |   2 +
 pom.xml                                            |  14 +++
 12 files changed, 414 insertions(+), 89 deletions(-)

diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
similarity index 72%
copy from hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
copy to docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
index 23691659ca..61ea13c18e 100644
--- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 dag_name: unit-test-cow-dag
 dag_rounds: 1
 dag_intermittent_delay_mins: 10
@@ -33,10 +34,6 @@ dag_content:
       num_records_insert: 100
     type: InsertNode
     deps: first_insert
-  first_rollback:
-    config:
-    deps: second_insert
-    type: RollbackNode
   third_insert:
     config:
       record_size: 70000
@@ -44,7 +41,7 @@ dag_content:
       repeat_count: 1
       num_records_insert: 100
     type: InsertNode
-    deps: first_rollback
+    deps: second_insert
   first_upsert:
     config:
       record_size: 70000
@@ -59,16 +56,23 @@ dag_content:
       engine: "mr"
     type: HiveSyncNode
     deps: first_upsert
-  first_hive_query:
+  first_presto_query:
     config:
-      hive_props:
-        prop2: "set spark.yarn.queue="
-        prop3: "set hive.strict.checks.large.query=false"
-        prop4: "set hive.stats.autogather=false"
-      hive_queries:
-        query1: "select count(*) from testdb1.table1"
-        result1: 300
-        query2: "select count(*) from testdb1.table1 group   by `_row_key` having count(*) > 1"
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 400
+        query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
         result2: 0
-    type: HiveQueryNode
-    deps: first_hive_sync
\ No newline at end of file
+    type: PrestoQueryNode
+    deps: first_hive_sync
+#  first_trino_query:
+#    config:
+#      trino_queries:
+#        query1: "select count(*) from testdb1.table1"
+#        result1: 300
+#        query2: "select count(*) from testdb1.table1 group   by `_row_key` having count(*) > 1"
+#        result2: 0
+#    type: TrinoQueryNode
+#    deps: first_presto_query
\ No newline at end of file
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index 04e55fc086..eb08de742e 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -394,6 +394,16 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-jdbc</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-jdbc</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 5e2f9812ba..8adea6b179 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -62,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 
 /**
  * This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency
@@ -317,5 +318,27 @@ public class HoodieTestSuiteJob {
 
     @Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
     public Boolean testContinousMode = false;
+
+    @Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://<host>:<port>/<catalog>/<schema>  "
+        + "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+        + "jdbc:presto://localhost:8080/hive/sales")
+    public String prestoJdbcUrl = EMPTY_STRING;
+
+    @Parameter(names = {"--presto-jdbc-username"}, description = "Username to use for authentication")
+    public String prestoUsername = "test";
+
+    @Parameter(names = {"--presto-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
+    public String prestoPassword;
+
+    @Parameter(names = {"--trino-jdbc-url"}, description = "Trino JDBC URL in the format jdbc:trino://<host>:<port>/<catalog>/<schema>  "
+        + "e.g. URL to connect to Trino running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+        + "jdbc:trino://localhost:8080/hive/sales")
+    public String trinoJdbcUrl = EMPTY_STRING;
+
+    @Parameter(names = {"--trino-jdbc-username"}, description = "Username to use for authentication")
+    public String trinoUsername = "test";
+
+    @Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
+    public String trinoPassword;
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index a781d19cb7..03182d2784 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -43,7 +43,7 @@ public class DeltaConfig implements Serializable {
   private final SerializableConfiguration configuration;
 
   public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType 
deltaInputType,
-                     SerializableConfiguration configuration) {
+      SerializableConfiguration configuration) {
     this.deltaOutputMode = deltaOutputMode;
     this.deltaInputType = deltaInputType;
     this.configuration = configuration;
@@ -74,6 +74,10 @@ public class DeltaConfig implements Serializable {
     public static final String CHILDREN = "children";
     public static final String HIVE_QUERIES = "hive_queries";
     public static final String HIVE_PROPERTIES = "hive_props";
+    public static final String PRESTO_QUERIES = "presto_queries";
+    public static final String PRESTO_PROPERTIES = "presto_props";
+    public static final String TRINO_QUERIES = "trino_queries";
+    public static final String TRINO_PROPERTIES = "trino_props";
     private static String NUM_RECORDS_INSERT = "num_records_insert";
     private static String NUM_RECORDS_UPSERT = "num_records_upsert";
     private static String NUM_RECORDS_DELETE = "num_records_delete";
@@ -283,7 +287,7 @@ public class DeltaConfig implements Serializable {
 
     public Option<String> getPartitionField() {
       return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
-              : Option.of(configsMap.get(PARTITION_FIELD).toString());
+          : Option.of(configsMap.get(PARTITION_FIELD).toString());
     }
 
     public String getMergeCondition() {
@@ -319,7 +323,7 @@ public class DeltaConfig implements Serializable {
 
     public List<Pair<String, Integer>> getHiveQueries() {
       try {
-        return (List<Pair<String, Integer>>) 
this.configsMap.getOrDefault("hive_queries", new ArrayList<>());
+        return (List<Pair<String, Integer>>) 
this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>());
       } catch (Exception e) {
         throw new RuntimeException("unable to get hive queries from configs");
       }
@@ -333,6 +337,30 @@ public class DeltaConfig implements Serializable {
       return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new 
ArrayList<>());
     }
 
+    public List<String> getPrestoProperties() {
+      return (List<String>) this.configsMap.getOrDefault(PRESTO_PROPERTIES, 
new ArrayList<>());
+    }
+
+    public List<String> getTrinoProperties() {
+      return (List<String>) this.configsMap.getOrDefault(TRINO_PROPERTIES, new 
ArrayList<>());
+    }
+
+    public List<Pair<String, Integer>> getPrestoQueries() {
+      try {
+        return (List<Pair<String, Integer>>) 
this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>());
+      } catch (Exception e) {
+        throw new RuntimeException("unable to get presto queries from 
configs");
+      }
+    }
+
+    public List<Pair<String, Integer>> getTrinoQueries() {
+      try {
+        return (List<Pair<String, Integer>>) 
this.configsMap.getOrDefault(TRINO_QUERIES, new ArrayList<>());
+      } catch (Exception e) {
+        throw new RuntimeException("unable to get trino queries from configs");
+      }
+    }
+
     @Override
     public String toString() {
       try {
@@ -449,6 +477,26 @@ public class DeltaConfig implements Serializable {
         return this;
       }
 
+      public Builder withPrestoProperties(List<String> prestoProperties) {
+        this.configsMap.put(PRESTO_PROPERTIES, prestoProperties);
+        return this;
+      }
+
+      public Builder withTrinoProperties(List<String> trinoProperties) {
+        this.configsMap.put(TRINO_PROPERTIES, trinoProperties);
+        return this;
+      }
+
+      public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> 
prestoQueries) {
+        this.configsMap.put(PRESTO_QUERIES, prestoQueries);
+        return this;
+      }
+
+      public Builder withTrinoQueryAndResults(List<Pair<String, Integer>> 
trinoQueries) {
+        this.configsMap.put(TRINO_QUERIES, trinoQueries);
+        return this;
+      }
+
       public Builder withConfigsMap(Map<String, Object> configsMap) {
         this.configsMap = configsMap;
         return this;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
index 789d7e3423..1d78d0fdbb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -53,7 +53,13 @@ import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_PROPERTIES;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_QUERIES;
 import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.NO_DEPENDENCY_VALUE;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_PROPERTIES;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_QUERIES;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_PROPERTIES;
+import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_QUERIES;
 
 /**
  * Utility class to SerDe workflow dag.
@@ -172,7 +178,8 @@ public class DagUtils {
       DeltaConfig.Config config = 
DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
           .withName(name).build();
       return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
-    } catch (ClassNotFoundException e) {
+    }
+    catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
   }
@@ -192,11 +199,23 @@ public class DagUtils {
     while (itr.hasNext()) {
       Entry<String, JsonNode> entry = itr.next();
       switch (entry.getKey()) {
-        case DeltaConfig.Config.HIVE_QUERIES:
-          configsMap.put(DeltaConfig.Config.HIVE_QUERIES, 
getHiveQueries(entry));
+        case HIVE_QUERIES:
+          configsMap.put(HIVE_QUERIES, getQueries(entry));
+          break;
+        case HIVE_PROPERTIES:
+          configsMap.put(HIVE_PROPERTIES, getQuerySessionProperties(entry));
+          break;
+        case PRESTO_QUERIES:
+          configsMap.put(PRESTO_QUERIES, getQueries(entry));
+          break;
+        case PRESTO_PROPERTIES:
+          configsMap.put(PRESTO_PROPERTIES, getQuerySessionProperties(entry));
+          break;
+        case TRINO_QUERIES:
+          configsMap.put(TRINO_QUERIES, getQueries(entry));
           break;
-        case DeltaConfig.Config.HIVE_PROPERTIES:
-          configsMap.put(DeltaConfig.Config.HIVE_PROPERTIES, 
getProperties(entry));
+        case TRINO_PROPERTIES:
+          configsMap.put(TRINO_PROPERTIES, getQuerySessionProperties(entry));
           break;
         default:
           configsMap.put(entry.getKey(), getValue(entry.getValue()));
@@ -206,25 +225,27 @@ public class DagUtils {
     return configsMap;
   }
 
-  private static List<Pair<String, Integer>> getHiveQueries(Entry<String, 
JsonNode> entry) {
+  private static List<Pair<String, Integer>> getQueries(Entry<String, 
JsonNode> entry) {
     List<Pair<String, Integer>> queries = new ArrayList<>();
     try {
       List<JsonNode> flattened = new ArrayList<>();
       flattened.add(entry.getValue());
-      queries = (List<Pair<String, 
Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class);
-    } catch (Exception e) {
+      queries = (List<Pair<String, Integer>>) 
getQueryMapper().readValue(flattened.toString(), List.class);
+    }
+    catch (Exception e) {
       e.printStackTrace();
     }
     return queries;
   }
 
-  private static List<String> getProperties(Entry<String, JsonNode> entry) {
+  private static List<String> getQuerySessionProperties(Entry<String, 
JsonNode> entry) {
     List<String> properties = new ArrayList<>();
     try {
       List<JsonNode> flattened = new ArrayList<>();
       flattened.add(entry.getValue());
-      properties = 
(List<String>)getHivePropertyMapper().readValue(flattened.toString(), 
List.class);
-    } catch (Exception e) {
+      properties = (List<String>) 
getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
+    }
+    catch (Exception e) {
       e.printStackTrace();
     }
     return properties;
@@ -233,15 +254,20 @@ public class DagUtils {
   private static Object getValue(JsonNode node) {
     if (node.isInt()) {
       return node.asInt();
-    } else if (node.isLong()) {
+    }
+    else if (node.isLong()) {
       return node.asLong();
-    } else if (node.isShort()) {
+    }
+    else if (node.isShort()) {
       return node.asInt();
-    } else if (node.isBoolean()) {
+    }
+    else if (node.isBoolean()) {
       return node.asBoolean();
-    } else if (node.isDouble()) {
+    }
+    else if (node.isDouble()) {
       return node.asDouble();
-    } else if (node.isFloat()) {
+    }
+    else if (node.isFloat()) {
       return node.asDouble();
     }
     return node.textValue();
@@ -254,13 +280,28 @@ public class DagUtils {
     while (itr.hasNext()) {
       Entry<String, JsonNode> entry = itr.next();
       switch (entry.getKey()) {
-        case DeltaConfig.Config.HIVE_QUERIES:
-          ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES,
-              
MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+        case HIVE_QUERIES:
+          ((ObjectNode) configNode).put(HIVE_QUERIES,
+              
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+          break;
+        case HIVE_PROPERTIES:
+          ((ObjectNode) configNode).put(HIVE_PROPERTIES,
+              
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
+        case PRESTO_QUERIES:
+          ((ObjectNode) configNode).put(PRESTO_QUERIES,
+              
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+          break;
+        case PRESTO_PROPERTIES:
+          ((ObjectNode) configNode).put(PRESTO_PROPERTIES,
+              
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
           break;
-        case DeltaConfig.Config.HIVE_PROPERTIES:
-          ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES,
-              
MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
+        case TRINO_QUERIES:
+          ((ObjectNode) configNode).put(TRINO_QUERIES,
+              
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
+          break;
+        case TRINO_PROPERTIES:
+          ((ObjectNode) configNode).put(TRINO_PROPERTIES,
+              
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
           break;
         default:
           break;
@@ -293,21 +334,22 @@ public class DagUtils {
     return result.toString("utf-8");
   }
 
-  private static ObjectMapper getHiveQueryMapper() {
+  private static ObjectMapper getQueryMapper() {
     SimpleModule module = new SimpleModule();
     ObjectMapper queryMapper = new ObjectMapper();
-    module.addSerializer(List.class, new HiveQuerySerializer());
-    module.addDeserializer(List.class, new HiveQueryDeserializer());
+    module.addSerializer(List.class, new QuerySerializer());
+    module.addDeserializer(List.class, new QueryDeserializer());
     queryMapper.registerModule(module);
     return queryMapper;
   }
 
-  private static final class HiveQuerySerializer extends JsonSerializer<List> {
+  private static final class QuerySerializer extends JsonSerializer<List> {
     Integer index = 0;
+
     @Override
     public void serialize(List pairs, JsonGenerator gen, SerializerProvider 
serializers) throws IOException {
       gen.writeStartObject();
-      for (Pair pair : (List<Pair>)pairs) {
+      for (Pair pair : (List<Pair>) pairs) {
         gen.writeStringField("query" + index, pair.getLeft().toString());
         gen.writeNumberField("result" + index, 
Integer.parseInt(pair.getRight().toString()));
         index++;
@@ -316,7 +358,7 @@ public class DagUtils {
     }
   }
 
-  private static final class HiveQueryDeserializer extends 
JsonDeserializer<List> {
+  private static final class QueryDeserializer extends JsonDeserializer<List> {
     @Override
     public List deserialize(JsonParser parser, DeserializationContext context) 
throws IOException {
       List<Pair<String, Integer>> pairs = new ArrayList<>();
@@ -334,7 +376,8 @@ public class DagUtils {
 
           if (fieldName.contains("query")) {
             query = parser.getValueAsString();
-          } else if (fieldName.contains("result")) {
+          }
+          else if (fieldName.contains("result")) {
             result = parser.getValueAsInt();
             pairs.add(Pair.of(query, result));
           }
@@ -344,21 +387,22 @@ public class DagUtils {
     }
   }
 
-  private static ObjectMapper getHivePropertyMapper() {
+  private static ObjectMapper getQueryEnginePropertyMapper() {
     SimpleModule module = new SimpleModule();
     ObjectMapper propMapper = new ObjectMapper();
-    module.addSerializer(List.class, new HivePropertySerializer());
-    module.addDeserializer(List.class, new HivePropertyDeserializer());
+    module.addSerializer(List.class, new QueryEnginePropertySerializer());
+    module.addDeserializer(List.class, new QueryEnginePropertyDeserializer());
     propMapper.registerModule(module);
     return propMapper;
   }
 
-  private static final class HivePropertySerializer extends 
JsonSerializer<List> {
+  private static final class QueryEnginePropertySerializer extends 
JsonSerializer<List> {
     Integer index = 0;
+
     @Override
     public void serialize(List props, JsonGenerator gen, SerializerProvider 
serializers) throws IOException {
       gen.writeStartObject();
-      for (String prop : (List<String>)props) {
+      for (String prop : (List<String>) props) {
         gen.writeStringField("prop" + index, prop);
         index++;
       }
@@ -366,7 +410,7 @@ public class DagUtils {
     }
   }
 
-  private static final class HivePropertyDeserializer extends 
JsonDeserializer<List> {
+  private static final class QueryEnginePropertyDeserializer extends 
JsonDeserializer<List> {
     @Override
     public List deserialize(JsonParser parser, DeserializationContext context) 
throws IOException {
       List<String> props = new ArrayList<>();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
new file mode 100644
index 0000000000..c1a3d23791
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+public abstract class BaseQueryNode extends DagNode<Boolean> {
+
+  public void setSessionProperties(List<String> properties, Statement stmt) 
throws SQLException {
+    for (String prop : properties) {
+      executeStatement(prop, stmt);
+    }
+  }
+
+  public void executeAndValidateQueries(List<Pair<String, Integer>> 
queriesWithResult, Statement stmt) throws SQLException {
+    for (Pair<String, Integer> queryAndResult : queriesWithResult) {
+      log.info("Running {}", queryAndResult.getLeft());
+      ResultSet res = stmt.executeQuery(queryAndResult.getLeft());
+      if (!res.next()) {
+        log.info("res.next() was False - typically this means the query 
returned no rows.");
+        assert 0 == queryAndResult.getRight();
+      }
+      else {
+        Integer result = res.getInt(1);
+        if (!queryAndResult.getRight().equals(result)) {
+          throw new AssertionError(
+              "QUERY: " + queryAndResult.getLeft()
+                  + " | EXPECTED RESULT = " + queryAndResult.getRight()
+                  + " | ACTUAL RESULT = " + result
+          );
+        }
+      }
+      log.info("Successfully validated query!");
+    }
+  }
+
+  private void executeStatement(String query, Statement stmt) throws 
SQLException {
+    log.info("Executing statement {}", stmt.toString());
+    stmt.execute(query);
+  }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 2da5c558d2..fd04cc34c6 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -28,8 +28,6 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Statement;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
@@ -39,7 +37,7 @@ import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
 /**
  * A hive query node in the DAG of operations for a workflow. used to perform 
a hive query with given config.
  */
-public class HiveQueryNode extends DagNode<Boolean> {
+public class HiveQueryNode extends BaseQueryNode {
 
   private HiveServiceProvider hiveServiceProvider;
 
@@ -61,37 +59,17 @@ public class HiveQueryNode extends DagNode<Boolean> {
         .getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
     
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
-    Connection con = 
DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL),
-        hiveSyncConfig.getString(HIVE_USER), 
hiveSyncConfig.getString(HIVE_PASS));
-    Statement stmt = con.createStatement();
-    stmt.execute("set 
hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
-    for (String hiveProperty : this.config.getHiveProperties()) {
-      executeStatement(hiveProperty, stmt);
+    try (Connection con = 
DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL),
+        hiveSyncConfig.getString(HIVE_USER), 
hiveSyncConfig.getString(HIVE_PASS))) {
+      Statement stmt = con.createStatement();
+      stmt.execute("set 
hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
+      setSessionProperties(this.config.getHiveProperties(), stmt);
+      executeAndValidateQueries(this.config.getHiveQueries(), stmt);
+      stmt.close();
+      this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
     }
-    for (Pair<String, Integer> queryAndResult : this.config.getHiveQueries()) {
-      log.info("Running {}", queryAndResult.getLeft());
-      ResultSet res = stmt.executeQuery(queryAndResult.getLeft());
-      if (!res.next()) {
-        log.info("res.next() was False - typically this means the query 
returned no rows.");
-        assert 0 == queryAndResult.getRight();
-      } else {
-        Integer result = res.getInt(1);
-        if (!queryAndResult.getRight().equals(result)) {
-          throw new AssertionError(
-              "QUERY: " + queryAndResult.getLeft()
-                  + " | EXPECTED RESULT = " + queryAndResult.getRight()
-                  + " | ACTUAL RESULT = " + result
-          );
-        }
-      }
-      log.info("Successfully validated query!");
+    catch (Exception e) {
+      throw new HoodieValidationException("Hive query validation failed due to 
" + e.getMessage(), e);
     }
-    this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
   }
-
-  private void executeStatement(String query, Statement stmt) throws 
SQLException {
-    log.info("Executing statement {}", stmt.toString());
-    stmt.execute(query);
-  }
-
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
new file mode 100644
index 0000000000..9a9bafcf6f
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+public class PrestoQueryNode extends BaseQueryNode {
+
+  public PrestoQueryNode(DeltaConfig.Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
+    log.info("Executing presto query node {}", this.getName());
+    String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
+    if (StringUtils.isNullOrEmpty(url)) {
+      throw new IllegalArgumentException("Presto JDBC connection url not 
provided. Please set --presto-jdbc-url.");
+    }
+    String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
+    String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
+    try {
+      Class.forName("com.facebook.presto.jdbc.PrestoDriver");
+    } catch (ClassNotFoundException e) {
+      throw new HoodieValidationException("Presto query validation failed due 
to " + e.getMessage(), e);
+    }
+    try (Connection connection = DriverManager.getConnection(url, user, pass)) 
{
+      Statement stmt = connection.createStatement();
+      setSessionProperties(this.config.getPrestoProperties(), stmt);
+      executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
+      stmt.close();
+    }
+    catch (Exception e) {
+      throw new HoodieValidationException("Presto query validation failed due 
to " + e.getMessage(), e);
+    }
+  }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
new file mode 100644
index 0000000000..ffcc901f67
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+public class TrinoQueryNode extends BaseQueryNode{
+
+  public TrinoQueryNode(DeltaConfig.Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
+    log.info("Executing trino query node {}", this.getName());
+    String url = context.getHoodieTestSuiteWriter().getCfg().trinoJdbcUrl;
+    if (StringUtils.isNullOrEmpty(url)) {
+      throw new IllegalArgumentException("Trino JDBC connection url not 
provided. Please set --trino-jdbc-url.");
+    }
+    String user = context.getHoodieTestSuiteWriter().getCfg().trinoUsername;
+    String pass = context.getHoodieTestSuiteWriter().getCfg().trinoPassword;
+    try {
+      Class.forName("io.trino.jdbc.TrinoDriver");
+    } catch (ClassNotFoundException e) {
+      throw new HoodieValidationException("Trino query validation failed due 
to " + e.getMessage(), e);
+    }
+    try (Connection connection = DriverManager.getConnection(url, user, pass)) 
{
+      Statement stmt = connection.createStatement();
+      setSessionProperties(this.config.getTrinoProperties(), stmt);
+      executeAndValidateQueries(this.config.getTrinoQueries(), stmt);
+      stmt.close();
+    }
+    catch (Exception e) {
+      throw new HoodieValidationException("Trino query validation failed due 
to " + e.getMessage(), e);
+    }
+  }
+}
diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml 
b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
index 23691659ca..8228c53e54 100644
--- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
@@ -71,4 +71,24 @@ dag_content:
         query2: "select count(*) from testdb1.table1 group   by `_row_key` 
having count(*) > 1"
         result2: 0
     type: HiveQueryNode
-    deps: first_hive_sync
\ No newline at end of file
+    deps: first_hive_sync
+  first_presto_query:
+    config:
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb1.table1"
+        result1: 300
+        query2: "select count(*) from testdb1.table1 group   by `_row_key` 
having count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: first_hive_query
+  first_trino_query:
+    config:
+      trino_queries:
+        query1: "select count(*) from testdb1.table1"
+        result1: 300
+        query2: "select count(*) from testdb1.table1 group   by `_row_key` 
having count(*) > 1"
+        result2: 0
+    type: TrinoQueryNode
+    deps: first_presto_query
\ No newline at end of file
diff --git a/packaging/hudi-integ-test-bundle/pom.xml 
b/packaging/hudi-integ-test-bundle/pom.xml
index 689da9f2e0..167183bc1e 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -100,6 +100,8 @@
                   
<include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include>
                   <include>org.apache.htrace:htrace-core4</include>
                   <include>commons-io:commons-io</include>
+                  <include>com.facebook.presto:presto-jdbc</include>
+                  <include>io.trino:trino-jdbc</include>
 
                   <include>org.jetbrains.kotlin:kotlin-stdlib-jdk8</include>
                   <include>org.jetbrains.kotlin:kotlin-stdlib</include>
diff --git a/pom.xml b/pom.xml
index c5913823ee..86c6115f89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,8 @@
     <hadoop.version>2.10.1</hadoop.version>
     <hive.groupid>org.apache.hive</hive.groupid>
     <hive.version>2.3.1</hive.version>
+    <presto.version>0.273</presto.version>
+    <trino.version>390</trino.version>
     <hive.exec.classifier>core</hive.exec.classifier>
     <metrics.version>4.1.1</metrics.version>
     <orc.version>1.6.0</orc.version>
@@ -1069,6 +1071,18 @@
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>com.facebook.presto</groupId>
+        <artifactId>presto-jdbc</artifactId>
+        <version>${presto.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.trino</groupId>
+        <artifactId>trino-jdbc</artifactId>
+        <version>${trino.version}</version>
+      </dependency>
+
       <!-- Zookeeper -->
       <dependency>
         <groupId>org.apache.curator</groupId>

Reply via email to