This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b12be0d9e2d [HUDI-6132] Fixing checkpoint management for multiple 
streaming writers (#8558)
b12be0d9e2d is described below

commit b12be0d9e2d469e347d67b04584e281b48a634df
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 2 03:54:52 2023 -0700

    [HUDI-6132] Fixing checkpoint management for multiple streaming writers 
(#8558)
    
    Each writer updates the checkpoint in commit metadata
    with its own batchId info only. When checking to skip the
    current batch, we walk back in the timeline and find the
    current writer's last committed batchId. Also fixed bulk insert
    row writer path for checkpoint management with streaming writes.
---
 .../org/apache/hudi/common/util/CommitUtils.java   | 43 +++++++++++--
 .../main/java/org/apache/hudi/DataSourceUtils.java |  8 +++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  5 ++
 .../org/apache/hudi/HoodieStreamingSink.scala      | 72 ++++------------------
 .../hudi/functional/TestStructuredStreaming.scala  | 51 +++++++++++----
 5 files changed, 101 insertions(+), 78 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index a3ac8762f1a..05ec523bd2c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -29,11 +29,13 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ public class CommitUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
   private static final String NULL_SCHEMA_STR = 
Schema.create(Schema.Type.NULL).toString();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   /**
    * Gets the commit action type for given write operation and table type.
@@ -145,19 +148,24 @@ public class CommitUtils {
   /**
    * Process previous commits metadata in the timeline to determine the 
checkpoint given a checkpoint key.
    * NOTE: This is very similar in intent to 
DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that
-   *       different deployment models (deltastreamer or spark structured 
streaming) could have different checkpoint keys.
+   * different deployment models (deltastreamer or spark structured streaming) 
could have different checkpoint keys.
    *
-   * @param timeline completed commits in active timeline.
+   * @param timeline      completed commits in active timeline.
    * @param checkpointKey the checkpoint key in the extra metadata of the 
commit.
+   * @param keyToLookup   key of interest whose checkpoint is looked up.
    * @return An optional commit metadata with latest checkpoint.
    */
-  public static Option<HoodieCommitMetadata> 
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String 
checkpointKey) {
-    return (Option<HoodieCommitMetadata>) 
timeline.filterCompletedInstants().getReverseOrderedInstants().map(instant -> {
+  public static Option<String> 
getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String 
checkpointKey,
+                                                                  String 
keyToLookup) {
+    return (Option<String>) 
timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
       try {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-        if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) {
-          return Option.of(commitMetadata);
+        // process commits only with checkpoint entries
+        String checkpointValue = commitMetadata.getMetadata(checkpointKey);
+        if (StringUtils.nonEmpty(checkpointValue)) {
+          // return if checkpoint for "keyToLookup" exists.
+          return readCheckpointValue(checkpointValue, keyToLookup);
         } else {
           return Option.empty();
         }
@@ -166,4 +174,27 @@ public class CommitUtils {
       }
     }).filter(Option::isPresent).findFirst().orElse(Option.empty());
   }
+
+  public static Option<String> readCheckpointValue(String value, String id) {
+    try {
+      Map<String, String> checkpointMap = OBJECT_MAPPER.readValue(value, 
Map.class);
+      if (!checkpointMap.containsKey(id)) {
+        return Option.empty();
+      }
+      String checkpointVal = checkpointMap.get(id);
+      return Option.of(checkpointVal);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to parse checkpoint as map", e);
+    }
+  }
+
+  public static String getCheckpointValueAsString(String identifier, String 
batchId) {
+    try {
+      Map<String, String> checkpointMap = new HashMap<>();
+      checkpointMap.put(identifier, batchId);
+      return OBJECT_MAPPER.writeValueAsString(checkpointMap);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to parse checkpoint as map", e);
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 10a99e36681..9ce0ff2d44b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -58,6 +58,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;
+
 /**
  * Utilities used throughout the data source.
  */
@@ -141,6 +143,12 @@ public class DataSourceUtils {
         }
       });
     }
+    if 
(properties.containsKey(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())) {
+      extraMetadataMap.put(HoodieStreamingSink.SINK_CHECKPOINT_KEY(),
+          
getCheckpointValueAsString(properties.getOrDefault(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().key(),
+                  
DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue()),
+              
properties.get(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())));
+    }
     return extraMetadataMap;
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 897fd3e81e1..acd0b2c2250 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -97,6 +97,11 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
       .defaultValue(false)
 
+  /**
+   * For spark streaming use-cases, holds the batch Id.
+   */
+  val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"
+
   private val log = LoggerFactory.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index c7538754a8c..ac0dbffe542 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.hudi
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, 
SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -108,42 +106,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       return
     }
 
+
     // Override to use direct markers. In Structured streaming, timeline 
server is closed after
     // first micro-batch and subsequent micro-batches do not have timeline 
server running.
     // Thus, we can't use timeline-server-based markers.
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    updatedOptions = 
updatedOptions.updated(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID, 
batchId.toString)
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
           extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, 
HoodieCommitMetadata] {
-
-            override def accept(metaClient: HoodieTableMetaClient,
-                                newCommitMetadata: HoodieCommitMetadata): Unit 
= {
-              getStreamIdentifier(options) match {
-                case Some(identifier) =>
-                  // Fetch the latestCommit with checkpoint Info again to 
avoid concurrency issue in multi-write scenario.
-                  val lastCheckpointCommitMetadata = 
CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-                    metaClient.getActiveTimeline.getCommitsTimeline, 
SINK_CHECKPOINT_KEY)
-                  var checkpointString = ""
-                  if (lastCheckpointCommitMetadata.isPresent) {
-                    val lastCheckpoint = 
lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-                    if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-                      checkpointString = 
HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + 
(identifier -> batchId.toString))
-                    } else {
-                      checkpointString = 
HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
-                    }
-                  } else {
-                    checkpointString = 
HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
-                  }
-
-                  newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, 
checkpointString)
-                case None =>
-                  // No op since keeping batch id in memory only.
-              }
+            override def accept(metaClient: HoodieTableMetaClient, 
newCommitMetadata: HoodieCommitMetadata): Unit = {
+              val identifier = 
options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), 
STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+              newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, 
CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
             }
           }))
       )
@@ -328,19 +307,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private def canSkipBatch(incomingBatchId: Long, operationType: String): 
Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      getStreamIdentifier(options) match {
-        case Some(identifier) =>
-          // get the latest checkpoint from the commit metadata to check if 
the microbatch has already been processed or not
-          val commitMetadata = 
CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-            metaClient.get.getActiveTimeline.getCommitsTimeline, 
SINK_CHECKPOINT_KEY)
-          if (commitMetadata.isPresent) {
-            val lastCheckpoint = 
commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-            if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-              
HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId
 =>
-                latestCommittedBatchId = commitBatchId.toLong)
-            }
-          }
-        case None =>
+      val identifier = 
options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), 
STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+      // get the latest checkpoint from the commit metadata to check if the 
microbatch has already been processed or not
+      val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+        metaClient.get.getActiveTimeline.getWriteTimeline, 
SINK_CHECKPOINT_KEY, identifier)
+      if (lastCheckpoint.isPresent) {
+        latestCommittedBatchId = lastCheckpoint.get().toLong
       }
       latestCommittedBatchId >= incomingBatchId
     } else {
@@ -350,26 +322,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   }
 }
 
-/**
- * SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of 
applicationId and queryId).
- * This is a util object to serialize/deserialize map to/from json.
- */
-object HoodieSinkCheckpoint {
-
-  lazy val mapper: ObjectMapper = {
-    val _mapper = JsonUtils.getObjectMapper
-    _mapper.registerModule(DefaultScalaModule)
-    _mapper
-  }
+object HoodieStreamingSink {
 
   // This constant serves as the checkpoint key for streaming sink so that 
each microbatch is processed exactly-once.
   val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
-
-  def toJson(checkpoint: Map[String, String]): String = {
-    mapper.writeValueAsString(checkpoint)
-  }
-
-  def fromJson(json: String): Map[String, String] = {
-    mapper.readValue(json, classOf[Map[String, String]])
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index ea3b4649735..a1e89760bd5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -19,10 +19,10 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieTableType, 
WriteConcurrencyMode}
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{CollectionUtils, 
CommitUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers, HoodieSinkCheckpoint}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
 import org.apache.spark.sql.types.StructType
@@ -284,7 +284,7 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       .start(destPath)
 
     query1.processAllAvailable()
-    val metaClient = HoodieTableMetaClient.builder
+    var metaClient = HoodieTableMetaClient.builder
       
.setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
 
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
@@ -313,16 +313,44 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
 
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
     assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
+
+
+    inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+    val query3 = spark.readStream
+      .schema(schema)
+      .json(sourcePath)
+      .writeStream
+      .format("org.apache.hudi")
+      .options(commonOpts)
+      .outputMode(OutputMode.Append)
+      .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
+      .option("checkpointLocation", s"${basePath}/checkpoint1")
+      .start(destPath)
+
+    query3.processAllAvailable()
+    metaClient = HoodieTableMetaClient.builder
+      
.setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "2")
+    assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
   }
 
   @Test
   def testStructuredStreamingForDefaultIdentifier(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+    testStructuredStreamingInternal()
+  }
 
+  @Test
+  def testStructuredStreamingWithBulkInsert(): Unit = {
+    testStructuredStreamingInternal("bulk_insert")
+  }
+
+  def testStructuredStreamingInternal(operation : String = "upsert"): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 
100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val schema = inputDF1.schema
-
     inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
 
     val query1 = spark.readStream
@@ -331,6 +359,7 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       .writeStream
       .format("org.apache.hudi")
       .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), operation)
       .outputMode(OutputMode.Append)
       .option("checkpointLocation", s"$basePath/checkpoint1")
       .start(destPath)
@@ -343,17 +372,13 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
     query1.stop()
   }
 
-    def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+  def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
                                         identifier: String,
                                         expectBatchId: String): Unit = {
     metaClient.reloadActiveTimeline()
-    val lastCheckpointCommitMetadata = 
CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-      metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-
-    assertTrue(lastCheckpointCommitMetadata.isPresent)
-    val checkpointMap = 
HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
-
-    assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
+    val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+      metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY, 
identifier)
+    assertEquals(lastCheckpoint.get(), expectBatchId)
   }
 
   def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, tableType: HoodieTableType,

Reply via email to