This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b70d75c001 [HUDI-8619] Use new config for streaming source table 
version (#12401)
8b70d75c001 is described below

commit 8b70d75c001606b29b5ad088baef9db017c69e17
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 2 20:52:24 2024 -0800

    [HUDI-8619] Use new config for streaming source table version (#12401)
    
    * Create V1 and V2 stream sources
    
    Ensure correct completion time is used in V1 and V2 sources
    
    Update to utilize the checkpoint translation
    
    Comments
    
    Fix logical problem
    
    Add test
    
    * fix test and source creation
    
    * Add separate config
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../src/main/scala/org/apache/hudi/DataSourceOptions.scala    |  5 +++++
 .../src/main/scala/org/apache/hudi/DefaultSource.scala        | 11 +++++------
 .../spark/sql/hudi/streaming/HoodieStreamSourceV2.scala       |  1 -
 .../org/apache/hudi/functional/TestStreamingSource.scala      |  9 ++++-----
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 622367c581f..c4b419b5285 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -120,6 +120,11 @@ object DataSourceReadOptions {
       + "completion_time <= END_COMMIT are fetched out. "
       + "Point in time type queries make more sense with begin and end 
completion times specified.")
 
+  val STREAMING_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.streaming.table.version")
+    .noDefaultValue()
+    .withDocumentation("The table version assumed for streaming read")
+
   val INCREMENTAL_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.incr.table.version")
     .noDefaultValue()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 034914f8318..1b3b8e2caed 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig, 
TypedProperties}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
@@ -36,8 +36,7 @@ import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
-
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, 
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, 
HoodieStreamSourceV1, HoodieStreamSourceV2}
@@ -242,11 +241,11 @@ class DefaultSource extends RelationProvider
     }
     val metaClient = HoodieTableMetaClient.builder()
       
.setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
-    // Check if the incremental table read version is set. If set, use the 
corresponding source
+    // Check if the streaming table read version is set. If set, use the 
corresponding source
     // which uses the version corresponding to IncrementalRelation to read the 
data. And, also
     // does the checkpoint management based on the version.
-    if (SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_TABLE_VERSION)) {
-      val writeTableVersion = 
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+    if (SparkConfigUtils.containsConfigProperty(parameters, 
STREAMING_READ_TABLE_VERSION)) {
+      val writeTableVersion = 
Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
       if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
         new HoodieStreamSourceV2(
           sqlContext, metaClient, metadataPath, schema, parameters, 
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index b1c8e6bf90d..f825aa7e186 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
   * @param schemaOption
   * @param parameters
   */
-// TODO(yihua): handle V1/V2 checkpoint
 class HoodieStreamSourceV2(sqlContext: SQLContext,
                            metaClient: HoodieTableMetaClient,
                            metadataPath: String,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index ccce19ea0b6..84c0c3baab5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceReadOptions.{INCREMENTAL_READ_TABLE_VERSION, 
START_OFFSET}
+import org.apache.hudi.DataSourceReadOptions.{START_OFFSET, 
STREAMING_READ_TABLE_VERSION}
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, 
RECORDKEY_FIELD}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
@@ -267,15 +267,14 @@ class TestStreamingSource extends StreamTest {
       // If the request time is used, i.e., V1, then the second record is 
included in the output.
       // Otherwise, only third record in the output.
       val startTimestamp = instants.get(1).requestedTime
-
-      for (incrementalReadTableVersion <- 
List(HoodieTableVersion.SIX.versionCode(), 
HoodieTableVersion.EIGHT.versionCode())) {
+      for (streamingReadTableVersion <- 
List(HoodieTableVersion.SIX.versionCode(), 
HoodieTableVersion.EIGHT.versionCode())) {
         val df = spark.readStream
           .format("org.apache.hudi")
           .option(START_OFFSET.key, startTimestamp)
-          .option(INCREMENTAL_READ_TABLE_VERSION.key, 
incrementalReadTableVersion.toString)
+          .option(STREAMING_READ_TABLE_VERSION.key, 
streamingReadTableVersion.toString)
           .load(tablePath)
           .select("id", "name", "price", "ts")
-        val expectedRows = if (incrementalReadTableVersion == 
HoodieTableVersion.EIGHT.versionCode()) {
+        val expectedRows = if (streamingReadTableVersion == 
HoodieTableVersion.EIGHT.versionCode()) {
           Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
         } else {
           Seq(Row("3", "a1", "12", "002"))

Reply via email to