This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b70d75c001 [HUDI-8619] Use new config for streaming source table
version (#12401)
8b70d75c001 is described below
commit 8b70d75c001606b29b5ad088baef9db017c69e17
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 2 20:52:24 2024 -0800
[HUDI-8619] Use new config for streaming source table version (#12401)
* Create V1 and V2 stream sources
Ensure correct completion time is used in V1 and V2 sources
Update to utilize the checkpoint translation
Comments
Fix logical problem
Add test
* fix test and source creation
* Add separate config
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 5 +++++
.../src/main/scala/org/apache/hudi/DefaultSource.scala | 11 +++++------
.../spark/sql/hudi/streaming/HoodieStreamSourceV2.scala | 1 -
.../org/apache/hudi/functional/TestStreamingSource.scala | 9 ++++-----
4 files changed, 14 insertions(+), 12 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 622367c581f..c4b419b5285 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -120,6 +120,11 @@ object DataSourceReadOptions {
+ "completion_time <= END_COMMIT are fetched out. "
+ "Point in time type queries make more sense with begin and end
completion times specified.")
+ val STREAMING_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.read.streaming.table.version")
+ .noDefaultValue()
+ .withDocumentation("The table version assumed for streaming read")
+
val INCREMENTAL_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.table.version")
.noDefaultValue()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 034914f8318..1b3b8e2caed 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig,
TypedProperties}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.model.WriteConcurrencyMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
@@ -36,8 +36,7 @@ import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
-
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit,
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit,
HoodieStreamSourceV1, HoodieStreamSourceV2}
@@ -242,11 +241,11 @@ class DefaultSource extends RelationProvider
}
val metaClient = HoodieTableMetaClient.builder()
.setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
- // Check if the incremental table read version is set. If set, use the
corresponding source
+ // Check if the streaming table read version is set. If set, use the
corresponding source
// which uses the version corresponding to IncrementalRelation to read the
data. And, also
// does the checkpoint management based on the version.
- if (SparkConfigUtils.containsConfigProperty(parameters,
INCREMENTAL_READ_TABLE_VERSION)) {
- val writeTableVersion =
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+ if (SparkConfigUtils.containsConfigProperty(parameters,
STREAMING_READ_TABLE_VERSION)) {
+ val writeTableVersion =
Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
new HoodieStreamSourceV2(
sqlContext, metaClient, metadataPath, schema, parameters,
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index b1c8e6bf90d..f825aa7e186 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
* @param schemaOption
* @param parameters
*/
-// TODO(yihua): handle V1/V2 checkpoint
class HoodieStreamSourceV2(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
metadataPath: String,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index ccce19ea0b6..84c0c3baab5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,7 +17,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceReadOptions.{INCREMENTAL_READ_TABLE_VERSION,
START_OFFSET}
+import org.apache.hudi.DataSourceReadOptions.{START_OFFSET,
STREAMING_READ_TABLE_VERSION}
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD,
RECORDKEY_FIELD}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
@@ -267,15 +267,14 @@ class TestStreamingSource extends StreamTest {
// If the request time is used, i.e., V1, then the second record is
included in the output.
// Otherwise, only third record in the output.
val startTimestamp = instants.get(1).requestedTime
-
- for (incrementalReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ for (streamingReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
- .option(INCREMENTAL_READ_TABLE_VERSION.key,
incrementalReadTableVersion.toString)
+ .option(STREAMING_READ_TABLE_VERSION.key,
streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")
- val expectedRows = if (incrementalReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
+ val expectedRows = if (streamingReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
} else {
Seq(Row("3", "a1", "12", "002"))