This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b12be0d9e2d [HUDI-6132] Fixing checkpoint management for multiple streaming writers (#8558)
b12be0d9e2d is described below
commit b12be0d9e2d469e347d67b04584e281b48a634df
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 2 03:54:52 2023 -0700
[HUDI-6132] Fixing checkpoint management for multiple streaming writers (#8558)
Each writer updates the checkpoint in commit metadata with its own batchId entry only. When deciding whether to skip the current batch, we walk back through the timeline and find the current writer's last committed batchId. Also fixes the bulk insert row writer path so that checkpoint management works with streaming writes.
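For illustration only (not part of the commit), a minimal Scala sketch of the per-writer checkpoint format, using the CommitUtils helpers added below; the identifier and batchId values are hypothetical:

    import org.apache.hudi.common.util.CommitUtils

    // Each writer serializes only its own identifier -> batchId pair.
    val value = CommitUtils.getCheckpointValueAsString("streaming_identifier1", "5")
    // value == {"streaming_identifier1":"5"}

    // Reading it back succeeds only for the owning writer's identifier.
    CommitUtils.readCheckpointValue(value, "streaming_identifier1") // Option.of("5")
    CommitUtils.readCheckpointValue(value, "streaming_identifier2") // Option.empty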
---
.../org/apache/hudi/common/util/CommitUtils.java | 43 +++++++++++--
.../main/java/org/apache/hudi/DataSourceUtils.java | 8 +++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 ++
.../org/apache/hudi/HoodieStreamingSink.scala | 72 ++++------------------
.../hudi/functional/TestStructuredStreaming.scala | 51 +++++++++++----
5 files changed, 101 insertions(+), 78 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index a3ac8762f1a..05ec523bd2c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -29,11 +29,13 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -46,6 +48,7 @@ public class CommitUtils {
private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Gets the commit action type for given write operation and table type.
@@ -145,19 +148,24 @@ public class CommitUtils {
/**
* Process previous commits metadata in the timeline to determine the checkpoint given a checkpoint key.
* NOTE: This is very similar in intent to DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that
- * different deployment models (deltastreamer or spark structured streaming) could have different checkpoint keys.
+ * different deployment models (deltastreamer or spark structured streaming) could have different checkpoint keys.
*
- * @param timeline completed commits in active timeline.
+ * @param timeline      completed commits in active timeline.
* @param checkpointKey the checkpoint key in the extra metadata of the commit.
+ * @param keyToLookup   the key of interest whose checkpoint is looked up.
* @return An optional commit metadata with latest checkpoint.
*/
- public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String checkpointKey) {
- return (Option<HoodieCommitMetadata>) timeline.filterCompletedInstants().getReverseOrderedInstants().map(instant -> {
+ public static Option<String> getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String checkpointKey,
+ String keyToLookup) {
+ return (Option<String>) timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) {
- return Option.of(commitMetadata);
+ // process commits only with checkpoint entries
+ String checkpointValue = commitMetadata.getMetadata(checkpointKey);
+ if (StringUtils.nonEmpty(checkpointValue)) {
+ // return the checkpoint for "keyToLookup" if it exists.
+ return readCheckpointValue(checkpointValue, keyToLookup);
} else {
return Option.empty();
}
@@ -166,4 +174,27 @@ public class CommitUtils {
}
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
+
+ public static Option<String> readCheckpointValue(String value, String id) {
+ try {
+ Map<String, String> checkpointMap = OBJECT_MAPPER.readValue(value, Map.class);
+ if (!checkpointMap.containsKey(id)) {
+ return Option.empty();
+ }
+ String checkpointVal = checkpointMap.get(id);
+ return Option.of(checkpointVal);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to parse checkpoint as map", e);
+ }
+ }
+
+ public static String getCheckpointValueAsString(String identifier, String batchId) {
+ try {
+ Map<String, String> checkpointMap = new HashMap<>();
+ checkpointMap.put(identifier, batchId);
+ return OBJECT_MAPPER.writeValueAsString(checkpointMap);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to serialize checkpoint as map", e);
+ }
+ }
}
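As a hedged sketch (not in the commit) of the walk-back that getValidCheckpointForCurrentWriter performs, with plain JSON strings standing in for commit metadata on a real HoodieTimeline and hypothetical writer names:

    import org.apache.hudi.common.util.CommitUtils

    // Checkpoint entries of completed commits, newest first, as the
    // reverse-ordered timeline would yield them.
    val commitCheckpoints = Seq(
      """{"writer-b":"3"}""", // latest commit, written by writer-b
      """{"writer-a":"7"}""") // earlier commit, written by writer-a

    // The latest commit has no entry for writer-a, so we keep walking back
    // until a commit carrying writer-a's identifier is found.
    val latestForA = commitCheckpoints.iterator
      .map(v => CommitUtils.readCheckpointValue(v, "writer-a"))
      .find(_.isPresent)
    // latestForA == Some(Option.of("7"))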
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 10a99e36681..9ce0ff2d44b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -58,6 +58,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;
+
/**
* Utilities used throughout the data source.
*/
@@ -141,6 +143,12 @@ public class DataSourceUtils {
}
});
}
+ if (properties.containsKey(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())) {
+ extraMetadataMap.put(HoodieStreamingSink.SINK_CHECKPOINT_KEY(),
+ getCheckpointValueAsString(properties.getOrDefault(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().key(),
+ DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER().defaultValue()),
+ properties.get(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID())));
+ }
return extraMetadataMap;
}
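Net effect (illustrative values): each streaming commit's extra metadata now carries exactly one per-writer entry under the sink checkpoint key, e.g.

    _hudi_streaming_sink_checkpoint -> {"streaming_identifier1":"5"}

rather than a merged map across all writers.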
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 897fd3e81e1..acd0b2c2250 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -97,6 +97,11 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)
+ /**
+ * For spark streaming use-cases, holds the batch Id.
+ */
+ val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"
+
private val log = LoggerFactory.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index c7538754a8c..ac0dbffe542 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,10 +16,8 @@
*/
package org.apache.hudi
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -108,42 +106,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
return
}
+
// Override to use direct markers. In Structured streaming, timeline server is closed after
// first micro-batch and subsequent micro-batches do not have timeline server running.
// Thus, we can't use timeline-server-based markers.
var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
// we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+ updatedOptions = updatedOptions.updated(HoodieSparkSqlWriter.SPARK_STREAMING_BATCH_ID, batchId.toString)
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
-
- override def accept(metaClient: HoodieTableMetaClient,
- newCommitMetadata: HoodieCommitMetadata): Unit = {
- getStreamIdentifier(options) match {
- case Some(identifier) =>
- // Fetch the latestCommit with checkpoint Info again to avoid concurrency issue in multi-write scenario.
- val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
- metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
- var checkpointString = ""
- if (lastCheckpointCommitMetadata.isPresent) {
- val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
- if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
- checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString))
- } else {
- checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
- }
- } else {
- checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
- }
-
- newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString)
- case None =>
- // No op since keeping batch id in memory only.
- }
+ override def accept(metaClient: HoodieTableMetaClient, newCommitMetadata: HoodieCommitMetadata): Unit = {
+ val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+ newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, CommitUtils.getCheckpointValueAsString(identifier, String.valueOf(batchId)))
}
}))
)
@@ -328,19 +307,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
- getStreamIdentifier(options) match {
- case Some(identifier) =>
- // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
- val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
- metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
- if (commitMetadata.isPresent) {
- val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
- if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
- HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId =>
- latestCommittedBatchId = commitBatchId.toLong)
- }
- }
- case None =>
+ val identifier = options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), STREAMING_CHECKPOINT_IDENTIFIER.defaultValue())
+ // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+ val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+ metaClient.get.getActiveTimeline.getWriteTimeline, SINK_CHECKPOINT_KEY, identifier)
+ if (lastCheckpoint.isPresent) {
+ latestCommittedBatchId = lastCheckpoint.get().toLong
}
latestCommittedBatchId >= incomingBatchId
} else {
@@ -350,26 +322,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
}
-/**
- * SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId).
- * This is a util object to serialize/deserialize map to/from json.
- */
-object HoodieSinkCheckpoint {
-
- lazy val mapper: ObjectMapper = {
- val _mapper = JsonUtils.getObjectMapper
- _mapper.registerModule(DefaultScalaModule)
- _mapper
- }
+object HoodieStreamingSink {
// This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
-
- def toJson(checkpoint: Map[String, String]): String = {
- mapper.writeValueAsString(checkpoint)
- }
-
- def fromJson(json: String): Map[String, String] = {
- mapper.readValue(json, classOf[Map[String, String]])
- }
}
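For context, a hedged usage sketch mirroring the tests below (the DataFrame, paths, and identifiers are illustrative): concurrent streaming writers to the same table keep independent checkpoints by setting distinct identifiers:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.streaming.OutputMode

    // df, basePath and destPath are assumed to be defined by the caller.
    val query = df.writeStream
      .format("org.apache.hudi")
      .outputMode(OutputMode.Append)
      // Unique per writer; a second writer would use e.g. "streaming_identifier2".
      .option(DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
      .option("checkpointLocation", s"$basePath/checkpoint1")
      .start(destPath)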
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index ea3b4649735..a1e89760bd5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -19,10 +19,10 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
-import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{FileSlice, HoodieTableType, WriteConcurrencyMode}
+import org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.StructType
@@ -284,7 +284,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
.start(destPath)
query1.processAllAvailable()
- val metaClient = HoodieTableMetaClient.builder
+ var metaClient = HoodieTableMetaClient.builder
.setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
@@ -313,16 +313,44 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
+
+
+ inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+ val query3 = spark.readStream
+ .schema(schema)
+ .json(sourcePath)
+ .writeStream
+ .format("org.apache.hudi")
+ .options(commonOpts)
+ .outputMode(OutputMode.Append)
+ .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
+ .option("checkpointLocation", s"${basePath}/checkpoint1")
+ .start(destPath)
+
+ query3.processAllAvailable()
+ metaClient = HoodieTableMetaClient.builder
+ .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+ assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "2")
+ assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
}
@Test
def testStructuredStreamingForDefaultIdentifier(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+ testStructuredStreamingInternal()
+ }
+ @Test
+ def testStructuredStreamingWithBulkInsert(): Unit = {
+ testStructuredStreamingInternal("bulk_insert")
+ }
+
+ def testStructuredStreamingInternal(operation : String = "upsert"): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
val schema = inputDF1.schema
-
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
val query1 = spark.readStream
@@ -331,6 +359,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
.writeStream
.format("org.apache.hudi")
.options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key(), operation)
.outputMode(OutputMode.Append)
.option("checkpointLocation", s"$basePath/checkpoint1")
.start(destPath)
@@ -343,17 +372,13 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
query1.stop()
}
- def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+ def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
identifier: String,
expectBatchId: String): Unit = {
metaClient.reloadActiveTimeline()
- val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
- metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-
- assertTrue(lastCheckpointCommitMetadata.isPresent)
- val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
-
- assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
+ val lastCheckpoint = CommitUtils.getValidCheckpointForCurrentWriter(
+ metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY, identifier)
+ assertEquals(lastCheckpoint.get(), expectBatchId)
}
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,