This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a67619b23 [HUDI-8272] Fix incremental queries in integration tests 
on Spark (#12057)
d7a67619b23 is described below

commit d7a67619b23706d3ced8e54ef21b13c7c90f2ddf
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Oct 6 12:12:57 2024 -0700

    [HUDI-8272] Fix incremental queries in integration tests on Spark (#12057)
---
 docker/demo/sparksql-incremental.commands          | 74 +++++++---------------
 .../java/org/apache/hudi/integ/ITTestBase.java     |  6 +-
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    | 28 +++-----
 3 files changed, 34 insertions(+), 74 deletions(-)

diff --git a/docker/demo/sparksql-incremental.commands 
b/docker/demo/sparksql-incremental.commands
index 87724977663..0c402563187 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -28,70 +28,40 @@ import org.apache.hadoop.fs.FileSystem;
 
 val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
 val beginInstantTime = HoodieDataSourceHelpers.listCommitsSince(fs, 
"/user/hive/warehouse/stock_ticks_cow", "00000").get(0)
-println("Begin instant time for incremental query: " + beginInstantTime)
-val hoodieIncQueryDF =  spark.read.format("org.apache.hudi").
+println("Begin instant time for COW incremental query: " + beginInstantTime)
+val hoodieIncQueryDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
                       option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), 
beginInstantTime).
                       load("/user/hive/warehouse/stock_ticks_cow");
+println("stock_ticks_cow incremental count: " + hoodieIncQueryDF.count)
 hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
 spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from 
stock_ticks_cow_incr where  symbol = 'GOOG'").show(100, false);
 
-spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, 
close from stock_ticks_cow_incr").
-    write.format("org.apache.hudi").
-    option("hoodie.insert.shuffle.parallelism", "2").
-    option("hoodie.upsert.shuffle.parallelism","2").
-    option(DataSourceWriteOptions.TABLE_TYPE.key(), 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
-    option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
-    option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "key").
-    option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
-    option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
-    option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
-    option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), 
"stock_ticks_derived_mor").
-    option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-    option(HiveSyncConfigHolder.HIVE_URL.key(), 
"jdbc:hive2://hiveserver:10000").
-    option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
-    option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
-    option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
-    option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
-    option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
classOf[MultiPartKeysValueExtractor].getCanonicalName).
-    option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
-    mode(SaveMode.Overwrite).
-    save("/user/hive/warehouse/stock_ticks_derived_mor");
-
-spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)
-
-val hoodieIncQueryBsDF =  spark.read.format("org.apache.hudi").
+val hoodieIncQueryBsDF =  spark.read.format("hudi").
                       option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
                       option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), 
"00000000000001").
                       load("/user/hive/warehouse/stock_ticks_cow_bs");
+println("stock_ticks_cow_bs incremental count: " + hoodieIncQueryBsDF.count)
 hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
 spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from 
stock_ticks_cow_bs_incr where  symbol = 'GOOG'").show(100, false);
 
-spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, 
close from stock_ticks_cow_bs_incr").
-    write.format("org.apache.hudi").
-    option("hoodie.insert.shuffle.parallelism", "2").
-    option("hoodie.upsert.shuffle.parallelism","2").
-    option(DataSourceWriteOptions.TABLE_TYPE.key(), 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
-    option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
-    option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "key").
-    option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
-    option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
-    option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
-    option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), 
"stock_ticks_derived_mor_bs").
-    option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
-    option(HiveSyncConfigHolder.HIVE_URL.key(), 
"jdbc:hive2://hiveserver:10000").
-    option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
-    option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
-    option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
-    option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
-    option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
classOf[MultiPartKeysValueExtractor].getCanonicalName).
-    option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
-    mode(SaveMode.Overwrite).
-    save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
+val morBeginInstantTime = HoodieDataSourceHelpers.listCommitsSince(fs, 
"/user/hive/warehouse/stock_ticks_mor", "00000").get(0)
+println("Begin instant time for MOR incremental query: " + morBeginInstantTime)
+
+val hoodieMorIncQueryDF =  spark.read.format("hudi").
+                      option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+                      option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), 
morBeginInstantTime).
+                      load("/user/hive/warehouse/stock_ticks_mor");
+println("stock_ticks_mor incremental count: " + hoodieMorIncQueryDF.count)
+hoodieMorIncQueryDF.registerTempTable("stock_ticks_mor_incr")
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from 
stock_ticks_mor_incr where symbol = 'GOOG'").show(100, false);
 
-spark.sql("show tables").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_bs_ro").show(20, false)
-spark.sql("select count(*) from stock_ticks_derived_mor_bs_rt").show(20, false)
+val hoodieMorIncQueryBsDF =  spark.read.format("hudi").
+                      option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+                      option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), 
"00000000000001").
+                      load("/user/hive/warehouse/stock_ticks_mor_bs");
+println("stock_ticks_mor_bs incremental count: " + hoodieMorIncQueryBsDF.count)
+hoodieMorIncQueryBsDF.registerTempTable("stock_ticks_mor_bs_incr")
+spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from 
stock_ticks_mor_bs_incr where symbol = 'GOOG'").show(100, false);
 
 System.exit(0);
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index 86b450f845e..736ab3e2e1d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -113,8 +113,10 @@ public abstract class ITTestBase {
   static String getSparkShellCommand(String commandFile) {
     return new StringBuilder().append("spark-shell --jars 
").append(HUDI_SPARK_BUNDLE)
         .append(" --master local[2] --driver-class-path 
").append(HADOOP_CONF_DIR)
-        .append(
-            " --conf spark.sql.hive.convertMetastoreParquet=false 
--deploy-mode client  --driver-memory 1G --executor-memory 1G --num-executors 1 
")
+        .append(" --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer")
+        .append(" --conf 
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+        .append(" --conf 
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+        .append(" --deploy-mode client  --driver-memory 1G --executor-memory 
1G --num-executors 1")
         .append(" -i ").append(commandFile).toString();
   }
 
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 34ff7b0f914..f38913cd094 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -131,13 +131,14 @@ public class ITTestHoodieDemo extends ITTestBase {
     // testPrestoAfterSecondBatch();
     // testTrinoAfterSecondBatch();
     testSparkSQLAfterSecondBatch();
-    // TODO(HUDI-8271, HUDI-8272): fix incremental queries in integration 
tests on Hive and Spark
+    // TODO(HUDI-8271): fix incremental queries in integration tests on Hive
     // testIncrementalHiveQueryBeforeCompaction();
-    // testIncrementalSparkSQLQuery();
+    testIncrementalSparkSQLQuery();
 
     // compaction
     scheduleAndRunCompaction();
 
+    testIncrementalSparkSQLQuery();
     // testHiveAfterSecondBatchAfterCompaction();
     // testPrestoAfterSecondBatchAfterCompaction();
     // testTrinoAfterSecondBatchAfterCompaction();
@@ -498,24 +499,11 @@ public class ITTestHoodieDemo extends ITTestBase {
 
   private void testIncrementalSparkSQLQuery() throws Exception {
     Pair<String, String> stdOutErrPair = 
executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true);
-    assertStdOutContains(stdOutErrPair, "|GOOG  |2018-08-31 10:59:00|9021  
|1227.1993|1227.215|", 2);
-    assertStdOutContains(stdOutErrPair, "|default  |stock_ticks_cow            
  |false      |\n"
-        + "|default  |stock_ticks_cow_bs           |false      |\n"
-        + "|default  |stock_ticks_derived_mor      |false      |\n"
-        + "|default  |stock_ticks_derived_mor_bs   |false      |\n"
-        + "|default  |stock_ticks_derived_mor_bs_ro|false      |\n"
-        + "|default  |stock_ticks_derived_mor_bs_rt|false      |\n"
-        + "|default  |stock_ticks_derived_mor_ro   |false      |\n"
-        + "|default  |stock_ticks_derived_mor_rt   |false      |\n"
-        + "|default  |stock_ticks_mor              |false      |\n"
-        + "|default  |stock_ticks_mor_bs           |false      |\n"
-        + "|default  |stock_ticks_mor_bs_ro        |false      |\n"
-        + "|default  |stock_ticks_mor_bs_rt        |false      |\n"
-        + "|default  |stock_ticks_mor_ro           |false      |\n"
-        + "|default  |stock_ticks_mor_rt           |false      |\n"
-        + "|         |stock_ticks_cow_bs_incr      |true       |\n"
-        + "|         |stock_ticks_cow_incr         |true       |");
-    assertStdOutContains(stdOutErrPair, "|count(1)|\n+--------+\n|99     |", 
4);
+    assertStdOutContains(stdOutErrPair, "stock_ticks_cow incremental count: 
99", 1);
+    assertStdOutContains(stdOutErrPair, "stock_ticks_cow_bs incremental count: 
99", 1);
+    assertStdOutContains(stdOutErrPair, "stock_ticks_mor incremental count: 
99", 1);
+    assertStdOutContains(stdOutErrPair, "stock_ticks_mor_bs incremental count: 
99", 1);
+    assertStdOutContains(stdOutErrPair, "|GOOG  |2018-08-31 10:59:00|9021  
|1227.1993|1227.215|", 4);
   }
 
   private void scheduleAndRunCompaction() throws Exception {

Reply via email to