This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2b6688944e7 [HUDI-4432] Checkpoint management for multi-writer scenario (#7383)
2b6688944e7 is described below
commit 2b6688944e7ddde3e2dbe5f8a2dfe029a610c926
Author: RexAn <[email protected]>
AuthorDate: Wed Dec 14 00:14:30 2022 +0800
[HUDI-4432] Checkpoint management for multi-writer scenario (#7383)
* Checkpoint management for multi-writer scenario
* Remove old logic
* Add sinceVersion to config and minor changes
Co-authored-by: Sagar Sumit <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 27 ++++++--
.../apache/hudi/client/HoodieFlinkWriteClient.java | 7 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 7 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 6 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 10 +++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 32 +++++----
.../org/apache/hudi/HoodieStreamingSink.scala | 78 ++++++++++++++--------
.../hudi/functional/TestStructuredStreaming.scala | 74 +++++++++++++++++++-
8 files changed, 184 insertions(+), 57 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index cb7eb1147ca..1fcaff04559 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,10 +18,6 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -99,6 +95,11 @@ import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+
+import com.codahale.metrics.Timer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -111,6 +112,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -209,16 +211,24 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
   }
+  public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds,
+        Option.empty());
+  }
+
   public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
-                                 String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);
+                                 String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                                 Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
-    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
   }
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
-                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
+                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
+                             Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     // Skip the empty commit if not allowed
     if (!config.allowEmptyCommit() && stats.isEmpty()) {
       return true;
@@ -234,6 +244,9 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(inflightInstant, metadata);
+      if (extraPreCommitFunc.isPresent()) {
+        extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
+      }
       commit(table, commitActionType, instantTime, metadata, stats);
       // already within lock, and so no lock required for archival
postCommit(table, metadata, instantTime, extraMetadata, false);
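
Note: the new extraPreCommitFunc hook above lets a caller mutate the HoodieCommitMetadata just before the commit is finalized. A minimal Scala sketch of calling the extended commit() API follows; the write client, instant, statuses, action type and the metadata key used here are illustrative assumptions, not part of this commit:

    import java.util.function.BiConsumer
    import org.apache.hudi.common.model.HoodieCommitMetadata
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.util.Option

    // Hypothetical hook that stamps an extra metadata entry right before the commit is written.
    val extraPreCommitFunc = new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
      override def accept(metaClient: HoodieTableMetaClient, metadata: HoodieCommitMetadata): Unit = {
        metadata.addMetadata("_example_writer_id", "job-42") // illustrative key/value
      }
    }

    // writeClient, instantTime, writeStatuses and commitActionType are assumed to be in scope.
    writeClient.commit(instantTime, writeStatuses, Option.empty(), commitActionType,
      java.util.Collections.emptyMap(), Option.of(extraPreCommitFunc))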
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index cacf8ddb834..5a7150687ac 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -74,6 +74,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
@@ -115,7 +116,9 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
   @Override
-  public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+  public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
     // for eager flush, multiple write stat may share one file path.
     List<HoodieWriteStat> merged = writeStats.stream()
@@ -123,7 +126,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         .values().stream()
         .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
         .collect(Collectors.toList());
-    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index b6951bc6b78..1f205d0e823 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@@ -82,9 +83,11 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
                         List<WriteStatus> writeStatuses,
                         Option<Map<String, String>> extraMetadata,
                         String commitActionType,
-                        Map<String, List<String>> partitionToReplacedFileIds) {
+                        Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
+        extraPreCommitFunc);
   }
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 54a8607ee47..9734db6cd6a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -71,6 +71,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
@@ -120,10 +121,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
-                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
  }
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index c3825e3426c..8e5bb9aad04 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -434,6 +434,16 @@ object DataSourceWriteOptions {
      + " within a streaming microbatch. Turning this on, could hide the write status errors while the spark checkpoint moves ahead." +
      "So, would recommend users to use this with caution.")
+ val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.write.streaming.checkpoint.identifier")
+ .noDefaultValue()
+ .sinceVersion("0.13.0")
+    .withDocumentation("A stream identifier used by Hudi to fetch the right checkpoint (the `batch id`, to be more specific) "
+      + "corresponding to this writer. Please keep the identifier unique for each writer "
+      + "in a multi-writer scenario. If the value is not set, the checkpoint info is kept only in memory, "
+      + "so the `batch id` can be lost if the job restarts while the Spark checkpoint write fails, "
+      + "causing Spark to retry and rewrite the data.")
+
val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.meta.sync.client.tool.class")
.defaultValue(classOf[HiveSyncTool].getName)
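
For context, a minimal structured streaming writer using the new identifier might look like the sketch below (mirroring the test added later in this commit); spark, schema, sourcePath, basePath and destPath, as well as the record key, precombine and table-name options, are placeholders rather than part of this change:

    import org.apache.spark.sql.streaming.OutputMode

    val query = spark.readStream
      .schema(schema)
      .json(sourcePath)
      .writeStream
      .format("org.apache.hudi")
      .option("hoodie.datasource.write.recordkey.field", "uuid")   // placeholder
      .option("hoodie.datasource.write.precombine.field", "ts")    // placeholder
      .option("hoodie.table.name", "example_table")                // placeholder
      .option("hoodie.datasource.write.streaming.checkpoint.identifier", "writer-1")
      .option("checkpointLocation", s"$basePath/checkpoint-writer-1")
      .outputMode(OutputMode.Append)
      .start(destPath)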
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f761090d492..93c55b7b676 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
+import java.util.function.BiConsumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
@@ -81,7 +82,8 @@ object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
            hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-           asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
+           asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
+           extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
  : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
    SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
@@ -183,7 +185,7 @@ object HoodieSparkSqlWriter {
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
- }
+ }
tableConfig = tableMetaClient.getTableConfig
     val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
@@ -203,14 +205,14 @@ object HoodieSparkSqlWriter {
     val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
     val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
-      // In case we need to reconcile the schema and schema evolution is enabled,
-      // we will force-apply schema evolution to the writer's schema
-      if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
-        val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
-        Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
-      } else {
-        None
-      }
+        // In case we need to reconcile the schema and schema evolution is enabled,
+        // we will force-apply schema evolution to the writer's schema
+        if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+          val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+          Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
+        } else {
+          None
+        }
}
// NOTE: Target writer's schema is deduced based on
@@ -381,7 +383,7 @@ object HoodieSparkSqlWriter {
      val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
-          TableInstantInfo(basePath, instantTime, commitActionType, operation))
+          TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
}
@@ -570,7 +572,7 @@ object HoodieSparkSqlWriter {
  def getLatestTableInternalSchema(config: HoodieConfig,
                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
- Option.empty[InternalSchema]
+ Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
@@ -887,7 +889,8 @@ object HoodieSparkSqlWriter {
                                             client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                             tableConfig: HoodieTableConfig,
                                             jsc: JavaSparkContext,
-                                            tableInstantInfo: TableInstantInfo
+                                            tableInstantInfo: TableInstantInfo,
+                                            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]]
                                            ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
@@ -897,7 +900,8 @@ object HoodieSparkSqlWriter {
        client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
          common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))),
          tableInstantInfo.commitActionType,
-         writeResult.getPartitionToReplaceFileIds)
+         writeResult.getPartitionToReplaceFileIds,
+         common.util.Option.ofNullable(extraPreCommitFn.orNull))
if (commitSuccess) {
log.info("Commit " + tableInstantInfo.instantTime + " successful!")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 23b79a5ed41..ae50e3c56c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,20 +16,20 @@
*/
package org.apache.hudi
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecordPayload}
 import org.apache.hudi.common.table.marker.MarkerType
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, StringUtils}
+import org.apache.hudi.common.util.ValidationUtils.{checkArgument, checkState}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, JsonUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
import org.apache.log4j.LogManager
@@ -39,7 +39,7 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import java.lang
-import java.util.function.Function
+import java.util.function.{BiConsumer, Function}
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
@@ -75,8 +75,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
   private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
     STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
-  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
-  private val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
private var isAsyncCompactorServiceShutdownAbnormally = false
private var isAsyncClusteringServiceShutdownAbnormally = false
@@ -115,18 +113,38 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
     updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
-    // Add batchId as checkpoint to the extra metadata. To enable same checkpoint metadata structure for multi-writers,
-    // SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId), e.g.
-    // "_hudi_streaming_sink_checkpoint" : "{\"$batchId\":\"${sqlContext.sparkContext.applicationId}-$queryId\"}"
-    // NOTE: In case of multi-writers, this map should be mutable and sorted by key to facilitate merging of batchIds.
-    // HUDI-4432 tracks the implementation of checkpoint management for multi-writer.
-    val checkpointMap = Map(batchId.toString -> s"${sqlContext.sparkContext.applicationId}-$queryId")
-    updatedOptions = updatedOptions.updated(SINK_CHECKPOINT_KEY, HoodieSinkCheckpoint.toJson(checkpointMap))
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
+          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+          extraPreCommitFn = Some(new BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] {
+
+            override def accept(metaClient: HoodieTableMetaClient,
+                                newCommitMetadata: HoodieCommitMetadata): Unit = {
+              options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+                case Some(identifier) =>
+                  // Fetch the latest commit with checkpoint info again to avoid concurrency issues in the multi-writer scenario.
+                  val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+                    metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+                  var checkpointString = ""
+                  if (lastCheckpointCommitMetadata.isPresent) {
+                    val lastCheckpoint = lastCheckpointCommitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
+                    if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+                      checkpointString = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(lastCheckpoint) + (identifier -> batchId.toString))
+                    } else {
+                      checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
+                    }
+                  } else {
+                    checkpointString = HoodieSinkCheckpoint.toJson(Map(identifier -> batchId.toString))
+                  }
+
+                  newCommitMetadata.addMetadata(SINK_CHECKPOINT_KEY, checkpointString)
+                case None =>
+                  // No op since keeping batch id in memory only.
+              }
+            }
+          }))
     )
     match {
       case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
@@ -298,14 +316,19 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
     if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
-      // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
-      val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
-        metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
-      if (commitMetadata.isPresent) {
-        val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
-        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
-          latestCommittedBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong
-        }
+      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key()) match {
+        case Some(identifier) =>
+          // get the latest checkpoint from the commit metadata to check if the microbatch has already been processed or not
+          val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+            metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+          if (commitMetadata.isPresent) {
+            val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
+            if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+              HoodieSinkCheckpoint.fromJson(lastCheckpoint).get(identifier).foreach(commitBatchId =>
+                latestCommittedBatchId = commitBatchId.toLong)
+            }
+          }
+        case None =>
}
latestCommittedBatchId >= incomingBatchId
} else {
@@ -322,13 +345,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
object HoodieSinkCheckpoint {
lazy val mapper: ObjectMapper = {
- val _mapper = new ObjectMapper
- _mapper.setSerializationInclusion(Include.NON_ABSENT)
- _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ val _mapper = JsonUtils.getObjectMapper
_mapper.registerModule(DefaultScalaModule)
_mapper
}
+  // This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
+ val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"
+
def toJson(checkpoint: Map[String, String]): String = {
mapper.writeValueAsString(checkpoint)
}
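
To illustrate the checkpoint layout introduced above: SINK_CHECKPOINT_KEY stores a JSON map of identifier -> last committed batch id, and each writer merges its own entry into the map read from the latest commit. A small sketch with illustrative identifiers and batch ids, assuming HoodieSinkCheckpoint.fromJson yields a Scala Map[String, String]:

    import org.apache.hudi.HoodieSinkCheckpoint

    // First writer commits batch 5.
    val afterWriter1 = HoodieSinkCheckpoint.toJson(Map("writer-1" -> "5"))
    // afterWriter1 == {"writer-1":"5"}

    // A second writer later reads the last checkpoint, adds or overwrites its own entry,
    // and writes the merged map back under "_hudi_streaming_sink_checkpoint".
    val merged = HoodieSinkCheckpoint.toJson(HoodieSinkCheckpoint.fromJson(afterWriter1) + ("writer-2" -> "3"))
    // merged == {"writer-1":"5","writer-2":"3"}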
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 1382bafb762..d4651f9ab78 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,22 +18,24 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
+import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
-import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
@@ -257,6 +259,72 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction)
}
+ @Test
+ def testStructuredStreamingWithCheckpoint(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ val schema = inputDF1.schema
+
+ inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+ val query1 = spark.readStream
+ .schema(schema)
+ .json(sourcePath)
+ .writeStream
+ .format("org.apache.hudi")
+ .options(commonOpts)
+ .outputMode(OutputMode.Append)
+ .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
+ .option("checkpointLocation", s"${basePath}/checkpoint1")
+ .start(destPath)
+
+ query1.processAllAvailable()
+ val metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build
+
+ assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "0")
+
+ // Add another identifier checkpoint info to the commit.
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+ inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+
+ val query2 = spark.readStream
+ .schema(schema)
+ .json(sourcePath)
+ .writeStream
+ .format("org.apache.hudi")
+ .options(commonOpts)
+ .outputMode(OutputMode.Append)
+ .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier2")
+ .option("checkpointLocation", s"${basePath}/checkpoint2")
+ .start(destPath)
+ query2.processAllAvailable()
+ query1.processAllAvailable()
+
+ query1.stop()
+ query2.stop()
+
+ assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier2", "0")
+ assertLatestCheckpointInfoMatched(metaClient, "streaming_identifier1", "1")
+ }
+
+ def assertLatestCheckpointInfoMatched(metaClient: HoodieTableMetaClient,
+ identifier: String,
+ expectBatchId: String): Unit = {
+ metaClient.reloadActiveTimeline()
+    val lastCheckpointCommitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
+ metaClient.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
+
+ assertTrue(lastCheckpointCommitMetadata.isPresent)
+    val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
+
+ assertEquals(checkpointMap.get(identifier).orNull, expectBatchId)
+ }
+
  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
isInlineClustering: Boolean,
isAsyncClustering: Boolean,
partitionOfRecords: String,
checkClusteringResult: String => Unit): Unit = {