This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d976671c7b [HUDI-5162] Allow user specified start offset for streaming
query (#7138)
d976671c7b is described below
commit d976671c7bbf6f7603cd219054dcdcee8d0632ba
Author: RexAn <[email protected]>
AuthorDate: Sun Nov 20 09:39:34 2022 +0800
[HUDI-5162] Allow user specified start offset for streaming query (#7138)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 6 ++
.../main/scala/org/apache/hudi/DefaultSource.scala | 13 ++-
.../sql/hudi/streaming/HoodieMetadataLog.scala | 79 ++++++++++++++++
.../hudi/streaming/HoodieOffsetRangeLimit.scala | 41 +++++++++
.../sql/hudi/streaming/HoodieStreamSource.scala | 100 ++++++---------------
.../hudi/functional/TestStreamingSource.scala | 60 +++++++++++++
6 files changed, 224 insertions(+), 75 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 64303e1a07..fab1c845f3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -96,6 +96,12 @@ object DataSourceReadOptions {
.withDocumentation("Enables use of the spark file index implementation for
Hudi, "
+ "that speeds up listing of large tables.")
+ val START_OFFSET: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.streaming.startOffset")
+ .defaultValue("earliest")
+ .withDocumentation("Start offset to pull data from hoodie streaming
source. allow earliest, latest, and " +
+ "specified start instant time")
+
val BEGIN_INSTANTTIME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.begin.instanttime")
.noDefaultValue()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index cd3fe6832d..3e53125805 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -34,7 +34,7 @@ import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
-import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
+import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit,
HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit,
HoodieStreamSource}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -185,7 +185,16 @@ class DefaultSource extends RelationProvider
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
- new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
+ val offsetRangeLimit = parameters.getOrElse(START_OFFSET.key(),
START_OFFSET.defaultValue()) match {
+ case offset if offset.equalsIgnoreCase("earliest") =>
+ HoodieEarliestOffsetRangeLimit
+ case offset if offset.equalsIgnoreCase("latest") =>
+ HoodieLatestOffsetRangeLimit
+ case instantTime =>
+ HoodieSpecifiedOffsetRangeLimit(instantTime)
+ }
+
+ new HoodieStreamSource(sqlContext, metadataPath, schema, parameters,
offsetRangeLimit)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
new file mode 100644
index 0000000000..3675e6c05e
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieMetadataLog.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog
+
+/**
+ * Hoodie type metadata log that uses the specified path as the metadata
storage.
+ */
+class HoodieMetadataLog(
+ sparkSession: SparkSession,
+ metadataPath: String) extends
HDFSMetadataLog[HoodieSourceOffset](sparkSession, metadataPath) {
+
+ private val VERSION = 1
+
+ override def serialize(metadata: HoodieSourceOffset, out: OutputStream):
Unit = {
+ val writer = new BufferedWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush()
+ }
+
+ /**
+ * Deserialize the init offset from the metadata file.
+ * The format in the metadata file is like this:
+ * ----------------------------------------------
+ * v1 -- The version info in the first line
+ * offsetJson -- The json string of HoodieSourceOffset in the rest of the
file
+ * -----------------------------------------------
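+ * @param in input stream over the content of the metadata file
+ * @return the HoodieSourceOffset parsed from the json that follows the version line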
+ */
+ override def deserialize(in: InputStream): HoodieSourceOffset = {
+ val content = FileIOUtils.readAsUTFString(in)
+ // Get version from the first line
+ val firstLineEnd = content.indexOf("\n")
+ if (firstLineEnd > 0) {
+ val version = getVersion(content.substring(0, firstLineEnd))
+ if (version > VERSION) {
+ throw new IllegalStateException(s"UnSupportVersion: max support
version is: $VERSION" +
+ s" current version is: $version")
+ }
+ // Parse the offset from the remainder of the file, after the version line
+ HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+ } else {
+ throw new IllegalStateException(s"Bad metadata format, failed to find
the version line.")
+ }
+ }
+
+
+ private def getVersion(versionLine: String): Int = {
+ if (versionLine.startsWith("v")) {
+ versionLine.substring(1).toInt
+ } else {
+ throw new IllegalStateException(s"Illegal version line: $versionLine " +
+ s"in the streaming metadata path")
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
new file mode 100644
index 0000000000..493599eced
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieOffsetRangeLimit.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+/**
+ * Objects that describe where the HoodieStreamSource starts reading.
+ * Available types are earliest, latest, and a specified commit instant.
+ */
+sealed trait HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the earliest commit in the hoodie timeline
+ */
+object HoodieEarliestOffsetRangeLimit extends HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the latest commit in the hoodie timeline
+ */
+object HoodieLatestOffsetRangeLimit extends HoodieOffsetRangeLimit
+
+/**
+ * Represent starting from the specified commit in the hoodie timeline
+ */
+case class HoodieSpecifiedOffsetRangeLimit(instantTime: String) extends
HoodieOffsetRangeLimit
+
+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index 49cbbdd799..0e39347ae9 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hudi.streaming
-import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
import java.util.Date
import org.apache.hadoop.fs.Path
@@ -29,14 +27,12 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.cdc.HoodieCDCUtils
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
-import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.hudi.common.util.TablePathUtils
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset,
Source}
+import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -52,7 +48,8 @@ class HoodieStreamSource(
sqlContext: SQLContext,
metadataPath: String,
schemaOption: Option[StructType],
- parameters: Map[String, String])
+ parameters: Map[String, String],
+ offsetRangeLimit: HoodieOffsetRangeLimit)
extends Source with Logging with Serializable with SparkAdapterSupport {
@transient private val hadoopConf =
sqlContext.sparkSession.sessionState.newHadoopConf()
@@ -72,57 +69,20 @@ class HoodieStreamSource(
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
- @transient private var lastOffset: HoodieSourceOffset = _
-
@transient private lazy val initialOffsets = {
- val metadataLog =
- new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession,
metadataPath) {
- override def serialize(metadata: HoodieSourceOffset, out:
OutputStream): Unit = {
- val writer = new BufferedWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8))
- writer.write("v" + VERSION + "\n")
- writer.write(metadata.json)
- writer.flush()
- }
-
- /**
- * Deserialize the init offset from the metadata file.
- * The format in the metadata file is like this:
- * ----------------------------------------------
- * v1 -- The version info in the first line
- * offsetJson -- The json string of HoodieSourceOffset in the rest of
the file
- * -----------------------------------------------
- * @param in
- * @return
- */
- override def deserialize(in: InputStream): HoodieSourceOffset = {
- val content = FileIOUtils.readAsUTFString(in)
- // Get version from the first line
- val firstLineEnd = content.indexOf("\n")
- if (firstLineEnd > 0) {
- val version = getVersion(content.substring(0, firstLineEnd))
- if (version > VERSION) {
- throw new IllegalStateException(s"UnSupportVersion: max support
version is: $VERSION" +
- s" current version is: $version")
- }
- // Get offset from the rest line in the file
- HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
- } else {
- throw new IllegalStateException(s"Bad metadata format, failed to
find the version line.")
- }
- }
- }
+ val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession,
metadataPath)
metadataLog.get(0).getOrElse {
- metadataLog.add(0, INIT_OFFSET)
- INIT_OFFSET
- }
- }
-
- private def getVersion(versionLine: String): Int = {
- if (versionLine.startsWith("v")) {
- versionLine.substring(1).toInt
- } else {
- throw new IllegalStateException(s"Illegal version line: $versionLine " +
- s"in the streaming metadata path")
+ val offset = offsetRangeLimit match {
+ case HoodieEarliestOffsetRangeLimit =>
+ INIT_OFFSET
+ case HoodieLatestOffsetRangeLimit =>
+ getLatestOffset.getOrElse(INIT_OFFSET)
+ case HoodieSpecifiedOffsetRangeLimit(instantTime) =>
+ HoodieSourceOffset(instantTime)
+ }
+ metadataLog.add(0, offset)
+ logInfo(s"The initial offset is $offset")
+ offset
}
}
@@ -137,27 +97,25 @@ class HoodieStreamSource(
}
}
+ private def getLatestOffset: Option[HoodieSourceOffset] = {
+ metaClient.reloadActiveTimeline()
+ metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
match {
+ case activeInstants if !activeInstants.empty() =>
+
Some(HoodieSourceOffset(activeInstants.lastInstant().get().getTimestamp))
+ case _ =>
+ None
+ }
+ }
+
/**
* Get the latest offset from the hoodie table.
* @return
*/
override def getOffset: Option[Offset] = {
- metaClient.reloadActiveTimeline()
- val activeInstants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
- if (!activeInstants.empty()) {
- val currentLatestCommitTime =
activeInstants.lastInstant().get().getTimestamp
- if (lastOffset == null || currentLatestCommitTime >
lastOffset.commitTime) {
- lastOffset = HoodieSourceOffset(currentLatestCommitTime)
- }
- } else { // if there are no active commits, use the init offset
- lastOffset = initialOffsets
- }
- Some(lastOffset)
+ getLatestOffset
}
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- initialOffsets
-
val startOffset = start.map(HoodieSourceOffset(_))
.getOrElse(initialOffsets)
val endOffset = HoodieSourceOffset(end)
@@ -217,7 +175,3 @@ class HoodieStreamSource(
}
}
-
-object HoodieStreamSource {
- val VERSION = 1
-}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index b628cc157b..84cc741b1d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -17,6 +17,7 @@
package org.apache.hudi.functional
+import org.apache.hudi.DataSourceReadOptions.START_OFFSET
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD,
RECORDKEY_FIELD}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
@@ -138,6 +139,65 @@ class TestStreamingSource extends StreamTest {
}
}
+ test("Test cow from latest offset") {
+ withTempDir { inputDir =>
+ val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(COPY_ON_WRITE)
+ .setTableName(getTableName(tablePath))
+
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+ addData(tablePath, Seq(("1", "a1", "10", "000")))
+ val df = spark.readStream
+ .format("org.apache.hudi")
+ .option(START_OFFSET.key(), "latest")
+ .load(tablePath)
+ .select("id", "name", "price", "ts")
+
+ testStream(df)(
+ AssertOnQuery {q => q.processAllAvailable(); true },
+ // Start from the latest offset, so the result should contain no data
+ CheckAnswerRows(Seq(), lastOnly = true, isSorted = false),
+ StopStream,
+
+ addDataToQuery(tablePath, Seq(("2", "a1", "12", "000"))),
+ StartStream(),
+ AssertOnQuery {q => q.processAllAvailable(); true },
+ CheckAnswerRows(Seq(Row("2", "a1", "12", "000")), lastOnly = false,
isSorted = false)
+ )
+ }
+ }
+
+ test("Test cow from specified offset") {
+ withTempDir { inputDir =>
+ val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
+ val metaClient = HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(COPY_ON_WRITE)
+ .setTableName(getTableName(tablePath))
+
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+ addData(tablePath, Seq(("1", "a1", "10", "000")))
+ addData(tablePath, Seq(("2", "a1", "11", "001")))
+ addData(tablePath, Seq(("3", "a1", "12", "002")))
+
+ val timestamp =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+ .firstInstant().get().getTimestamp
+ val df = spark.readStream
+ .format("org.apache.hudi")
+ .option(START_OFFSET.key(), timestamp)
+ .load(tablePath)
+ .select("id", "name", "price", "ts")
+
+ testStream(df)(
+ AssertOnQuery {q => q.processAllAvailable(); true },
+ // Start after the first commit
+ CheckAnswerRows(Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12",
"002")), lastOnly = true, isSorted = false)
+ )
+ }
+ }
+
private def addData(inputPath: String, rows: Seq[(String, String, String,
String)]): Unit = {
rows.toDF(columns: _*)
.write