This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b6688944e7 [HUDI-4432] Checkpoint management for multi-writer scenario (#7383)
2b6688944e7 is described below

commit 2b6688944e7ddde3e2dbe5f8a2dfe029a610c926
Author: RexAn <[email protected]>
AuthorDate: Wed Dec 14 00:14:30 2022 +0800

    [HUDI-4432] Checkpoint management for multi-writer scenario (#7383)
    
    * Checkpoint management for multi-writer scenario
    
    * Remove old logic
    
    * Add sinceVersion to config and minor changes
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 27 ++++++--
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  7 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  7 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  6 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 10 +++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 32 +++++----
 .../org/apache/hudi/HoodieStreamingSink.scala      | 78 ++++++++++++++--------
 .../hudi/functional/TestStructuredStreaming.scala  | 74 +++++++++++++++++++-
 8 files changed, 184 insertions(+), 57 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index cb7eb1147ca..1fcaff04559 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.async.AsyncArchiveService;
 import org.apache.hudi.async.AsyncCleanerService;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -99,6 +95,11 @@ import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+
+import com.codahale.metrics.Timer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -111,6 +112,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -209,16 +211,24 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
   }
 
+  public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds,
+        Option.empty());
+  }
+
   public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
-                                 String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);
+                                 String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                                 Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);
 
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
-    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
   }
 
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
-                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
+                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
+                             Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     // Skip the empty commit if not allowed
     if (!config.allowEmptyCommit() && stats.isEmpty()) {
       return true;
@@ -234,6 +244,9 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(inflightInstant, metadata);
+      if (extraPreCommitFunc.isPresent()) {
+        extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
+      }
       commit(table, commitActionType, instantTime, metadata, stats);
      // already within lock, and so no lock required for archival
       postCommit(table, metadata, instantTime, extraMetadata, false);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index cacf8ddb834..5a7150687ac 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -74,6 +74,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /**
@@ -115,7 +116,9 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+  public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
     // for eager flush, multiple write stat may share one file path.
     List<HoodieWriteStat> merged = writeStats.stream()
@@ -123,7 +126,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         .values().stream()
         .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
         .collect(Collectors.toList());
-    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index b6951bc6b78..1f205d0e823 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@@ -82,9 +83,11 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
                         List<WriteStatus> writeStatuses,
                         Option<Map<String, String>> extraMetadata,
                         String commitActionType,
-                        Map<String, List<String>> partitionToReplacedFileIds) {
+                        Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
+        extraPreCommitFunc);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 54a8607ee47..9734db6cd6a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -71,6 +71,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 @SuppressWarnings("checkstyle:LineLength")
@@ -120,10 +121,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
-                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index c3825e3426c..8e5bb9aad04 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -434,6 +434,16 @@ object DataSourceWriteOptions {
       + " within a streaming microbatch. Turning this on, could hide the write 
status errors while the spark checkpoint moves ahead." +
       "So, would recommend users to use this with caution.")
 
+  val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.streaming.checkpoint.identifier")
+    .noDefaultValue()
+    .sinceVersion("0.13.0")
+    .withDocumentation("A stream identifier used by Hudi to fetch the right checkpoint (the `batch id`, to be more specific) "
+      + "for this writer. In a multi-writer scenario, keep the identifier unique for each writer. "
+      + "If the value is not set, the checkpoint info is kept only in memory. This can cause a batch to be "
+      + "retried and rewritten: if the Spark checkpoint write fails and the job restarts, the `batch id` is lost.")
+
   val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.meta.sync.client.tool.class")
     .defaultValue(classOf[HiveSyncTool].getName)
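
For context, each writer in a multi-writer setup would set its own identifier on its streaming query. A minimal sketch (the option key is the one added above; the table path, checkpoint location, identifier value, and the commonHudiOpts map of table/key configs are hypothetical):

    df.writeStream
      .format("hudi")
      .options(commonHudiOpts)                                                        // table name, record key, etc. (assumed defined)
      .option("hoodie.datasource.write.streaming.checkpoint.identifier", "writer-1")  // must be unique per writer
      .option("checkpointLocation", "/tmp/checkpoints/writer-1")
      .outputMode("append")
      .start("/tmp/hudi/dest")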
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f761090d492..93c55b7b676 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{SPARK_VERSION, SparkContext}
 
+import java.util.function.BiConsumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters.setAsJavaSetConverter
 import scala.collection.mutable
@@ -81,7 +82,8 @@ object HoodieSparkSqlWriter {
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
             asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
+            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
+            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
   : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
@@ -183,7 +185,7 @@ object HoodieSparkSqlWriter {
           .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .initTable(sparkContext.hadoopConfiguration, path)
-        }
+      }
       tableConfig = tableMetaClient.getTableConfig
 
       val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
@@ -203,14 +205,14 @@ object HoodieSparkSqlWriter {
 
       val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
       val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
-          // In case we need to reconcile the schema and schema evolution is enabled,
-          // we will force-apply schema evolution to the writer's schema
-          if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
-            val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
-            Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
-          } else {
-            None
-          }
+        // In case we need to reconcile the schema and schema evolution is enabled,
+        // we will force-apply schema evolution to the writer's schema
+        if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+          val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+          Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
+        } else {
+          None
+        }
       }
 
       // NOTE: Target writer's schema is deduced based on
@@ -381,7 +383,7 @@ object HoodieSparkSqlWriter {
       val (writeSuccessful, compactionInstant, clusteringInstant) =
         commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
           writeResult, parameters, writeClient, tableConfig, jsc,
-          TableInstantInfo(basePath, instantTime, commitActionType, operation))
+          TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
 
       (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
     }
@@ -570,7 +572,7 @@ object HoodieSparkSqlWriter {
   def getLatestTableInternalSchema(config: HoodieConfig,
                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
-       Option.empty[InternalSchema]
+      Option.empty[InternalSchema]
     } else {
       try {
         val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
@@ -887,7 +889,8 @@ object HoodieSparkSqlWriter {
                                             client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                             tableConfig: HoodieTableConfig,
                                             jsc: JavaSparkContext,
-                                             tableInstantInfo: TableInstantInfo
+                                             tableInstantInfo: TableInstantInfo,
+                                             extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]]
                                            ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
       log.info("Proceeding to commit the write.")
@@ -897,7 +900,8 @@ object HoodieSparkSqlWriter {
        client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
          common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))),
           tableInstantInfo.commitActionType,
-          writeResult.getPartitionToReplaceFileIds)
+          writeResult.getPartitionToReplaceFileIds,
+          common.util.Option.ofNullable(extraPreCommitFn.orNull))
 
       if (commitSuccess) {
         log.info("Commit " + tableInstantInfo.instantTime + " successful!")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 23b79a5ed41..ae50e3c56c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,20 +16,20 @@
  */
 package org.apache.hudi
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload}
 import org.apache.hudi.common.table.marker.MarkerType
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, StringUtils}
+import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
 import org.apache.log4j.LogManager
@@ -39,7 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 import java.lang
-import java.util.function.Function
+import java.util.function.{BiConsumer, Function}
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
@@ -75,8 +75,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
   private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
    STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
-  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
-  private val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
 
   private var isAsyncCompactorServiceShutdownAbnormally = false
   private var isAsyncClusteringServiceShutdownAbnormally = false
@@ -115,18 +113,38 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
-    // Add batchId as checkpoint to the extra metadata. To enable same checkpoint metadata structure for multi-writers,
-    // SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId), e.g.
-    // "_hudi_streaming_sink_checkpoint" : "{\"$batchId\":\"${sqlContext.sparkContext.applicationId}-$queryId\"}"
-    // NOTE: In case of multi-writers, this map should be mutable and sorted by key to facilitate merging of batchIds.
-    //       HUDI-4432 tracks the implementation of checkpoint management for multi-writer.
-    val checkpointMap = Map(batchId.toString -> s"${sqlContext.sparkContext.applicationId}-$queryId")
-    updatedOptions = updatedOptions.updated(SINK_CHECKPOINT_KEY, HoodieSinkCheckpoint.toJson(checkpointMap))
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
+          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+          extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
+
+            override def accept(metaClient: HoodieTableMetaClient,
+                                newCommitMetadata: HoodieCommitMetadata): Unit = {
+              options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+                case Some(identifier) =>
+                  // Fetch the latest commit with checkpoint info again to avoid concurrency issues in the multi-writer scenario.
+                  val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+                    metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+                  var checkpointString = ""
+                  if (lastCheckpointCommitMetadata.isPresent) {
+                    val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
+                    if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+                      checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString))
+                    } else {
+                      checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
+                    }
+                  } else {
+                    checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
+                  }
+
+                  newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString)
+                case None =>
+                  // No-op: the batch id is kept in memory only.
+              }
+            }
+          }))
       )
       match {
         case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
@@ -298,14 +316,19 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      // get the latest checkpoint from the commit metadata to check if the microbatch has already been prcessed or not
-      val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-        metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-      if (commitMetadata.isPresent) {
-        val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-          latestCommittedBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong
-        }
+      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+        case Some(identifier) =>
+          // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+          val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+            metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+          if (commitMetadata.isPresent) {
+            val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
+            if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+              HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId =>
+                latestCommittedBatchId = commitBatchId.toLong)
+            }
+          }
+        case None =>
       }
       latestCommittedBatchId >= incomingBatchId
     } else {
@@ -322,13 +345,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 object HoodieSinkCheckpoint {
 
   lazy val mapper: ObjectMapper = {
-    val _mapper = new ObjectMapper
-    _mapper.setSerializationInclusion(Include.NON_ABSENT)
-    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    val _mapper = JsonUtils.getObjectMapper
     _mapper.registerModule(DefaultScalaModule)
     _mapper
   }
 
+  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+  val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
+
   def toJson(checkpoint: Map[String, String]): String = {
     mapper.writeValueAsString(checkpoint)
   }
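
With this change, the value stored under SINK_CHECKPOINT_KEY is a map from each writer's identifier to its last committed batch id, and each commit merges its own entry into the map read from the latest commit instead of overwriting it. A small sketch using the helpers above (identifiers and batch ids are illustrative):

    // latest committed checkpoint: {"writer-1":"4"}
    val previous = HoodieSinkCheckpoint.fromJson("""{"writer-1":"4"}""")
    // writer-2 commits batch 7; its entry is merged alongside writer-1's
    val merged = previous + ("writer-2" -> "7")
    HoodieSinkCheckpoint.toJson(merged) // {"writer-1":"4","writer-2":"7"}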
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 1382bafb762..d4651f9ab78 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,22 +18,24 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
+import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
-import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 
@@ -257,6 +259,72 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction)
   }
 
+  @Test
+  def testStructuredStreamingWithCheckpoint(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val schema = inputDF1.schema
+
+    inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    val query1 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
+      .option("checkpointLocation", s"${basePath}/checkpoint1")
+      .start(destPath)
+
+    query1.processAllAvailable()
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
+
+    // Add another identifier checkpoint info to the commit.
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+    inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    val query2 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier2")
+      .option("checkpointLocation", s"${basePath}/checkpoint2")
+      .start(destPath)
+    query2.processAllAvailable()
+    query1.processAllAvailable()
+
+    query1.stop()
+    query2.stop()
+
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
+  }
+
+  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+                                        identifier: String,
+                                        expectBatchId: String): Unit = {
+    metaClient.reloadActiveTimeline()
+    val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+      metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+
+    assertTrue(lastCheckpointCommitMetadata.isPresent)
+    val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
+
+    assertEquals(checkpointMap.get(identifier).orNull, expectBatchId)
+  }
+
  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
                                                 isInlineClustering: Boolean, isAsyncClustering: Boolean,
                                                 partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
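
Beyond the streaming sink, the extraPreCommitFunc hook threaded through the write clients lets a caller mutate the commit metadata while the transaction lock is held. A hypothetical caller might look roughly like this (signatures are from this patch; the metadata key and value are invented for illustration):

    import java.util.function.BiConsumer
    import org.apache.hudi.common.model.HoodieCommitMetadata
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // Stamp an extra key into the commit metadata just before the commit is finalized.
    val stampWriterId = new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
      override def accept(metaClient: HoodieTableMetaClient, metadata: HoodieCommitMetadata): Unit = {
        metadata.addMetadata("writer_id", "writer-1") // hypothetical key/value
      }
    }
    // client.commit(instantTime, writeStatuses, extraMetadata, commitActionType,
    //   partitionToReplacedFileIds, org.apache.hudi.common.util.Option.of(stampWriterId))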
