This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d976671c7b [HUDI-5162] Allow user specified start offset for streaming 
query (#7138)
d976671c7b is described below

commit d976671c7bbf6f7603cd219054dcdcee8d0632ba
Author: RexAn <[email protected]>
AuthorDate: Sun Nov 20 09:39:34 2022 +0800

    [HUDI-5162] Allow user specified start offset for streaming query (#7138)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   6 ++
 .../main/scala/org/apache/hudi/DefaultSource.scala |  13 ++-
 .../sql/hudi/streaming/HoodieMetadataLog.scala     |  79 ++++++++++++++++
 .../hudi/streaming/HoodieOffsetRangeLimit.scala    |  41 +++++++++
 .../sql/hudi/streaming/HoodieStreamSource.scala    | 100 ++++++---------------
 .../hudi/functional/TestStreamingSource.scala      |  60 +++++++++++++
 6 files changed, 224 insertions(+), 75 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 64303e1a07..fab1c845f3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -96,6 +96,12 @@ object DataSourceReadOptions {
     .withDocumentation("Enables use of the spark file index implementation for 
Hudi, "
       + "that speeds up listing of large tables.")
 
+  val START_OFFSET: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.streaming.startOffset")
+    .defaultValue("earliest")
+    .withDocumentation("Start offset to pull data from hoodie streaming 
source. allow earliest, latest, and " +
+      "specified start instant time")
+
   val BEGIN_INSTANTTIME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.begin.instanttime")
     .noDefaultValue()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index cd3fe6832d..3e53125805 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -34,7 +34,7 @@ import org.apache.log4j.LogManager
 
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
-import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
+import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, 
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, 
HoodieStreamSource}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -185,7 +185,16 @@ class DefaultSource extends RelationProvider
                             schema: Option[StructType],
                             providerName: String,
                             parameters: Map[String, String]): Source = {
-    new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
+    val offsetRangeLimit = parameters.getOrElse(START_OFFSET.key(), 
START_OFFSET.defaultValue()) match {
+      case offset if offset.equalsIgnoreCase("earliest") =>
+        HoodieEarliestOffsetRangeLimit
+      case offset if offset.equalsIgnoreCase("latest") =>
+        HoodieLatestOffsetRangeLimit
+      case instantTime =>
+        HoodieSpecifiedOffsetRangeLimit(instantTime)
+    }
+
+    new HoodieStreamSource(sqlContext, metadataPath, schema, parameters, 
offsetRangeLimit)
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
new file mode 100644
index 0000000000..3675e6c05e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
+
+/**
+ * Hoodie type metadata log that uses the specified path as the metadata 
storage.
+ */
+class HoodieMetadataLog(
+    sparkSession: SparkSession,
+    metadataPath: String) extends 
HDFSMetadataLog[HoodieSourceOffset](sparkSession, metadataPath) {
+
+  private val VERSION = 1
+
+  override def serialize(metadata: HoodieSourceOffset, out: OutputStream): 
Unit = {
+    val writer = new BufferedWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))
+    writer.write("v" + VERSION + "\n")
+    writer.write(metadata.json)
+    writer.flush()
+  }
+
+  /**
+   * Deserialize the init offset from the metadata file.
+   * The format in the metadata file is like this:
+   * ----------------------------------------------
+   * v1         -- The version info in the first line
+   * offsetJson -- The json string of HoodieSourceOffset in the rest of the 
file
+   * -----------------------------------------------
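+   * @param in input stream over the whole metadata file (version line, then offset JSON)
+   * @return the HoodieSourceOffset parsed from the JSON that follows the version line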
+   */
+  override def deserialize(in: InputStream): HoodieSourceOffset = {
+    val content = FileIOUtils.readAsUTFString(in)
+    // Get version from the first line
+    val firstLineEnd = content.indexOf("\n")
+    if (firstLineEnd > 0) {
+      val version = getVersion(content.substring(0, firstLineEnd))
+      if (version > VERSION) {
+        throw new IllegalStateException(s"UnSupportVersion: max support 
version is: $VERSION" +
+          s" current version is: $version")
+      }
+      // Parse the offset from the remainder of the file (everything after the version line)
+      HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+    } else {
+      throw new IllegalStateException(s"Bad metadata format, failed to find 
the version line.")
+    }
+  }
+
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
new file mode 100644
index 0000000000..493599eced
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+/**
+ * Describes the offset at which a HoodieStreamSource starts reading.
+ * Available types are earliest, latest, and a specified commit instant time.
+ */
+sealed trait HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the earliest commit in the hoodie timeline
+ */
+object HoodieEarliestOffsetRangeLimit extends HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the latest commit in the hoodie timeline
+ */
+object HoodieLatestOffsetRangeLimit extends HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the specified commit in the hoodie timeline
+ */
+case class HoodieSpecifiedOffsetRangeLimit(instantTime: String) extends 
HoodieOffsetRangeLimit
+
+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index 49cbbdd799..0e39347ae9 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hudi.streaming
 
-import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
 import java.util.Date
 
 import org.apache.hadoop.fs.Path
@@ -29,14 +27,12 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
-import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.hudi.common.util.TablePathUtils
 import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, 
Source}
+import org.apache.spark.sql.execution.streaming.{Offset, Source}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -52,7 +48,8 @@ class HoodieStreamSource(
     sqlContext: SQLContext,
     metadataPath: String,
     schemaOption: Option[StructType],
-    parameters: Map[String, String])
+    parameters: Map[String, String],
+    offsetRangeLimit: HoodieOffsetRangeLimit)
   extends Source with Logging with Serializable with SparkAdapterSupport {
 
   @transient private val hadoopConf = 
sqlContext.sparkSession.sessionState.newHadoopConf()
@@ -72,57 +69,20 @@ class HoodieStreamSource(
     
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
 &&
     
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
 
-  @transient private var lastOffset: HoodieSourceOffset = _
-
   @transient private lazy val initialOffsets = {
-    val metadataLog =
-      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, 
metadataPath) {
-        override def serialize(metadata: HoodieSourceOffset, out: 
OutputStream): Unit = {
-          val writer = new BufferedWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))
-          writer.write("v" + VERSION + "\n")
-          writer.write(metadata.json)
-          writer.flush()
-        }
-
-        /**
-          * Deserialize the init offset from the metadata file.
-          * The format in the metadata file is like this:
-          * ----------------------------------------------
-          * v1         -- The version info in the first line
-          * offsetJson -- The json string of HoodieSourceOffset in the rest of 
the file
-          * -----------------------------------------------
-          * @param in
-          * @return
-          */
-        override def deserialize(in: InputStream): HoodieSourceOffset = {
-          val content = FileIOUtils.readAsUTFString(in)
-          // Get version from the first line
-          val firstLineEnd = content.indexOf("\n")
-          if (firstLineEnd > 0) {
-            val version = getVersion(content.substring(0, firstLineEnd))
-            if (version > VERSION) {
-              throw new IllegalStateException(s"UnSupportVersion: max support 
version is: $VERSION" +
-                s" current version is: $version")
-            }
-            // Get offset from the rest line in the file
-            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
-          } else {
-            throw new IllegalStateException(s"Bad metadata format, failed to 
find the version line.")
-          }
-        }
-      }
+    val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession, 
metadataPath)
     metadataLog.get(0).getOrElse {
-      metadataLog.add(0, INIT_OFFSET)
-      INIT_OFFSET
-    }
-  }
-
-  private def getVersion(versionLine: String): Int = {
-    if (versionLine.startsWith("v")) {
-      versionLine.substring(1).toInt
-    } else {
-      throw new IllegalStateException(s"Illegal version line: $versionLine " +
-        s"in the streaming metadata path")
+      val offset = offsetRangeLimit match {
+        case HoodieEarliestOffsetRangeLimit =>
+          INIT_OFFSET
+        case HoodieLatestOffsetRangeLimit =>
+          getLatestOffset.getOrElse(INIT_OFFSET)
+        case HoodieSpecifiedOffsetRangeLimit(instantTime) =>
+          HoodieSourceOffset(instantTime)
+      }
+      metadataLog.add(0, offset)
+      logInfo(s"The initial offset is $offset")
+      offset
     }
   }
 
@@ -137,27 +97,25 @@ class HoodieStreamSource(
     }
   }
 
+  private def getLatestOffset: Option[HoodieSourceOffset] = {
+    metaClient.reloadActiveTimeline()
+    metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants() 
match {
+      case activeInstants if !activeInstants.empty() =>
+        
Some(HoodieSourceOffset(activeInstants.lastInstant().get().getTimestamp))
+      case _ =>
+        None
+    }
+  }
+
   /**
     * Get the latest offset from the hoodie table.
     * @return
     */
   override def getOffset: Option[Offset] = {
-    metaClient.reloadActiveTimeline()
-    val activeInstants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
-    if (!activeInstants.empty()) {
-      val currentLatestCommitTime = 
activeInstants.lastInstant().get().getTimestamp
-      if (lastOffset == null || currentLatestCommitTime > 
lastOffset.commitTime) {
-        lastOffset = HoodieSourceOffset(currentLatestCommitTime)
-      }
-    } else { // if there are no active commits, use the init offset
-      lastOffset = initialOffsets
-    }
-    Some(lastOffset)
+    getLatestOffset
   }
 
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    initialOffsets
-
     val startOffset = start.map(HoodieSourceOffset(_))
       .getOrElse(initialOffsets)
     val endOffset = HoodieSourceOffset(end)
@@ -217,7 +175,3 @@ class HoodieStreamSource(
 
   }
 }
-
-object HoodieStreamSource {
-  val VERSION = 1
-}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index b628cc157b..84cc741b1d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hudi.DataSourceReadOptions.START_OFFSET
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, 
RECORDKEY_FIELD}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
@@ -138,6 +139,65 @@ class TestStreamingSource extends StreamTest {
     }
   }
 
+  test("Test cow from latest offset") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .option(START_OFFSET.key(), "latest")
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      testStream(df)(
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        // Starting from the latest offset, so the result should contain no data
+        CheckAnswerRows(Seq(), lastOnly = true, isSorted = false),
+        StopStream,
+
+        addDataToQuery(tablePath, Seq(("2", "a1", "12", "000"))),
+        StartStream(),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(Seq(Row("2", "a1", "12", "000")), lastOnly = false, 
isSorted = false)
+      )
+    }
+  }
+
+  test("Test cow from specified offset") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
+      val metaClient = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      addData(tablePath, Seq(("2", "a1", "11", "001")))
+      addData(tablePath, Seq(("3", "a1", "12", "002")))
+
+      val timestamp = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+        .firstInstant().get().getTimestamp
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .option(START_OFFSET.key(), timestamp)
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      testStream(df)(
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        // Start after the first commit
+        CheckAnswerRows(Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", 
"002")), lastOnly = true, isSorted = false)
+      )
+    }
+  }
+
   private def addData(inputPath: String, rows: Seq[(String, String, String, 
String)]): Unit = {
     rows.toDF(columns: _*)
       .write

Reply via email to