This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d7a67619b23 [HUDI-8272] Fix incremental queries in integration tests
on Spark (#12057)
d7a67619b23 is described below
commit d7a67619b23706d3ced8e54ef21b13c7c90f2ddf
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Oct 6 12:12:57 2024 -0700
[HUDI-8272] Fix incremental queries in integration tests on Spark (#12057)
---
docker/demo/sparksql-incremental.commands | 74 +++++++---------------
.../java/org/apache/hudi/integ/ITTestBase.java | 6 +-
.../org/apache/hudi/integ/ITTestHoodieDemo.java | 28 +++-----
3 files changed, 34 insertions(+), 74 deletions(-)
diff --git a/docker/demo/sparksql-incremental.commands
b/docker/demo/sparksql-incremental.commands
index 87724977663..0c402563187 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -28,70 +28,40 @@ import org.apache.hadoop.fs.FileSystem;
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val beginInstantTime = HoodieDataSourceHelpers.listCommitsSince(fs,
"/user/hive/warehouse/stock_ticks_cow", "00000").get(0)
-println("Begin instant time for incremental query: " + beginInstantTime)
-val hoodieIncQueryDF = spark.read.format("org.apache.hudi").
+println("Begin instant time for COW incremental query: " + beginInstantTime)
+val hoodieIncQueryDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(),
beginInstantTime).
load("/user/hive/warehouse/stock_ticks_cow");
+println("stock_ticks_cow incremental count: " + hoodieIncQueryDF.count)
hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_cow_incr where symbol = 'GOOG'").show(100, false);
-spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open,
close from stock_ticks_cow_incr").
- write.format("org.apache.hudi").
- option("hoodie.insert.shuffle.parallelism", "2").
- option("hoodie.upsert.shuffle.parallelism","2").
- option(DataSourceWriteOptions.TABLE_TYPE.key(),
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
- option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
- option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "key").
- option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
- option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
- option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
- option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(),
"stock_ticks_derived_mor").
- option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
- option(HiveSyncConfigHolder.HIVE_URL.key(),
"jdbc:hive2://hiveserver:10000").
- option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
- option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
- option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
- option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
- option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
classOf[MultiPartKeysValueExtractor].getCanonicalName).
- option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
- mode(SaveMode.Overwrite).
- save("/user/hive/warehouse/stock_ticks_derived_mor");
-
-spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)
-
-val hoodieIncQueryBsDF = spark.read.format("org.apache.hudi").
+val hoodieIncQueryBsDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(),
"00000000000001").
load("/user/hive/warehouse/stock_ticks_cow_bs");
+println("stock_ticks_cow_bs incremental count: " + hoodieIncQueryBsDF.count)
hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);
-spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open,
close from stock_ticks_cow_bs_incr").
- write.format("org.apache.hudi").
- option("hoodie.insert.shuffle.parallelism", "2").
- option("hoodie.upsert.shuffle.parallelism","2").
- option(DataSourceWriteOptions.TABLE_TYPE.key(),
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
- option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
- option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "key").
- option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
- option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
- option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
- option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(),
"stock_ticks_derived_mor_bs").
- option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
- option(HiveSyncConfigHolder.HIVE_URL.key(),
"jdbc:hive2://hiveserver:10000").
- option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
- option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
- option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
- option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
- option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
classOf[MultiPartKeysValueExtractor].getCanonicalName).
- option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
- mode(SaveMode.Overwrite).
- save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
+val morBeginInstantTime = HoodieDataSourceHelpers.listCommitsSince(fs,
"/user/hive/warehouse/stock_ticks_mor", "00000").get(0)
+println("Begin instant time for MOR incremental query: " + morBeginInstantTime)
+
+val hoodieMorIncQueryDF = spark.read.format("hudi").
+ option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+ option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(),
morBeginInstantTime).
+ load("/user/hive/warehouse/stock_ticks_mor");
+println("stock_ticks_mor incremental count: " + hoodieMorIncQueryDF.count)
+hoodieMorIncQueryDF.registerTempTable("stock_ticks_mor_incr")
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_mor_incr where symbol = 'GOOG'").show(100, false);
-spark.sql("show tables").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_bs_ro").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_bs_rt").show(20, false)
+val hoodieMorIncQueryBsDF = spark.read.format("hudi").
+ option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+ option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(),
"00000000000001").
+ load("/user/hive/warehouse/stock_ticks_mor_bs");
+println("stock_ticks_mor_bs incremental count: " + hoodieMorIncQueryBsDF.count)
+hoodieMorIncQueryBsDF.registerTempTable("stock_ticks_mor_bs_incr")
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_mor_bs_incr where symbol = 'GOOG'").show(100, false);
System.exit(0);
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index 86b450f845e..736ab3e2e1d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -113,8 +113,10 @@ public abstract class ITTestBase {
static String getSparkShellCommand(String commandFile) {
return new StringBuilder().append("spark-shell --jars
").append(HUDI_SPARK_BUNDLE)
.append(" --master local[2] --driver-class-path
").append(HADOOP_CONF_DIR)
- .append(
- " --conf spark.sql.hive.convertMetastoreParquet=false
--deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1
")
+ .append(" --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer")
+ .append(" --conf
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ .append(" --conf
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .append(" --deploy-mode client --driver-memory 1G --executor-memory
1G --num-executors 1")
.append(" -i ").append(commandFile).toString();
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 34ff7b0f914..f38913cd094 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -131,13 +131,14 @@ public class ITTestHoodieDemo extends ITTestBase {
// testPrestoAfterSecondBatch();
// testTrinoAfterSecondBatch();
testSparkSQLAfterSecondBatch();
- // TODO(HUDI-8271, HUDI-8272): fix incremental queries in integration
tests on Hive and Spark
+ // TODO(HUDI-8271): fix incremental queries in integration tests on Hive
// testIncrementalHiveQueryBeforeCompaction();
- // testIncrementalSparkSQLQuery();
+ testIncrementalSparkSQLQuery();
// compaction
scheduleAndRunCompaction();
+ testIncrementalSparkSQLQuery();
// testHiveAfterSecondBatchAfterCompaction();
// testPrestoAfterSecondBatchAfterCompaction();
// testTrinoAfterSecondBatchAfterCompaction();
@@ -498,24 +499,11 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testIncrementalSparkSQLQuery() throws Exception {
Pair<String, String> stdOutErrPair =
executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true);
- assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021
|1227.1993|1227.215|", 2);
- assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow
|false |\n"
- + "|default |stock_ticks_cow_bs |false |\n"
- + "|default |stock_ticks_derived_mor |false |\n"
- + "|default |stock_ticks_derived_mor_bs |false |\n"
- + "|default |stock_ticks_derived_mor_bs_ro|false |\n"
- + "|default |stock_ticks_derived_mor_bs_rt|false |\n"
- + "|default |stock_ticks_derived_mor_ro |false |\n"
- + "|default |stock_ticks_derived_mor_rt |false |\n"
- + "|default |stock_ticks_mor |false |\n"
- + "|default |stock_ticks_mor_bs |false |\n"
- + "|default |stock_ticks_mor_bs_ro |false |\n"
- + "|default |stock_ticks_mor_bs_rt |false |\n"
- + "|default |stock_ticks_mor_ro |false |\n"
- + "|default |stock_ticks_mor_rt |false |\n"
- + "| |stock_ticks_cow_bs_incr |true |\n"
- + "| |stock_ticks_cow_incr |true |");
- assertStdOutContains(stdOutErrPair, "|count(1)|\n+--------+\n|99 |",
4);
+ assertStdOutContains(stdOutErrPair, "stock_ticks_cow incremental count:
99", 1);
+ assertStdOutContains(stdOutErrPair, "stock_ticks_cow_bs incremental count:
99", 1);
+ assertStdOutContains(stdOutErrPair, "stock_ticks_mor incremental count:
99", 1);
+ assertStdOutContains(stdOutErrPair, "stock_ticks_mor_bs incremental count:
99", 1);
+ assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021
|1227.1993|1227.215|", 4);
}
private void scheduleAndRunCompaction() throws Exception {