This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8ccb80cb0521ee217d6cb7ae9869c3bf975be8b1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Jan 28 22:50:23 2023 -0800

    [HUDI-5639] Fixing stream identifier for single writer with spark streaming ingest (#7783)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 20 +++++++++++----
 .../org/apache/hudi/HoodieStreamingSink.scala      | 20 ++++++++++++---
 .../hudi/functional/TestStructuredStreaming.scala  | 30 +++++++++++++++++++++-
 4 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 9ed04dae626..f3a169fc06e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -448,7 +448,7 @@ object DataSourceWriteOptions {
   val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.checkpoint.identifier")
-    .noDefaultValue()
+    .defaultValue("default_single_writer")
     .sinceVersion("0.13.0")
     .withDocumentation("A stream identifier used for HUDI to fetch the right checkpoint (`batch id`, to be more specific) " +
       "corresponding to this writer. Please note to keep the identifier a unique value for different writer "
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index d2a4372462d..42629789139 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,20 +18,19 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteConcurrencyMode}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
-
 import org.apache.log4j.LogManager
-
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
@@ -40,6 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
 /**
@@ -161,6 +161,7 @@ class DefaultSource extends RelationProvider
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
+    validateMultiWriterConfigs(optParams)
     new HoodieStreamingSink(
       sqlContext,
       optParams,
@@ -168,6 +169,15 @@ class DefaultSource extends RelationProvider
       outputMode)
   }
 
+  def validateMultiWriterConfigs(options: Map[String, String]) : Unit = {
+    if (WriteConcurrencyMode.valueOf(options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
+      WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
+      // ensure some valid value is set for the identifier
+      checkState(options.contains(STREAMING_CHECKPOINT_IDENTIFIER.key()), "For multi-writer scenarios, please set " +
+        STREAMING_CHECKPOINT_IDENTIFIER.key() + ". Each writer should set different values for this identifier")
+    }
+  }
+
   override def shortName(): String = "hudi_v1"
 
   override def sourceSchema(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index ae50e3c56c1..9830d323081 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -23,14 +23,15 @@ import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload, WriteConcurrencyMode}
+import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.common.table.marker.MarkerType
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState}
 import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
@@ -122,7 +123,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
         override def accept(metaClient: HoodieTableMetaClient,
                             newCommitMetadata: HoodieCommitMetadata): Unit = {
-          options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+          getStreamIdentifier(options) match {
             case Some(identifier) =>
               // Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario.
               val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
@@ -225,6 +226,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
   }
 
+  private def getStreamIdentifier(options: Map[String, String]) : Option[String] = {
+    if (WriteConcurrencyMode.valueOf(options.getOrDefault(WRITE_CONCURRENCY_MODE.key(),
+      WRITE_CONCURRENCY_MODE.defaultValue())) == WriteConcurrencyMode.SINGLE_WRITER) {
+      // for the single writer model, we will fetch the default if not set.
+      Some(options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue()))
+    } else {
+      // in case of multi-writer scenarios, there is no default.
+      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key())
+    }
+  }
+
   override def toString: String = s"HoodieStreamingSink[${options("path")}]"
 
   @annotation.tailrec
@@ -316,7 +328,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+      getStreamIdentifier(options) match {
         case Some(identifier) =>
           // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
           val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index a26858dabb4..1e3356a9583 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -315,7 +315,35 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
   }
 
-  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+  @Test
+  def testStructuredStreamingForDefaultIdentifier(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val schema = inputDF1.schema
+
+    inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    val query1 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option("checkpointLocation", s"$basePath/checkpoint1")
+      .start(destPath)
+
+    query1.processAllAvailable()
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+    assertLatestCheckpointInfoMatched(metaClient, STREAMING_CHECKPOINT_IDENTIFIER.defaultValue(), "0")
+    query1.stop()
+  }
+
+  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
                                         identifier: String,
                                         expectBatchId: String): Unit = {
     metaClient.reloadActiveTimeline()
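For context, below is a minimal sketch of the two writer configurations this patch distinguishes. It is illustrative only: the paths, table name, and record key/precombine field names are hypothetical, the two queries are alternatives (not meant to run together against one table), and the lock-provider settings a real OPTIMISTIC_CONCURRENCY_CONTROL deployment also needs are omitted.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("hudi-streaming-sketch").getOrCreate()
// Infer the source schema once from a sample batch (hypothetical path).
val schema = spark.read.json("/tmp/hudi/source").schema

// Alternative A, single writer: with this change no identifier needs to be set;
// the sink resolves its batch-id checkpoint under the new default,
// "default_single_writer".
val singleWriter = spark.readStream
  .schema(schema)
  .json("/tmp/hudi/source")
  .writeStream
  .format("org.apache.hudi")
  .option("hoodie.table.name", "demo_tbl")                  // hypothetical
  .option("hoodie.datasource.write.recordkey.field", "key") // hypothetical
  .option("hoodie.datasource.write.precombine.field", "ts") // hypothetical
  .option("checkpointLocation", "/tmp/hudi/ckpt_writer1")
  .outputMode(OutputMode.Append)
  .start("/tmp/hudi/dest")

// Alternative B, multi-writer: under OPTIMISTIC_CONCURRENCY_CONTROL each writer
// must set a distinct identifier, or validateMultiWriterConfigs fails fast at
// sink creation.
val occWriter = spark.readStream
  .schema(schema)
  .json("/tmp/hudi/source2")
  .writeStream
  .format("org.apache.hudi")
  .option("hoodie.table.name", "demo_tbl")
  .option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL")
  .option("hoodie.datasource.write.streaming.checkpoint.identifier", "writer_2")
  .option("checkpointLocation", "/tmp/hudi/ckpt_writer2")
  .outputMode(OutputMode.Append)
  .start("/tmp/hudi/dest")

In alternative A, getStreamIdentifier falls back to STREAMING_CHECKPOINT_IDENTIFIER.defaultValue(), so canSkipBatch and the commit callback can still locate the last completed batch id after a restart; in alternative B there is deliberately no fallback, since two writers sharing one identifier would clobber each other's checkpoints.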
