This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf39619de8b [HUDI-8619] Add checkpoint translation for Spark streaming
source (#12392)
bf39619de8b is described below
commit bf39619de8ba2a75acb0b1eb4e0050e2b33f2444
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 2 17:02:19 2024 -0800
[HUDI-8619] Add checkpoint translation for Spark streaming source (#12392)
Co-authored-by: Sagar Sumit <[email protected]>
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 41 ++++-
.../org/apache/hudi/HoodieStreamingSink.scala | 17 +-
.../sql/hudi/streaming/HoodieSourceOffset.scala | 6 +-
.../sql/hudi/streaming/HoodieStreamSourceV1.scala | 205 +++++++++++++++++++++
...reamSource.scala => HoodieStreamSourceV2.scala} | 66 +++----
.../hudi/functional/TestStreamingSource.scala | 50 ++++-
6 files changed, 324 insertions(+), 61 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 29a706bc593..034914f8318 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,24 +21,26 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig,
TypedProperties}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.model.WriteConcurrencyMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
-import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit,
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit,
HoodieStreamSource}
+import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit,
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit,
HoodieStreamSourceV1, HoodieStreamSourceV2}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -232,7 +234,36 @@ class DefaultSource extends RelationProvider
HoodieSpecifiedOffsetRangeLimit(instantTime)
}
- new HoodieStreamSource(sqlContext, metadataPath, schema, parameters,
offsetRangeLimit)
+ val storageConf =
HadoopFSUtils.getStorageConf(sqlContext.sparkSession.sessionState.newHadoopConf())
+ val tablePath: StoragePath = {
+ val path = new StoragePath(parameters.getOrElse("path", "Missing 'path'
option"))
+ val fs = new HoodieHadoopStorage(path, storageConf)
+ TablePathUtils.getTablePath(fs, path).get()
+ }
+ val metaClient = HoodieTableMetaClient.builder()
+
.setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
+ // Check if the incremental table read version is set. If set, use the
corresponding source
+ // which reads the data with the IncrementalRelation version matching that
table version,
+ // and also manages the checkpoint based on that version.
+ if (SparkConfigUtils.containsConfigProperty(parameters,
INCREMENTAL_READ_TABLE_VERSION)) {
+ val writeTableVersion =
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+ if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+ new HoodieStreamSourceV2(
+ sqlContext, metaClient, metadataPath, schema, parameters,
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+ } else {
+ new HoodieStreamSourceV1(
+ sqlContext, metaClient, metadataPath, schema, parameters,
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+ }
+ } else {
+ val writeTableVersion =
metaClient.getTableConfig.getTableVersion.versionCode()
+ if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+ new HoodieStreamSourceV2(
+ sqlContext, metaClient, metadataPath, schema, parameters,
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+ } else {
+ new HoodieStreamSourceV1(
+ sqlContext, metaClient, metadataPath, schema, parameters,
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+ }
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 602d9e2de76..e323d3a045b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -22,14 +22,13 @@ import
org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService,
SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.{HoodieCommitMetadata,
WriteConcurrencyMode}
+import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.marker.MarkerType
import org.apache.hudi.common.table.timeline.HoodieInstant
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils,
CompactionUtils, ConfigUtils}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils,
CompactionUtils}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.{HoodieCorruptedDataException,
HoodieException, TableNotFoundException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -45,7 +44,6 @@ import java.util.function.{BiConsumer, Function}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
-// TODO(yihua): handle V1/V2 checkpoint
class HoodieStreamingSink(sqlContext: SQLContext,
options: Map[String, String],
partitionColumns: Seq[String],
@@ -216,17 +214,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
}
- private def getStreamIdentifier(options: Map[String, String]) :
Option[String] = {
- if (ConfigUtils.resolveEnum(classOf[WriteConcurrencyMode],
options.getOrElse(WRITE_CONCURRENCY_MODE.key(),
- WRITE_CONCURRENCY_MODE.defaultValue())) ==
WriteConcurrencyMode.SINGLE_WRITER) {
- // for single writer model, we will fetch default if not set.
- Some(options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(),
STREAMING_CHECKPOINT_IDENTIFIER.defaultValue()))
- } else {
- // incase of multi-writer scenarios, there is not default.
- options.get(STREAMING_CHECKPOINT_IDENTIFIER.key())
- }
- }
-
override def toString: String = s"HoodieStreamingSink[${options("path")}]"
@annotation.tailrec
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
index 31d2020ce08..54e9294a74a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
@@ -24,7 +24,7 @@ import
com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-case class HoodieSourceOffset(completionTime: String) extends Offset {
+case class HoodieSourceOffset(offsetCommitTime: String) extends Offset {
override val json: String = {
HoodieSourceOffset.toJson(this)
@@ -33,13 +33,13 @@ case class HoodieSourceOffset(completionTime: String)
extends Offset {
override def equals(obj: Any): Boolean = {
obj match {
case HoodieSourceOffset(otherCompletionTime) =>
- otherCompletionTime == completionTime
+ otherCompletionTime == offsetCommitTime
case _=> false
}
}
override def hashCode(): Int = {
- completionTime.hashCode
+ offsetCommitTime.hashCode
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
new file mode 100644
index 00000000000..a43b77551a8
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT
+import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils
+import org.apache.hudi.common.table.checkpoint.{CheckpointUtils,
StreamerCheckpointV1}
+import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling._
+import
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling,
handleHollowCommitIfNeeded}
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
IncrementalRelationV1, MergeOnReadIncrementalRelationV1, SparkAdapterSupport}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.{Offset, Source}
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The structured streaming source for Hudi, used by streaming jobs to consume data.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSourceV1(sqlContext: SQLContext,
+ metaClient: HoodieTableMetaClient,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String],
+ offsetRangeLimit: HoodieOffsetRangeLimit,
+ writeTableVersion: HoodieTableVersion)
+ extends Source with Logging with Serializable with SparkAdapterSupport {
+
+ private lazy val tableType = metaClient.getTableType
+
+ private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
+
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
+
+ /**
+ * When hollow commits are found while doing a streaming read, unlike a batch
incremental query,
+ * we do not use [[HollowCommitHandling.FAIL]] by default, instead we use
[[HollowCommitHandling.BLOCK]]
+ * to block processing data from going beyond the hollow commits to avoid
unintentional skip.
+ *
+ * Users can set
[[DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT]] to
+ * [[HollowCommitHandling.USE_TRANSITION_TIME]] to avoid the blocking
behavior.
+ */
+ private val hollowCommitHandling: HollowCommitHandling =
+ parameters.get(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key)
+ .map(HollowCommitHandling.valueOf)
+ .getOrElse(HollowCommitHandling.BLOCK)
+
+ @transient private lazy val initialOffsets = {
+ val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession,
metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offset = offsetRangeLimit match {
+ case HoodieEarliestOffsetRangeLimit =>
+ INIT_OFFSET
+ case HoodieLatestOffsetRangeLimit =>
+ getLatestOffset.getOrElse(INIT_OFFSET)
+ case HoodieSpecifiedOffsetRangeLimit(instantTime) =>
+ HoodieSourceOffset(instantTime)
+ }
+ metadataLog.add(0, offset)
+ logInfo(s"The initial offset is $offset")
+ offset
+ }
+ }
+
+ override def schema: StructType = {
+ if (isCDCQuery) {
+ CDCRelation.FULL_CDC_SPARK_SCHEMA
+ } else {
+ schemaOption.getOrElse {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+ }
+ }
+ }
+
+ private def getLatestOffset: Option[HoodieSourceOffset] = {
+ metaClient.reloadActiveTimeline()
+ val filteredTimeline = handleHollowCommitIfNeeded(
+ metaClient.getActiveTimeline.filterCompletedInstants(), metaClient,
hollowCommitHandling)
+ filteredTimeline match {
+ case activeInstants if !activeInstants.empty() =>
+ val timestamp = if (hollowCommitHandling == USE_TRANSITION_TIME) {
+ activeInstants.getInstantsOrderedByCompletionTime
+ .skip(activeInstants.countInstants() - 1)
+ .findFirst()
+ .get()
+ .getCompletionTime
+ } else {
+ activeInstants.lastInstant().get().requestedTime()
+ }
+ Some(HoodieSourceOffset(timestamp))
+ case _ =>
+ None
+ }
+ }
+
+ /**
+ * Get the latest offset from the hoodie table.
+ * @return
+ */
+ override def getOffset: Option[Offset] = {
+ getLatestOffset
+ }
+
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ var startOffset = start.map(HoodieSourceOffset(_))
+ .getOrElse(initialOffsets)
+ var endOffset = HoodieSourceOffset(end)
+
+ // We translate the offsets here because, up to this point, the latest
offsets have been
+ // calculated without regard to the expected table version; after the
+ // translation, the rest of the logic remains unchanged.
+ startOffset =
HoodieSourceOffset(translateCheckpoint(startOffset.offsetCommitTime))
+ endOffset =
HoodieSourceOffset(translateCheckpoint(endOffset.offsetCommitTime))
+
+ if (startOffset == endOffset) {
+ sqlContext.internalCreateDataFrame(
+ sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"),
schema, isStreaming = true)
+ } else {
+ if (isCDCQuery) {
+ val cdcOptions = Map(
+ DataSourceReadOptions.START_COMMIT.key()->
startCommitTime(startOffset),
+ DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
+ )
+ val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient,
cdcOptions)
+ .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
+
+ sqlContext.sparkSession.internalCreateDataFrame(rdd,
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)
+ } else {
+ // Consume the data between (startCommitTime, endCommitTime]
+ val incParams = parameters ++ Map(
+ DataSourceReadOptions.QUERY_TYPE.key ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ DataSourceReadOptions.START_COMMIT.key ->
startCommitTime(startOffset),
+ DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime,
+ INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key ->
hollowCommitHandling.name
+ )
+
+ val rdd = tableType match {
+ case HoodieTableType.COPY_ON_WRITE =>
+ val serDe = sparkAdapter.createSparkRowSerDe(schema)
+ new IncrementalRelationV1(sqlContext, incParams, Some(schema),
metaClient)
+ .buildScan()
+ .map(serDe.serializeRow)
+ case HoodieTableType.MERGE_ON_READ =>
+ val requiredColumns = schema.fields.map(_.name)
+ new MergeOnReadIncrementalRelationV1(sqlContext, incParams,
metaClient, Some(schema))
+ .buildScan(requiredColumns, Array.empty[Filter])
+ .asInstanceOf[RDD[InternalRow]]
+ case _ => throw new IllegalArgumentException(s"UnSupport tableType:
$tableType")
+ }
+ sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+ }
+ }
+ }
+
+ private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+ startOffset match {
+ case INIT_OFFSET => startOffset.offsetCommitTime
+ case HoodieSourceOffset(commitTime) =>
+ commitTime
+ case _=> throw new IllegalStateException("UnKnow offset type.")
+ }
+ }
+
+ private def translateCheckpoint(commitTime: String): String = {
+ if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ CheckpointUtils.convertToCheckpointV2ForCommitTime(
+ new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
+ } else {
+ commitTime
+ }
+ }
+
+ override def stop(): Unit = {
+
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
similarity index 77%
rename from
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
rename to
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index 9790bdae405..b1c8e6bf90d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hudi.streaming
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
IncrementalRelationV2, MergeOnReadIncrementalRelationV2, SparkAdapterSupport}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.model.HoodieTableType
-import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.table.cdc.HoodieCDCUtils
+import org.apache.hudi.common.table.checkpoint.{CheckpointUtils,
StreamerCheckpointV2}
import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.common.util.TablePathUtils
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
-
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
/**
* The structured streaming source for Hudi, used by streaming jobs to consume data.
@@ -45,26 +41,15 @@ import org.apache.spark.sql.types.StructType
* @param parameters
*/
// TODO(yihua): handle V1/V2 checkpoint
-class HoodieStreamSource(
- sqlContext: SQLContext,
- metadataPath: String,
- schemaOption: Option[StructType],
- parameters: Map[String, String],
- offsetRangeLimit: HoodieOffsetRangeLimit)
+class HoodieStreamSourceV2(sqlContext: SQLContext,
+ metaClient: HoodieTableMetaClient,
+ metadataPath: String,
+ schemaOption: Option[StructType],
+ parameters: Map[String, String],
+ offsetRangeLimit: HoodieOffsetRangeLimit,
+ writeTableVersion: HoodieTableVersion)
extends Source with Logging with Serializable with SparkAdapterSupport {
- @transient private val storageConf = HadoopFSUtils.getStorageConf(
- sqlContext.sparkSession.sessionState.newHadoopConf())
-
- private lazy val tablePath: StoragePath = {
- val path = new StoragePath(parameters.getOrElse("path", "Missing 'path'
option"))
- val fs = new HoodieHadoopStorage(path, storageConf)
- TablePathUtils.getTablePath(fs, path).get()
- }
-
- private lazy val metaClient = HoodieTableMetaClient.builder()
- .setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
-
private lazy val tableType = metaClient.getTableType
private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
@@ -118,11 +103,15 @@ class HoodieStreamSource(
}
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- val startOffset = start.map(HoodieSourceOffset(_))
+ var startOffset = start.map(HoodieSourceOffset(_))
.getOrElse(initialOffsets)
- val endOffset = HoodieSourceOffset(end)
- // User set write version 6
- // startOffset is requested time
+ var endOffset = HoodieSourceOffset(end)
+
+ // We translate the offsets here because, up to this point, the latest
offsets have been
+ // calculated without regard to the expected table version; after the
+ // translation, the rest of the logic remains unchanged.
+ startOffset =
HoodieSourceOffset(translateCheckpoint(startOffset.offsetCommitTime))
+ endOffset =
HoodieSourceOffset(translateCheckpoint(endOffset.offsetCommitTime))
if (startOffset == endOffset) {
sqlContext.internalCreateDataFrame(
@@ -132,7 +121,7 @@ class HoodieStreamSource(
if (isCDCQuery) {
val cdcOptions = Map(
DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
- DataSourceReadOptions.END_COMMIT.key() -> endOffset.completionTime
+ DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
)
val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient,
cdcOptions, rangeType)
.buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
@@ -143,7 +132,7 @@ class HoodieStreamSource(
val incParams = parameters ++ Map(
DataSourceReadOptions.QUERY_TYPE.key ->
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.START_COMMIT.key -> startCompletionTime,
- DataSourceReadOptions.END_COMMIT.key -> endOffset.completionTime
+ DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime
)
val rdd = tableType match {
@@ -166,12 +155,23 @@ class HoodieStreamSource(
private def getStartCompletionTimeAndRangeType(startOffset:
HoodieSourceOffset): (String, RangeType) = {
startOffset match {
- case INIT_OFFSET => (startOffset.completionTime, RangeType.CLOSED_CLOSED)
- case HoodieSourceOffset(completionTime) => (completionTime,
RangeType.OPEN_CLOSED)
+ case INIT_OFFSET => (
+ startOffset.offsetCommitTime, RangeType.CLOSED_CLOSED)
+ case HoodieSourceOffset(completionTime) => (
+ completionTime, RangeType.OPEN_CLOSED)
case _=> throw new IllegalStateException("UnKnow offset type.")
}
}
+ private def translateCheckpoint(commitTime: String): String = {
+ if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ commitTime
+ } else {
+ CheckpointUtils.convertToCheckpointV1ForCommitTime(
+ new StreamerCheckpointV2(commitTime), metaClient).getCheckpointKey
+ }
+ }
+
override def stop(): Unit = {
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 6f5d5765a28..ccce19ea0b6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,20 +17,18 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceReadOptions.START_OFFSET
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceReadOptions.{INCREMENTAL_READ_TABLE_VERSION,
START_OFFSET}
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD,
RECORDKEY_FIELD}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE,
INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.util.JavaConversions
-
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.{Row, SaveMode}
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
class TestStreamingSource extends StreamTest {
@@ -249,6 +247,48 @@ class TestStreamingSource extends StreamTest {
}
}
+ test("Test checkpoint translation") {
+ withTempDir { inputDir =>
+ val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+ val metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(COPY_ON_WRITE)
+ .setTableName(getTableName(tablePath))
+ .setRecordKeyFields("id")
+ .setPreCombineField("ts")
+
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()),
tablePath)
+
+ addData(tablePath, Seq(("1", "a1", "10", "000")))
+ addData(tablePath, Seq(("2", "a1", "11", "001")))
+ addData(tablePath, Seq(("3", "a1", "12", "002")))
+
+ val instants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
+ assertEquals(3, instants.size())
+
+ // If the request time is used, i.e., V1, then the second record is
included in the output.
+ // Otherwise, only the third record appears in the output.
+ val startTimestamp = instants.get(1).requestedTime
+
+ for (incrementalReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ val df = spark.readStream
+ .format("org.apache.hudi")
+ .option(START_OFFSET.key, startTimestamp)
+ .option(INCREMENTAL_READ_TABLE_VERSION.key,
incrementalReadTableVersion.toString)
+ .load(tablePath)
+ .select("id", "name", "price", "ts")
+ val expectedRows = if (incrementalReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
+ Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+ } else {
+ Seq(Row("3", "a1", "12", "002"))
+ }
+ testStream(df)(
+ AssertOnQuery { q => q.processAllAvailable(); true },
+ // Start after the first commit
+ CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
+ )
+ }
+ }
+ }
+
private def addData(inputPath: String, rows: Seq[(String, String, String,
String)], enableInlineCompaction: Boolean = false) : Unit = {
rows.toDF(columns: _*)
.write