This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf39619de8b [HUDI-8619] Add checkpoint translation for Spark streaming 
source (#12392)
bf39619de8b is described below

commit bf39619de8ba2a75acb0b1eb4e0050e2b33f2444
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 2 17:02:19 2024 -0800

    [HUDI-8619] Add checkpoint translation for Spark streaming source (#12392)
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |  41 ++++-
 .../org/apache/hudi/HoodieStreamingSink.scala      |  17 +-
 .../sql/hudi/streaming/HoodieSourceOffset.scala    |   6 +-
 .../sql/hudi/streaming/HoodieStreamSourceV1.scala  | 205 +++++++++++++++++++++
 ...reamSource.scala => HoodieStreamSourceV2.scala} |  66 +++----
 .../hudi/functional/TestStreamingSource.scala      |  50 ++++-
 6 files changed, 324 insertions(+), 61 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 29a706bc593..034914f8318 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,24 +21,26 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig, 
TypedProperties}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.io.storage.HoodieSparkIOFactory
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
-import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, 
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, 
HoodieStreamSource}
+import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, 
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, 
HoodieStreamSourceV1, HoodieStreamSourceV2}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -232,7 +234,36 @@ class DefaultSource extends RelationProvider
         HoodieSpecifiedOffsetRangeLimit(instantTime)
     }
 
-    new HoodieStreamSource(sqlContext, metadataPath, schema, parameters, 
offsetRangeLimit)
+    val storageConf = 
HadoopFSUtils.getStorageConf(sqlContext.sparkSession.sessionState.newHadoopConf())
+    val tablePath: StoragePath = {
+      val path = new StoragePath(parameters.getOrElse("path", "Missing 'path' 
option"))
+      val fs = new HoodieHadoopStorage(path, storageConf)
+      TablePathUtils.getTablePath(fs, path).get()
+    }
+    val metaClient = HoodieTableMetaClient.builder()
+      
.setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
+    // If the incremental read table version is set, use the stream source that
+    // matches it: that source reads the data through the IncrementalRelation of
+    // the same version and also manages checkpoints according to that version.
+    if (SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_TABLE_VERSION)) {
+      val writeTableVersion = 
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+        new HoodieStreamSourceV2(
+          sqlContext, metaClient, metadataPath, schema, parameters, 
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+      } else {
+        new HoodieStreamSourceV1(
+          sqlContext, metaClient, metadataPath, schema, parameters, 
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+      }
+    } else {
+      val writeTableVersion = 
metaClient.getTableConfig.getTableVersion.versionCode()
+      if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
+        new HoodieStreamSourceV2(
+          sqlContext, metaClient, metadataPath, schema, parameters, 
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+      } else {
+        new HoodieStreamSourceV1(
+          sqlContext, metaClient, metadataPath, schema, parameters, 
offsetRangeLimit, HoodieTableVersion.fromVersionCode(writeTableVersion))
+      }
+    }
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 602d9e2de76..e323d3a045b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -22,14 +22,13 @@ import 
org.apache.hudi.HoodieStreamingSink.SINK_CHECKPOINT_KEY
 import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, 
SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.{HoodieCommitMetadata, 
WriteConcurrencyMode}
+import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.marker.MarkerType
 import org.apache.hudi.common.table.timeline.HoodieInstant
-import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, 
CompactionUtils, ConfigUtils}
+import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, 
CompactionUtils}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.{HoodieCorruptedDataException, 
HoodieException, TableNotFoundException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 
@@ -45,7 +44,6 @@ import java.util.function.{BiConsumer, Function}
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
-// TODO(yihua): handle V1/V2 checkpoint
 class HoodieStreamingSink(sqlContext: SQLContext,
                           options: Map[String, String],
                           partitionColumns: Seq[String],
@@ -216,17 +214,6 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     }
   }
 
-  private def getStreamIdentifier(options: Map[String, String]) : 
Option[String] = {
-    if (ConfigUtils.resolveEnum(classOf[WriteConcurrencyMode], 
options.getOrElse(WRITE_CONCURRENCY_MODE.key(),
-      WRITE_CONCURRENCY_MODE.defaultValue())) == 
WriteConcurrencyMode.SINGLE_WRITER) {
-      // for single writer model, we will fetch default if not set.
-      Some(options.getOrElse(STREAMING_CHECKPOINT_IDENTIFIER.key(), 
STREAMING_CHECKPOINT_IDENTIFIER.defaultValue()))
-    } else {
-      // incase of multi-writer scenarios, there is not default.
-      options.get(STREAMING_CHECKPOINT_IDENTIFIER.key())
-    }
-  }
-
   override def toString: String = s"HoodieStreamingSink[${options("path")}]"
 
   @annotation.tailrec
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
index 31d2020ce08..54e9294a74a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
@@ -24,7 +24,7 @@ import 
com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
-case class HoodieSourceOffset(completionTime: String) extends Offset {
+case class HoodieSourceOffset(offsetCommitTime: String) extends Offset {
 
   override val json: String = {
     HoodieSourceOffset.toJson(this)
@@ -33,13 +33,13 @@ case class HoodieSourceOffset(completionTime: String) 
extends Offset {
   override def equals(obj: Any): Boolean = {
     obj match {
       case HoodieSourceOffset(otherCompletionTime) =>
-        otherCompletionTime == completionTime
+        otherCompletionTime == offsetCommitTime
       case _=> false
     }
   }
 
   override def hashCode(): Int = {
-    completionTime.hashCode
+    offsetCommitTime.hashCode
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
new file mode 100644
index 00000000000..a43b77551a8
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT
+import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils
+import org.apache.hudi.common.table.checkpoint.{CheckpointUtils, 
StreamerCheckpointV1}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling._
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, 
handleHollowCommitIfNeeded}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
IncrementalRelationV1, MergeOnReadIncrementalRelationV1, SparkAdapterSupport}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.{Offset, Source}
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * The Structured Streaming source for Hudi, used by streaming jobs to consume table data.
+ * @param sqlContext
+ * @param metadataPath
+ * @param schemaOption
+ * @param parameters
+ */
+class HoodieStreamSourceV1(sqlContext: SQLContext,
+                           metaClient: HoodieTableMetaClient,
+                           metadataPath: String,
+                           schemaOption: Option[StructType],
+                           parameters: Map[String, String],
+                           offsetRangeLimit: HoodieOffsetRangeLimit,
+                           writeTableVersion: HoodieTableVersion)
+  extends Source with Logging with Serializable with SparkAdapterSupport {
+
+  private lazy val tableType = metaClient.getTableType
+
+  private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+    
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
 &&
+    
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
+
+  /**
+   * When hollow commits are found during a streaming read, unlike a batch 
incremental query,
+   * we do not use [[HollowCommitHandling.FAIL]] by default; instead we use 
[[HollowCommitHandling.BLOCK]]
+   * to stop processing from advancing past the hollow commits, so that no data is 
unintentionally skipped.
+   *
+   * Users can set 
[[DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT]] to
+   * [[HollowCommitHandling.USE_TRANSITION_TIME]] to avoid the blocking 
behavior.
+   */
+  private val hollowCommitHandling: HollowCommitHandling =
+    parameters.get(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key)
+      .map(HollowCommitHandling.valueOf)
+      .getOrElse(HollowCommitHandling.BLOCK)
+
+  @transient private lazy val initialOffsets = {
+    val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession, 
metadataPath)
+    metadataLog.get(0).getOrElse {
+      val offset = offsetRangeLimit match {
+        case HoodieEarliestOffsetRangeLimit =>
+          INIT_OFFSET
+        case HoodieLatestOffsetRangeLimit =>
+          getLatestOffset.getOrElse(INIT_OFFSET)
+        case HoodieSpecifiedOffsetRangeLimit(instantTime) =>
+          HoodieSourceOffset(instantTime)
+      }
+      metadataLog.add(0, offset)
+      logInfo(s"The initial offset is $offset")
+      offset
+    }
+  }
+
+  override def schema: StructType = {
+    if (isCDCQuery) {
+      CDCRelation.FULL_CDC_SPARK_SCHEMA
+    } else {
+      schemaOption.getOrElse {
+        val schemaUtil = new TableSchemaResolver(metaClient)
+        
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+      }
+    }
+  }
+
+  private def getLatestOffset: Option[HoodieSourceOffset] = {
+    metaClient.reloadActiveTimeline()
+    val filteredTimeline = handleHollowCommitIfNeeded(
+      metaClient.getActiveTimeline.filterCompletedInstants(), metaClient, 
hollowCommitHandling)
+    filteredTimeline match {
+      case activeInstants if !activeInstants.empty() =>
+        val timestamp = if (hollowCommitHandling == USE_TRANSITION_TIME) {
+          activeInstants.getInstantsOrderedByCompletionTime
+            .skip(activeInstants.countInstants() - 1)
+            .findFirst()
+            .get()
+            .getCompletionTime
+        } else {
+          activeInstants.lastInstant().get().requestedTime()
+        }
+        Some(HoodieSourceOffset(timestamp))
+      case _ =>
+        None
+    }
+  }
+
+  /**
+   * Get the latest offset from the hoodie table.
+   * @return
+   */
+  override def getOffset: Option[Offset] = {
+    getLatestOffset
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    var startOffset = start.map(HoodieSourceOffset(_))
+      .getOrElse(initialOffsets)
+    var endOffset = HoodieSourceOffset(end)
+
+    // Up to this point the offsets have been computed without regard to
+    // the expected table version, so we translate them here.
+    // The rest of the logic then works unchanged on the translated offsets.
+    startOffset = 
HoodieSourceOffset(translateCheckpoint(startOffset.offsetCommitTime))
+    endOffset = 
HoodieSourceOffset(translateCheckpoint(endOffset.offsetCommitTime))
+
+    if (startOffset == endOffset) {
+      sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), 
schema, isStreaming = true)
+    } else {
+      if (isCDCQuery) {
+        val cdcOptions = Map(
+          DataSourceReadOptions.START_COMMIT.key()-> 
startCommitTime(startOffset),
+          DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
+        )
+        val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, 
cdcOptions)
+          .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
+
+        sqlContext.sparkSession.internalCreateDataFrame(rdd, 
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)
+      } else {
+        // Consume the data between (startCommitTime, endCommitTime]
+        val incParams = parameters ++ Map(
+          DataSourceReadOptions.QUERY_TYPE.key -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+          DataSourceReadOptions.START_COMMIT.key -> 
startCommitTime(startOffset),
+          DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime,
+          INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT.key -> 
hollowCommitHandling.name
+        )
+
+        val rdd = tableType match {
+          case HoodieTableType.COPY_ON_WRITE =>
+            val serDe = sparkAdapter.createSparkRowSerDe(schema)
+            new IncrementalRelationV1(sqlContext, incParams, Some(schema), 
metaClient)
+              .buildScan()
+              .map(serDe.serializeRow)
+          case HoodieTableType.MERGE_ON_READ =>
+            val requiredColumns = schema.fields.map(_.name)
+            new MergeOnReadIncrementalRelationV1(sqlContext, incParams, 
metaClient, Some(schema))
+              .buildScan(requiredColumns, Array.empty[Filter])
+              .asInstanceOf[RDD[InternalRow]]
+          case _ => throw new IllegalArgumentException(s"UnSupport tableType: 
$tableType")
+        }
+        sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+      }
+    }
+  }
+
+  private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+    startOffset match {
+      case INIT_OFFSET => startOffset.offsetCommitTime
+      case HoodieSourceOffset(commitTime) =>
+        commitTime
+      case _=> throw new IllegalStateException("UnKnow offset type.")
+    }
+  }
+
+  private def translateCheckpoint(commitTime: String): String = {
+    if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      CheckpointUtils.convertToCheckpointV2ForCommitTime(
+        new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
+    } else {
+      commitTime
+    }
+  }
+
+  override def stop(): Unit = {
+
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
similarity index 77%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index 9790bdae405..b1c8e6bf90d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hudi.streaming
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
IncrementalRelationV2, MergeOnReadIncrementalRelationV2, SparkAdapterSupport}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.model.HoodieTableType
-import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils
+import org.apache.hudi.common.table.checkpoint.{CheckpointUtils, 
StreamerCheckpointV2}
 import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.common.util.TablePathUtils
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
-
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset, Source}
 import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 /**
   * The Struct Stream Source for Hudi to consume the data by streaming job.
@@ -45,26 +41,15 @@ import org.apache.spark.sql.types.StructType
   * @param parameters
   */
 // TODO(yihua): handle V1/V2 checkpoint
-class HoodieStreamSource(
-    sqlContext: SQLContext,
-    metadataPath: String,
-    schemaOption: Option[StructType],
-    parameters: Map[String, String],
-    offsetRangeLimit: HoodieOffsetRangeLimit)
+class HoodieStreamSourceV2(sqlContext: SQLContext,
+                           metaClient: HoodieTableMetaClient,
+                           metadataPath: String,
+                           schemaOption: Option[StructType],
+                           parameters: Map[String, String],
+                           offsetRangeLimit: HoodieOffsetRangeLimit,
+                           writeTableVersion: HoodieTableVersion)
   extends Source with Logging with Serializable with SparkAdapterSupport {
 
-  @transient private val storageConf = HadoopFSUtils.getStorageConf(
-    sqlContext.sparkSession.sessionState.newHadoopConf())
-
-  private lazy val tablePath: StoragePath = {
-    val path = new StoragePath(parameters.getOrElse("path", "Missing 'path' 
option"))
-    val fs = new HoodieHadoopStorage(path, storageConf)
-    TablePathUtils.getTablePath(fs, path).get()
-  }
-
-  private lazy val metaClient = HoodieTableMetaClient.builder()
-    .setConf(storageConf.newInstance()).setBasePath(tablePath.toString).build()
-
   private lazy val tableType = metaClient.getTableType
 
   private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
@@ -118,11 +103,15 @@ class HoodieStreamSource(
   }
 
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startOffset = start.map(HoodieSourceOffset(_))
+    var startOffset = start.map(HoodieSourceOffset(_))
       .getOrElse(initialOffsets)
-    val endOffset = HoodieSourceOffset(end)
-    // User set write version 6
-    // startOffset is requested time
+    var endOffset = HoodieSourceOffset(end)
+
+    // Up to this point the offsets have been computed without regard to
+    // the expected table version, so we translate them here.
+    // The rest of the logic then works unchanged on the translated offsets.
+    startOffset = 
HoodieSourceOffset(translateCheckpoint(startOffset.offsetCommitTime))
+    endOffset = 
HoodieSourceOffset(translateCheckpoint(endOffset.offsetCommitTime))
 
     if (startOffset == endOffset) {
       sqlContext.internalCreateDataFrame(
@@ -132,7 +121,7 @@ class HoodieStreamSource(
       if (isCDCQuery) {
         val cdcOptions = Map(
           DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
-          DataSourceReadOptions.END_COMMIT.key() -> endOffset.completionTime
+          DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
         )
         val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, 
cdcOptions, rangeType)
           .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
@@ -143,7 +132,7 @@ class HoodieStreamSource(
         val incParams = parameters ++ Map(
           DataSourceReadOptions.QUERY_TYPE.key -> 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
           DataSourceReadOptions.START_COMMIT.key -> startCompletionTime,
-          DataSourceReadOptions.END_COMMIT.key -> endOffset.completionTime
+          DataSourceReadOptions.END_COMMIT.key -> endOffset.offsetCommitTime
         )
 
         val rdd = tableType match {
@@ -166,12 +155,23 @@ class HoodieStreamSource(
 
   private def getStartCompletionTimeAndRangeType(startOffset: 
HoodieSourceOffset): (String, RangeType) = {
     startOffset match {
-      case INIT_OFFSET => (startOffset.completionTime, RangeType.CLOSED_CLOSED)
-      case HoodieSourceOffset(completionTime) => (completionTime, 
RangeType.OPEN_CLOSED)
+      case INIT_OFFSET => (
+        startOffset.offsetCommitTime, RangeType.CLOSED_CLOSED)
+      case HoodieSourceOffset(completionTime) => (
+        completionTime, RangeType.OPEN_CLOSED)
       case _=> throw new IllegalStateException("UnKnow offset type.")
     }
   }
 
+  private def translateCheckpoint(commitTime: String): String = {
+    if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      commitTime
+    } else {
+      CheckpointUtils.convertToCheckpointV1ForCommitTime(
+        new StreamerCheckpointV2(commitTime), metaClient).getCheckpointKey
+    }
+  }
+
   override def stop(): Unit = {
 
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 6f5d5765a28..ccce19ea0b6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,20 +17,18 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceReadOptions.START_OFFSET
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceReadOptions.{INCREMENTAL_READ_TABLE_VERSION, 
START_OFFSET}
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, 
RECORDKEY_FIELD}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.HoodieCompactionConfig
 import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, 
INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.util.JavaConversions
-
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.{Row, SaveMode}
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 
 class TestStreamingSource extends StreamTest {
 
@@ -249,6 +247,48 @@ class TestStreamingSource extends StreamTest {
     }
   }
 
+  test("Test checkpoint translation") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+      val metaClient = HoodieTableMetaClient.newTableBuilder()
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
+        .setPreCombineField("ts")
+        
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), 
tablePath)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      addData(tablePath, Seq(("2", "a1", "11", "001")))
+      addData(tablePath, Seq(("3", "a1", "12", "002")))
+
+      val instants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
+      assertEquals(3, instants.size())
+
+      // The start offset is the requested time of the second commit. With table
+      // version 8 the second and third records are expected; with version 6, only the third.
+      val startTimestamp = instants.get(1).requestedTime
+
+      for (incrementalReadTableVersion <- 
List(HoodieTableVersion.SIX.versionCode(), 
HoodieTableVersion.EIGHT.versionCode())) {
+        val df = spark.readStream
+          .format("org.apache.hudi")
+          .option(START_OFFSET.key, startTimestamp)
+          .option(INCREMENTAL_READ_TABLE_VERSION.key, 
incrementalReadTableVersion.toString)
+          .load(tablePath)
+          .select("id", "name", "price", "ts")
+        val expectedRows = if (incrementalReadTableVersion == 
HoodieTableVersion.EIGHT.versionCode()) {
+          Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+        } else {
+          Seq(Row("3", "a1", "12", "002"))
+        }
+        testStream(df)(
+          AssertOnQuery { q => q.processAllAvailable(); true },
+          // Start after the first commit
+          CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
+        )
+      }
+    }
+  }
+
   private def addData(inputPath: String, rows: Seq[(String, String, String, 
String)], enableInlineCompaction: Boolean = false) : Unit = {
     rows.toDF(columns: _*)
       .write

Reply via email to