This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 28b1989d07f [HUDI-8619] Fix stream source parameters (#12415)
28b1989d07f is described below
commit 28b1989d07f99150416fc031c7e74e006d7faa0c
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 3 17:54:34 2024 -0800
[HUDI-8619] Fix stream source parameters (#12415)
* Fix stream source parameters
* Add default value for some tests
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 20 +++++++++++---------
.../apache/hudi/functional/TestStreamingSource.scala | 3 ++-
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1b3b8e2caed..4044c01db4a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
+import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, WRITE_TABLE_VERSION}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
@@ -244,23 +244,25 @@ class DefaultSource extends RelationProvider
     // Check if the streaming table read version is set. If set, use the corresponding source
     // which uses the version corresponding to IncrementalRelation to read the data. And, also
     // does the checkpoint management based on the version.
+    val targetTableVersion: Integer = Integer.parseInt(
+      parameters.getOrElse(WRITE_TABLE_VERSION.key, WRITE_TABLE_VERSION.defaultValue.toString))
     if (SparkConfigUtils.containsConfigProperty(parameters, STREAMING_READ_TABLE_VERSION)) {
-      val writeTableVersion = Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
-      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+      val sourceTableVersion = Integer.parseInt(parameters(STREAMING_READ_TABLE_VERSION.key))
+      if (sourceTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
new HoodieStreamSourceV2(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
} else {
new HoodieStreamSourceV1(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
}
} else {
-      val writeTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
-      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+      val sourceTableVersion = metaClient.getTableConfig.getTableVersion.versionCode()
+      if (sourceTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
new HoodieStreamSourceV2(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
} else {
new HoodieStreamSourceV1(
-          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+          sqlContext, metaClient, metadataPath, schema, parameters, offsetRangeLimit, HoodieTableVersion.fromVersionCode(targetTableVersion))
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 84c0c3baab5..aeef270ad76 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.HoodieCompactionConfig
-import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE, WRITE_TABLE_VERSION}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.util.JavaConversions
import org.apache.spark.sql.streaming.StreamTest
@@ -271,6 +271,7 @@ class TestStreamingSource extends StreamTest {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
+      .option(WRITE_TABLE_VERSION.key, HoodieTableVersion.current().versionCode().toString)
       .option(STREAMING_READ_TABLE_VERSION.key, streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")