This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 28b1989d07f [HUDI-8619] Fix stream source parameters (#12415)
28b1989d07f is described below

commit 28b1989d07f99150416fc031c7e74e006d7faa0c
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 3 17:54:34 2024 -0800

    [HUDI-8619] Fix stream source parameters (#12415)
    
    * Fix stream source parameters
    
    * Add default value for some tests
---
 .../main/scala/org/apache/hudi/DefaultSource.scala   | 20 +++++++++++---------
 .../apache/hudi/functional/TestStreamingSource.scala |  3 ++-
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1b3b8e2caed..4044c01db4a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
+import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, WRITE_TABLE_VERSION}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.io.storage.HoodieSparkIOFactory
@@ -244,23 +244,25 @@ class DefaultSource extends RelationProvider
     // Check if the streaming table read version is set. If set, use the corresponding source
     // which uses the version corresponding to IncrementalRelation to read the data. And, also
     // does the checkpoint management based on the version.
+    val targetTableVersion: Integer = Integer.parseInt(
+      parameters.getOrElse(WRITE_TABLE_VERSION.key, WRITE_TABLE_VERSION.defaultValue.toString))
     if (SparkConfigUtils.containsConfigProperty(parameters, STREAMING_READ_TABLE_VERSION)) {
-      val writeTableVersion = Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
-      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+      val sourceTableVersion = Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
+      if (sourceTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
         new HoodieStreamSourceV2(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
       } else {
         new HoodieStreamSourceV1(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
       }
     } else {
-      val writeTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
-      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+      val sourceTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
+      if (sourceTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
         new HoodieStreamSourceV2(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
       } else {
         new HoodieStreamSourceV1(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
       }
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 84c0c3baab5..aeef270ad76 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.HoodieCompactionConfig
-import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE, WRITE_TABLE_VERSION}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.util.JavaConversions
 import org.apache.spark.sql.streaming.StreamTest
@@ -271,6 +271,7 @@ class TestStreamingSource extends StreamTest {
         val df = spark.readStream
           .format("org.apache.hudi")
           .option(START_OFFSET.key, startTimestamp)
+          .option(WRITE_TABLE_VERSION.key, HoodieTableVersion.current().versionCode().toString)
           .option(STREAMING_READ_TABLE_VERSION.key, streamingReadTableVersion.toString)
           .load(tablePath)
           .select("id", "name", "price", "ts")

Reply via email to