This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7a7e55d8b20b231bbc5cbededc480636d5d916c0 Author: Sagar Sumit <[email protected]> AuthorDate: Tue Aug 2 09:13:06 2022 +0530 [HUDI-4025] Add Presto and Trino query node to validate queries (#5578) * Add Presto and Trino query nodes to hudi-integ-test * Add yamls for query validation * Add presto-jdbc and trino-jdbc to integ-test-bundle --- .../test-suite/deltastreamer-hive-sync-presto.yaml | 36 ++++--- hudi-integ-test/pom.xml | 10 ++ .../hudi/integ/testsuite/HoodieTestSuiteJob.java | 23 +++++ .../integ/testsuite/configuration/DeltaConfig.java | 54 +++++++++- .../apache/hudi/integ/testsuite/dag/DagUtils.java | 114 ++++++++++++++------- .../integ/testsuite/dag/nodes/BaseQueryNode.java | 62 +++++++++++ .../integ/testsuite/dag/nodes/HiveQueryNode.java | 46 +++------ .../integ/testsuite/dag/nodes/PrestoQueryNode.java | 60 +++++++++++ .../integ/testsuite/dag/nodes/TrinoQueryNode.java | 60 +++++++++++ .../src/test/resources/unit-test-cow-dag.yaml | 22 +++- packaging/hudi-integ-test-bundle/pom.xml | 2 + pom.xml | 14 +++ 12 files changed, 414 insertions(+), 89 deletions(-) diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml similarity index 72% copy from hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml copy to docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml index 23691659ca..61ea13c18e 100644 --- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml +++ b/docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ dag_name: unit-test-cow-dag dag_rounds: 1 dag_intermittent_delay_mins: 10 @@ -33,10 +34,6 @@ dag_content: num_records_insert: 100 type: InsertNode deps: first_insert - first_rollback: - config: - deps: second_insert - type: RollbackNode third_insert: config: record_size: 70000 @@ -44,7 +41,7 @@ dag_content: repeat_count: 1 num_records_insert: 100 type: InsertNode - deps: first_rollback + deps: second_insert first_upsert: config: record_size: 70000 @@ -59,16 +56,23 @@ dag_content: engine: "mr" type: HiveSyncNode deps: first_upsert - first_hive_query: + first_presto_query: config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" - hive_queries: - query1: "select count(*) from testdb1.table1" - result1: 300 - query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb.table1" + result1: 400 + query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1" result2: 0 - type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file + type: PrestoQueryNode + deps: first_hive_sync +# first_trino_query: +# config: +# trino_queries: +# query1: "select count(*) from testdb1.table1" +# result1: 300 +# query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" +# result2: 0 +# type: TrinoQueryNode +# deps: first_presto_query \ No newline at end of file diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index 04e55fc086..eb08de742e 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -394,6 +394,16 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-jdbc</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-jdbc</artifactId> + </dependency> + 
<dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 5e2f9812ba..8adea6b179 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -62,6 +62,7 @@ import java.util.List; import java.util.Map; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; /** * This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency @@ -317,5 +318,27 @@ public class HoodieTestSuiteJob { @Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.") public Boolean testContinousMode = false; + + @Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://<host>:<port>/<catalog>/<schema> " + + "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: " + + "jdbc:presto://localhost:8080/hive/sales") + public String prestoJdbcUrl = EMPTY_STRING; + + @Parameter(names = {"--presto-jdbc-username"}, description = "Username to use for authentication") + public String prestoUsername = "test"; + + @Parameter(names = {"--presto-jdbc-password"}, description = "Password corresponding to the username to use for authentication") + public String prestoPassword; + + @Parameter(names = {"--trino-jdbc-url"}, description = "Trino JDBC URL in the format jdbc:trino://<host>:<port>/<catalog>/<schema> " + + "e.g. 
URL to connect to Trino running on localhost port 8080 with the catalog `hive` and the schema `sales`: " + + "jdbc:trino://localhost:8080/hive/sales") + public String trinoJdbcUrl = EMPTY_STRING; + + @Parameter(names = {"--trino-jdbc-username"}, description = "Username to use for authentication") + public String trinoUsername = "test"; + + @Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication") + public String trinoPassword; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index a781d19cb7..03182d2784 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -43,7 +43,7 @@ public class DeltaConfig implements Serializable { private final SerializableConfiguration configuration; public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType, - SerializableConfiguration configuration) { + SerializableConfiguration configuration) { this.deltaOutputMode = deltaOutputMode; this.deltaInputType = deltaInputType; this.configuration = configuration; @@ -74,6 +74,10 @@ public class DeltaConfig implements Serializable { public static final String CHILDREN = "children"; public static final String HIVE_QUERIES = "hive_queries"; public static final String HIVE_PROPERTIES = "hive_props"; + public static final String PRESTO_QUERIES = "presto_queries"; + public static final String PRESTO_PROPERTIES = "presto_props"; + public static final String TRINO_QUERIES = "trino_queries"; + public static final String TRINO_PROPERTIES = "trino_props"; private static String NUM_RECORDS_INSERT = "num_records_insert"; private static String NUM_RECORDS_UPSERT = "num_records_upsert"; private static String 
NUM_RECORDS_DELETE = "num_records_delete"; @@ -283,7 +287,7 @@ public class DeltaConfig implements Serializable { public Option<String> getPartitionField() { return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty() - : Option.of(configsMap.get(PARTITION_FIELD).toString()); + : Option.of(configsMap.get(PARTITION_FIELD).toString()); } public String getMergeCondition() { @@ -319,7 +323,7 @@ public class DeltaConfig implements Serializable { public List<Pair<String, Integer>> getHiveQueries() { try { - return (List<Pair<String, Integer>>) this.configsMap.getOrDefault("hive_queries", new ArrayList<>()); + return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>()); } catch (Exception e) { throw new RuntimeException("unable to get hive queries from configs"); } @@ -333,6 +337,30 @@ public class DeltaConfig implements Serializable { return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new ArrayList<>()); } + public List<String> getPrestoProperties() { + return (List<String>) this.configsMap.getOrDefault(PRESTO_PROPERTIES, new ArrayList<>()); + } + + public List<String> getTrinoProperties() { + return (List<String>) this.configsMap.getOrDefault(TRINO_PROPERTIES, new ArrayList<>()); + } + + public List<Pair<String, Integer>> getPrestoQueries() { + try { + return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>()); + } catch (Exception e) { + throw new RuntimeException("unable to get presto queries from configs"); + } + } + + public List<Pair<String, Integer>> getTrinoQueries() { + try { + return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(TRINO_QUERIES, new ArrayList<>()); + } catch (Exception e) { + throw new RuntimeException("unable to get trino queries from configs"); + } + } + @Override public String toString() { try { @@ -449,6 +477,26 @@ public class DeltaConfig implements Serializable { return this; } + public Builder 
withPrestoProperties(List<String> prestoProperties) { + this.configsMap.put(PRESTO_PROPERTIES, prestoProperties); + return this; + } + + public Builder withTrinoProperties(List<String> trinoProperties) { + this.configsMap.put(TRINO_PROPERTIES, trinoProperties); + return this; + } + + public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> prestoQueries) { + this.configsMap.put(PRESTO_QUERIES, prestoQueries); + return this; + } + + public Builder withTrinoQueryAndResults(List<Pair<String, Integer>> trinoQueries) { + this.configsMap.put(TRINO_QUERIES, trinoQueries); + return this; + } + public Builder withConfigsMap(Map<String, Object> configsMap) { this.configsMap = configsMap; return this; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java index 789d7e3423..1d78d0fdbb 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java @@ -53,7 +53,13 @@ import java.util.Map.Entry; import java.util.stream.Collectors; import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_PROPERTIES; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_QUERIES; import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.NO_DEPENDENCY_VALUE; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_PROPERTIES; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_QUERIES; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_PROPERTIES; +import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.TRINO_QUERIES; /** * Utility class to SerDe workflow dag. 
@@ -172,7 +178,8 @@ public class DagUtils { DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)) .withName(name).build(); return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config); - } catch (ClassNotFoundException e) { + } + catch (ClassNotFoundException e) { throw new RuntimeException(e); } } @@ -192,11 +199,23 @@ public class DagUtils { while (itr.hasNext()) { Entry<String, JsonNode> entry = itr.next(); switch (entry.getKey()) { - case DeltaConfig.Config.HIVE_QUERIES: - configsMap.put(DeltaConfig.Config.HIVE_QUERIES, getHiveQueries(entry)); + case HIVE_QUERIES: + configsMap.put(HIVE_QUERIES, getQueries(entry)); + break; + case HIVE_PROPERTIES: + configsMap.put(HIVE_PROPERTIES, getQuerySessionProperties(entry)); + break; + case PRESTO_QUERIES: + configsMap.put(PRESTO_QUERIES, getQueries(entry)); + break; + case PRESTO_PROPERTIES: + configsMap.put(PRESTO_PROPERTIES, getQuerySessionProperties(entry)); + break; + case TRINO_QUERIES: + configsMap.put(TRINO_QUERIES, getQueries(entry)); break; - case DeltaConfig.Config.HIVE_PROPERTIES: - configsMap.put(DeltaConfig.Config.HIVE_PROPERTIES, getProperties(entry)); + case TRINO_PROPERTIES: + configsMap.put(TRINO_PROPERTIES, getQuerySessionProperties(entry)); break; default: configsMap.put(entry.getKey(), getValue(entry.getValue())); @@ -206,25 +225,27 @@ public class DagUtils { return configsMap; } - private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) { + private static List<Pair<String, Integer>> getQueries(Entry<String, JsonNode> entry) { List<Pair<String, Integer>> queries = new ArrayList<>(); try { List<JsonNode> flattened = new ArrayList<>(); flattened.add(entry.getValue()); - queries = (List<Pair<String, Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class); - } catch (Exception e) { + queries = (List<Pair<String, Integer>>) getQueryMapper().readValue(flattened.toString(), List.class); + } + catch 
(Exception e) { e.printStackTrace(); } return queries; } - private static List<String> getProperties(Entry<String, JsonNode> entry) { + private static List<String> getQuerySessionProperties(Entry<String, JsonNode> entry) { List<String> properties = new ArrayList<>(); try { List<JsonNode> flattened = new ArrayList<>(); flattened.add(entry.getValue()); - properties = (List<String>)getHivePropertyMapper().readValue(flattened.toString(), List.class); - } catch (Exception e) { + properties = (List<String>) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class); + } + catch (Exception e) { e.printStackTrace(); } return properties; } @@ -233,15 +254,20 @@ private static Object getValue(JsonNode node) { if (node.isInt()) { return node.asInt(); - } else if (node.isLong()) { + } + else if (node.isLong()) { return node.asLong(); - } else if (node.isShort()) { + } + else if (node.isShort()) { return node.asInt(); - } else if (node.isBoolean()) { + } + else if (node.isBoolean()) { return node.asBoolean(); - } else if (node.isDouble()) { + } + else if (node.isDouble()) { return node.asDouble(); - } else if (node.isFloat()) { + } + else if (node.isFloat()) { return node.asDouble(); } return node.textValue(); @@ -254,13 +280,28 @@ while (itr.hasNext()) { Entry<String, JsonNode> entry = itr.next(); switch (entry.getKey()) { - case DeltaConfig.Config.HIVE_QUERIES: - ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES, - MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries()))); + case HIVE_QUERIES: + ((ObjectNode) configNode).put(HIVE_QUERIES, + MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries()))); + break; + case HIVE_PROPERTIES: + ((ObjectNode) configNode).put(HIVE_PROPERTIES, + MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties()))); + break; + case PRESTO_QUERIES: + ((ObjectNode) 
configNode).put(PRESTO_QUERIES, + MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getPrestoQueries()))); + break; + case PRESTO_PROPERTIES: + ((ObjectNode) configNode).put(PRESTO_PROPERTIES, + MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getPrestoProperties()))); break; - case DeltaConfig.Config.HIVE_PROPERTIES: - ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES, - MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties()))); + case TRINO_QUERIES: + ((ObjectNode) configNode).put(TRINO_QUERIES, + MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getTrinoQueries()))); + break; + case TRINO_PROPERTIES: + ((ObjectNode) configNode).put(TRINO_PROPERTIES, + MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getTrinoProperties()))); break; default: break; @@ -293,21 +334,22 @@ return result.toString("utf-8"); } - private static ObjectMapper getHiveQueryMapper() { + private static ObjectMapper getQueryMapper() { SimpleModule module = new SimpleModule(); ObjectMapper queryMapper = new ObjectMapper(); - module.addSerializer(List.class, new HiveQuerySerializer()); - module.addDeserializer(List.class, new HiveQueryDeserializer()); + module.addSerializer(List.class, new QuerySerializer()); + module.addDeserializer(List.class, new QueryDeserializer()); queryMapper.registerModule(module); return queryMapper; } - private static final class HiveQuerySerializer extends JsonSerializer<List> { + private static final class QuerySerializer extends JsonSerializer<List> { Integer index = 0; + @Override public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeStartObject(); - for (Pair pair : (List<Pair>)pairs) { + for (Pair pair : (List<Pair>) pairs) { gen.writeStringField("query" + index, pair.getLeft().toString()); gen.writeNumberField("result" + 
index, Integer.parseInt(pair.getRight().toString())); index++; @@ -316,7 +358,7 @@ public class DagUtils { } } - private static final class HiveQueryDeserializer extends JsonDeserializer<List> { + private static final class QueryDeserializer extends JsonDeserializer<List> { @Override public List deserialize(JsonParser parser, DeserializationContext context) throws IOException { List<Pair<String, Integer>> pairs = new ArrayList<>(); @@ -334,7 +376,8 @@ public class DagUtils { if (fieldName.contains("query")) { query = parser.getValueAsString(); - } else if (fieldName.contains("result")) { + } + else if (fieldName.contains("result")) { result = parser.getValueAsInt(); pairs.add(Pair.of(query, result)); } @@ -344,21 +387,22 @@ public class DagUtils { } } - private static ObjectMapper getHivePropertyMapper() { + private static ObjectMapper getQueryEnginePropertyMapper() { SimpleModule module = new SimpleModule(); ObjectMapper propMapper = new ObjectMapper(); - module.addSerializer(List.class, new HivePropertySerializer()); - module.addDeserializer(List.class, new HivePropertyDeserializer()); + module.addSerializer(List.class, new QueryEnginePropertySerializer()); + module.addDeserializer(List.class, new QueryEnginePropertyDeserializer()); propMapper.registerModule(module); return propMapper; } - private static final class HivePropertySerializer extends JsonSerializer<List> { + private static final class QueryEnginePropertySerializer extends JsonSerializer<List> { Integer index = 0; + @Override public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeStartObject(); - for (String prop : (List<String>)props) { + for (String prop : (List<String>) props) { gen.writeStringField("prop" + index, prop); index++; } @@ -366,7 +410,7 @@ public class DagUtils { } } - private static final class HivePropertyDeserializer extends JsonDeserializer<List> { + private static final class QueryEnginePropertyDeserializer extends 
JsonDeserializer<List> { @Override public List deserialize(JsonParser parser, DeserializationContext context) throws IOException { List<String> props = new ArrayList<>(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java new file mode 100644 index 0000000000..c1a3d23791 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.common.util.collection.Pair; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +public abstract class BaseQueryNode extends DagNode<Boolean> { + + public void setSessionProperties(List<String> properties, Statement stmt) throws SQLException { + for (String prop : properties) { + executeStatement(prop, stmt); + } + } + + public void executeAndValidateQueries(List<Pair<String, Integer>> queriesWithResult, Statement stmt) throws SQLException { + for (Pair<String, Integer> queryAndResult : queriesWithResult) { + log.info("Running {}", queryAndResult.getLeft()); + ResultSet res = stmt.executeQuery(queryAndResult.getLeft()); + if (!res.next()) { + log.info("res.next() was False - typically this means the query returned no rows."); + assert 0 == queryAndResult.getRight(); + } + else { + Integer result = res.getInt(1); + if (!queryAndResult.getRight().equals(result)) { + throw new AssertionError( + "QUERY: " + queryAndResult.getLeft() + + " | EXPECTED RESULT = " + queryAndResult.getRight() + + " | ACTUAL RESULT = " + result + ); + } + } + log.info("Successfully validated query!"); + } + } + + private void executeStatement(String query, Statement stmt) throws SQLException { + log.info("Executing statement {}", stmt.toString()); + stmt.execute(query); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java index 2da5c558d2..fd04cc34c6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java @@ -19,7 +19,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes; import org.apache.hudi.common.config.TypedProperties; -import 
org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; @@ -28,8 +28,6 @@ import org.apache.hudi.sync.common.HoodieSyncConfig; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; @@ -39,7 +37,7 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER; /** * A hive query node in the DAG of operations for a workflow. used to perform a hive query with given config. */ -public class HiveQueryNode extends DagNode<Boolean> { +public class HiveQueryNode extends BaseQueryNode { private HiveServiceProvider hiveServiceProvider; @@ -61,37 +59,17 @@ public class HiveQueryNode extends DagNode<Boolean> { .getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat); HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties); this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter()); - Connection con = DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL), - hiveSyncConfig.getString(HIVE_USER), hiveSyncConfig.getString(HIVE_PASS)); - Statement stmt = con.createStatement(); - stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat"); - for (String hiveProperty : this.config.getHiveProperties()) { - executeStatement(hiveProperty, stmt); + try (Connection con = DriverManager.getConnection(hiveSyncConfig.getString(HIVE_URL), + hiveSyncConfig.getString(HIVE_USER), hiveSyncConfig.getString(HIVE_PASS))) { + Statement stmt = con.createStatement(); + stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat"); + setSessionProperties(this.config.getHiveProperties(), stmt); + 
executeAndValidateQueries(this.config.getHiveQueries(), stmt); + stmt.close(); + this.hiveServiceProvider.stopLocalHiveServiceIfNeeded(); } - for (Pair<String, Integer> queryAndResult : this.config.getHiveQueries()) { - log.info("Running {}", queryAndResult.getLeft()); - ResultSet res = stmt.executeQuery(queryAndResult.getLeft()); - if (!res.next()) { - log.info("res.next() was False - typically this means the query returned no rows."); - assert 0 == queryAndResult.getRight(); - } else { - Integer result = res.getInt(1); - if (!queryAndResult.getRight().equals(result)) { - throw new AssertionError( - "QUERY: " + queryAndResult.getLeft() - + " | EXPECTED RESULT = " + queryAndResult.getRight() - + " | ACTUAL RESULT = " + result - ); - } - } - log.info("Successfully validated query!"); + catch (Exception e) { + throw new HoodieValidationException("Hive query validation failed due to " + e.getMessage(), e); } - this.hiveServiceProvider.stopLocalHiveServiceIfNeeded(); } - - private void executeStatement(String query, Statement stmt) throws SQLException { - log.info("Executing statement {}", stmt.toString()); - stmt.execute(query); - } - } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java new file mode 100644 index 0000000000..9a9bafcf6f --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; + +public class PrestoQueryNode extends BaseQueryNode { + + public PrestoQueryNode(DeltaConfig.Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext context, int curItrCount) throws Exception { + log.info("Executing presto query node {}", this.getName()); + String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl; + if (StringUtils.isNullOrEmpty(url)) { + throw new IllegalArgumentException("Presto JDBC connection url not provided. 
Please set --presto-jdbc-url."); + } + String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername; + String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword; + try { + Class.forName("com.facebook.presto.jdbc.PrestoDriver"); + } catch (ClassNotFoundException e) { + throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); + } + try (Connection connection = DriverManager.getConnection(url, user, pass)) { + Statement stmt = connection.createStatement(); + setSessionProperties(this.config.getPrestoProperties(), stmt); + executeAndValidateQueries(this.config.getPrestoQueries(), stmt); + stmt.close(); + } + catch (Exception e) { + throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e); + } + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java new file mode 100644 index 0000000000..ffcc901f67 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; + +public class TrinoQueryNode extends BaseQueryNode{ + + public TrinoQueryNode(DeltaConfig.Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext context, int curItrCount) throws Exception { + log.info("Executing trino query node {}", this.getName()); + String url = context.getHoodieTestSuiteWriter().getCfg().trinoJdbcUrl; + if (StringUtils.isNullOrEmpty(url)) { + throw new IllegalArgumentException("Trino JDBC connection url not provided. Please set --trino-jdbc-url."); + } + String user = context.getHoodieTestSuiteWriter().getCfg().trinoUsername; + String pass = context.getHoodieTestSuiteWriter().getCfg().trinoPassword; + try { + Class.forName("io.trino.jdbc.TrinoDriver"); + } catch (ClassNotFoundException e) { + throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e); + } + try (Connection connection = DriverManager.getConnection(url, user, pass)) { + Statement stmt = connection.createStatement(); + setSessionProperties(this.config.getTrinoProperties(), stmt); + executeAndValidateQueries(this.config.getTrinoQueries(), stmt); + stmt.close(); + } + catch (Exception e) { + throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e); + } + } +} diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml index 23691659ca..8228c53e54 100644 --- 
a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml +++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml @@ -71,4 +71,24 @@ dag_content: query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" result2: 0 type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file + deps: first_hive_sync + first_presto_query: + config: + presto_props: + prop1: "SET SESSION hive.parquet_use_column_names = true" + presto_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: PrestoQueryNode + deps: first_hive_query + first_trino_query: + config: + trino_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: TrinoQueryNode + deps: first_presto_query \ No newline at end of file diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 689da9f2e0..167183bc1e 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -100,6 +100,8 @@ <include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include> <include>org.apache.htrace:htrace-core4</include> <include>commons-io:commons-io</include> + <include>com.facebook.presto:presto-jdbc</include> + <include>io.trino:trino-jdbc</include> <include>org.jetbrains.kotlin:kotlin-stdlib-jdk8</include> <include>org.jetbrains.kotlin:kotlin-stdlib</include> diff --git a/pom.xml b/pom.xml index c5913823ee..86c6115f89 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,8 @@ <hadoop.version>2.10.1</hadoop.version> <hive.groupid>org.apache.hive</hive.groupid> <hive.version>2.3.1</hive.version> + <presto.version>0.273</presto.version> + <trino.version>390</trino.version> <hive.exec.classifier>core</hive.exec.classifier> <metrics.version>4.1.1</metrics.version> 
<orc.version>1.6.0</orc.version> @@ -1069,6 +1071,18 @@ </exclusions> </dependency> + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-jdbc</artifactId> + <version>${presto.version}</version> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-jdbc</artifactId> + <version>${trino.version}</version> + </dependency> + <!-- Zookeeper --> <dependency> <groupId>org.apache.curator</groupId>
