This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 378599175 [spark] Supports read limit for streaming source (#1973)
378599175 is described below
commit 378599175b80a6eb265148952f064a73cfa07ae1
Author: Yann Byron <[email protected]>
AuthorDate: Mon Sep 11 17:47:43 2023 +0800
[spark] Supports read limit for streaming source (#1973)
---
.../generated/spark_connector_configuration.html | 30 +++
.../ContinuousFromTimestampStartingScanner.java | 9 +
.../apache/paimon/spark/SparkConnectorOptions.java | 32 +++
.../org/apache/paimon/spark/PaimonImplicits.scala | 33 +++
.../spark/sources/PaimonMicroBatchStream.scala | 86 +++++-
.../paimon/spark/sources/PaimonReadLimits.scala | 119 +++++++++
.../paimon/spark/sources/PaimonSourceOffset.scala | 10 +
.../apache/paimon/spark/sources/StreamHelper.scala | 66 ++---
.../org/apache/paimon/spark/PaimonSourceTest.scala | 291 +++++++++++++++++++--
9 files changed, 620 insertions(+), 56 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index db9c1d889..84358454e 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -38,5 +38,35 @@ under the License.
<td>Boolean</td>
<td>If true, allow to merge data types if the two types meet the
rules for explicit casting.</td>
</tr>
+ <tr>
+ <td><h5>read.stream.maxFilesPerTrigger</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>The maximum number of files returned in a single batch.</td>
+ </tr>
+ <tr>
+ <td><h5>read.stream.maxBytesPerTrigger</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>The maximum number of bytes returned in a single batch.</td>
+ </tr>
+ <tr>
+ <td><h5>read.stream.maxRowsPerTrigger</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>The maximum number of rows returned in a single batch.</td>
+ </tr>
+ <tr>
+ <td><h5>read.stream.minRowsPerTrigger</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+      <td>The minimum number of rows returned in a single batch, which
is used together with read.stream.maxTriggerDelayMs to create
MinRowsReadLimit.</td>
+ </tr>
+ <tr>
+ <td><h5>read.stream.maxTriggerDelayMs</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+      <td>The maximum delay between two adjacent batches, which is used
to create MinRowsReadLimit together with read.stream.minRowsPerTrigger.</td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 7fc8cd6aa..6113773ff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -42,6 +42,15 @@ public class ContinuousFromTimestampStartingScanner extends
AbstractStartingScan
this.startingSnapshotId =
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
}
+ @Override
+ public StartingContext startingContext() {
+ if (startingSnapshotId == null) {
+ return StartingContext.EMPTY;
+ } else {
+ return new StartingContext(startingSnapshotId + 1, false);
+ }
+ }
+
@Override
public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId =
snapshotManager.earlierThanTimeMills(startupMillis);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 54950fdd9..2f9e9297c 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -37,4 +37,36 @@ public class SparkConnectorOptions {
.defaultValue(false)
.withDescription(
"If true, allow to merge data types if the two
types meet the rules for explicit casting.");
+
+ public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
+ key("read.stream.maxFilesPerTrigger")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The maximum number of files returned in
a single batch.");
+
+ public static final ConfigOption<Long> MAX_BYTES_PER_TRIGGER =
+ key("read.stream.maxBytesPerTrigger")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The maximum number of bytes returned in
a single batch.");
+
+ public static final ConfigOption<Long> MAX_ROWS_PER_TRIGGER =
+ key("read.stream.maxRowsPerTrigger")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The maximum number of rows returned in a
single batch.");
+
+ public static final ConfigOption<Long> MIN_ROWS_PER_TRIGGER =
+ key("read.stream.minRowsPerTrigger")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+                            "The minimum number of rows returned in a single
batch, which is used together with read.stream.maxTriggerDelayMs to create
MinRowsReadLimit.");
+
+ public static final ConfigOption<Long> MAX_DELAY_MS_PER_TRIGGER =
+ key("read.stream.maxTriggerDelayMs")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+                            "The maximum delay between two adjacent batches,
which is used together with read.stream.minRowsPerTrigger to create
MinRowsReadLimit.");
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
new file mode 100644
index 000000000..8b7942596
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import java.util.Optional
+
+import scala.language.implicitConversions
+
+object PaimonImplicits {
+ implicit def toScalaOption[T](o: Optional[T]): Option[T] = {
+ if (o.isPresent) {
+ Option.apply(o.get())
+ } else {
+ None
+ }
+ }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index e8b573c2e..ca730140e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -17,42 +17,110 @@
*/
package org.apache.paimon.spark.sources
-import org.apache.paimon.spark.{SparkInputPartition, SparkReaderFactory}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions,
SparkInputPartition, SparkReaderFactory}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.ReadBuilder
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream,
Offset, ReadLimit, SupportsTriggerAvailableNow}
+
+import scala.collection.mutable
class PaimonMicroBatchStream(
originTable: FileStoreTable,
readBuilder: ReadBuilder,
checkpointLocation: String)
extends MicroBatchStream
+ with SupportsTriggerAvailableNow
with StreamHelper
with Logging {
+ private val options = Options.fromMap(table.options())
+
+ lazy val initOffset: PaimonSourceOffset = {
+ val initSnapshotId = Math.max(
+ table.snapshotManager().earliestSnapshotId(),
+ streamScanStartingContext.getSnapshotId)
+ val scanSnapshot = if (initSnapshotId ==
streamScanStartingContext.getSnapshotId) {
+ streamScanStartingContext.getScanFullSnapshot.booleanValue()
+ } else {
+ false
+ }
+ PaimonSourceOffset(initSnapshotId, -1L, scanSnapshot)
+ }
+
+  // The committed offset; it is used to detect the validity of subsequent
offsets.
private var committedOffset: Option[PaimonSourceOffset] = None
- lazy val initOffset: PaimonSourceOffset =
PaimonSourceOffset(getStartingContext)
+  // The timestamp when a batch was last triggered.
+  // It will be reset when a non-empty PaimonSourceOffset is returned by
calling "latestOffset".
+ var lastTriggerMillis = 0L
+
+ // the latest offset when call "prepareForTriggerAvailableNow"
+ // the query will be terminated when data is consumed to this offset in
"TriggerAvailableNow" mode.
+ private var offsetForTriggerAvailableNow: Option[PaimonSourceOffset] = None
+
+ private lazy val defaultReadLimit: ReadLimit = {
+ import PaimonImplicits._
+
+ val readLimits = mutable.ArrayBuffer.empty[ReadLimit]
+ options.getOptional(SparkConnectorOptions.MAX_BYTES_PER_TRIGGER).foreach {
+ bytes => readLimits += ReadMaxBytes(bytes)
+ }
+ options.getOptional(SparkConnectorOptions.MAX_FILES_PER_TRIGGER).foreach {
+ files => readLimits += ReadLimit.maxFiles(files)
+ }
+ options.getOptional(SparkConnectorOptions.MAX_ROWS_PER_TRIGGER).foreach {
+ rows => readLimits += ReadLimit.maxRows(rows)
+ }
+ val minRowsOptional =
options.getOptional(SparkConnectorOptions.MIN_ROWS_PER_TRIGGER)
+ val maxDelayMSOptional =
options.getOptional(SparkConnectorOptions.MAX_DELAY_MS_PER_TRIGGER)
+ if (minRowsOptional.isPresent && maxDelayMSOptional.isPresent) {
+ readLimits += ReadLimit.minRows(minRowsOptional.get(),
maxDelayMSOptional.get())
+ } else if (minRowsOptional.isPresent || maxDelayMSOptional.isPresent) {
+ throw new IllegalArgumentException(
+ "Can't provide only one of read.stream.minRowsPerTrigger and
read.stream.maxTriggerDelayMs.")
+ }
+
+ PaimonReadLimits(ReadLimit.compositeLimit(readLimits.toArray),
lastTriggerMillis)
+ .map(_.toReadLimit)
+ .getOrElse(ReadLimit.allAvailable())
+ }
+
+ override def getDefaultReadLimit: ReadLimit = defaultReadLimit
+
+ override def prepareForTriggerAvailableNow(): Unit = {
+ offsetForTriggerAvailableNow = getLatestOffset(initOffset, None,
ReadLimit.allAvailable())
+ }
override def latestOffset(): Offset = {
- getLatestOffset
+ throw new UnsupportedOperationException(
+ "That latestOffset(Offset, ReadLimit) method should be called instead of
this method.")
+ }
+
+ override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
+ val startOffset = PaimonSourceOffset(start)
+ getLatestOffset(startOffset, offsetForTriggerAvailableNow, limit).map {
+ offset =>
+ lastTriggerMillis = System.currentTimeMillis()
+ offset
+ }.orNull
}
override def planInputPartitions(start: Offset, end: Offset):
Array[InputPartition] = {
val startOffset = {
- val startOffset0 = PaimonSourceOffset.apply(start)
+ val startOffset0 = PaimonSourceOffset(start)
if (startOffset0.compareTo(initOffset) < 0) {
initOffset
} else {
startOffset0
}
}
- val endOffset = PaimonSourceOffset.apply(end)
+ val endOffset = PaimonSourceOffset(end)
- getBatch(startOffset, endOffset)
+ getBatch(startOffset, Some(endOffset), None)
.map(ids => new SparkInputPartition(ids.entry))
.toArray[InputPartition]
}
@@ -66,11 +134,11 @@ class PaimonMicroBatchStream(
}
override def deserializeOffset(json: String): Offset = {
- PaimonSourceOffset.apply(json)
+ PaimonSourceOffset(json)
}
override def commit(end: Offset): Unit = {
- committedOffset = Some(PaimonSourceOffset.apply(end))
+ committedOffset = Some(PaimonSourceOffset(end))
logInfo(s"$committedOffset is committed.")
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
new file mode 100644
index 000000000..0c64886be
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.spark.sql.connector.read.streaming.{CompositeReadLimit,
ReadAllAvailable, ReadLimit, ReadMaxFiles, ReadMaxRows, ReadMinRows}
+
+import scala.collection.JavaConverters._
+
+case class ReadMaxBytes(bytes: Long) extends ReadLimit
+
+object PaimonReadLimits {
+
+ def apply(limit: ReadLimit, lastTriggerMillis: Long):
Option[PaimonReadLimitGuard] = limit match {
+ case _: ReadAllAvailable => None
+ case composite: CompositeReadLimit if composite.getReadLimits.isEmpty =>
None
+ case composite: CompositeReadLimit if
composite.getReadLimits.forall(supportedReadLimit) =>
+ Some(PaimonReadLimitGuard(composite.getReadLimits, lastTriggerMillis))
+ case limit: ReadLimit if supportedReadLimit(limit) =>
+ Some(PaimonReadLimitGuard(Array(limit), lastTriggerMillis))
+ case other =>
+ throw new UnsupportedOperationException(s"Not supported read limit:
${other.toString}")
+ }
+
+ private def supportedReadLimit(limit: ReadLimit): Boolean = {
+ limit match {
+ case _: ReadAllAvailable => true
+ case _: ReadMaxFiles => true
+ case _: ReadMinRows => true
+ case _: ReadMaxRows => true
+ case _: ReadMaxBytes => true
+ case _ => false
+ }
+ }
+}
+
+case class PaimonReadLimitGuard(limits: Array[ReadLimit], lastTriggerMillis:
Long) {
+
+ private val (minRowsReadLimits, otherReadLimits) =
limits.partition(_.isInstanceOf[ReadMinRows])
+
+ assert(minRowsReadLimits.length <= 1, "Paimon supports one ReadMinRows Read
Limit at most.")
+
+ private var acceptedFiles: Int = 0
+
+ private var acceptedBytes: Long = 0
+
+ private var acceptedRows: Long = 0
+
+ private val minRowsReadLimit = minRowsReadLimits.collectFirst { case limit:
ReadMinRows => limit }
+
+  // This batch may be skipped if the maximum trigger delay has not yet been
reached
+  // and the number of rows is less than the threshold.
+ private var _maySkipBatch =
+ minRowsReadLimit.exists(System.currentTimeMillis() - lastTriggerMillis <=
_.maxTriggerDelayMs)
+
+ private var _hasCapacity = true
+
+ def toReadLimit: ReadLimit = {
+ ReadLimit.compositeLimit(limits)
+ }
+
+ def admit(indexedDataSplit: IndexedDataSplit): Boolean = {
+ if (!_hasCapacity) {
+ return false
+ }
+
+ val (rows, bytes) = getBytesAndRows(indexedDataSplit)
+
+ _hasCapacity = otherReadLimits.forall {
+ case maxFiles: ReadMaxFiles =>
+ acceptedFiles < maxFiles.maxFiles
+ case maxRows: ReadMaxRows =>
+ acceptedRows < maxRows.maxRows
+ case maxBytes: ReadMaxBytes =>
+ acceptedBytes < maxBytes.bytes
+ case limit =>
+ throw new UnsupportedOperationException(s"Not supported read limit:
${limit.toString}")
+ }
+
+ acceptedFiles += 1
+ acceptedRows += rows
+ acceptedBytes += bytes
+
+ if (_maySkipBatch) {
+ _maySkipBatch = minRowsReadLimit.exists(acceptedRows < _.minRows)
+ }
+
+ _hasCapacity
+ }
+
+ def skipBatch: Boolean = _maySkipBatch
+
+ def hasCapacity: Boolean = _hasCapacity
+
+ private def getBytesAndRows(indexedDataSplit: IndexedDataSplit): (Long,
Long) = {
+ var rows = 0L
+ var bytes = 0L
+ indexedDataSplit.entry.dataFiles().asScala.foreach {
+ file =>
+ rows += file.rowCount()
+ bytes += file.fileSize()
+ }
+ (rows, bytes)
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
index d64fe80f6..3946950fa 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -60,4 +60,14 @@ object PaimonSourceOffset {
case _ => throw new IllegalArgumentException(s"Can't parse $offset to
PaimonSourceOffset.")
}
}
+
+ def gt(indexedDataSplit: IndexedDataSplit, start: PaimonSourceOffset):
Boolean = {
+ indexedDataSplit.snapshotId > start.snapshotId ||
+ (indexedDataSplit.snapshotId == start.snapshotId && indexedDataSplit.index
> start.index)
+ }
+
+ def le(indexedDataSplit: IndexedDataSplit, end: PaimonSourceOffset): Boolean
= {
+ indexedDataSplit.snapshotId < end.snapshotId ||
+ (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index
<= end.index)
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 0994bdc59..1ef678d83 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types.StructType
@@ -38,6 +39,8 @@ trait StreamHelper extends WithFileStoreTable {
val initOffset: PaimonSourceOffset
+ var lastTriggerMillis: Long
+
private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
private lazy val partitionSchema: StructType =
@@ -50,46 +53,53 @@ trait StreamHelper extends WithFileStoreTable {
)
// Used to get the initial offset.
- def getStartingContext: StartingContext = streamScan.startingContext()
-
- def getLatestOffset: PaimonSourceOffset = {
- val latestSnapshotId = table.snapshotManager().latestSnapshotId()
- val plan = if (needToScanCurrentSnapshot(latestSnapshotId)) {
- table
- .newSnapshotReader()
- .withSnapshot(latestSnapshotId)
- .withMode(ScanMode.ALL)
- .read()
- } else {
- table
- .newSnapshotReader()
- .withSnapshot(latestSnapshotId)
- .withMode(ScanMode.DELTA)
- .read()
- }
- val indexedDataSplits = convertPlanToIndexedSplits(plan)
+ lazy val streamScanStartingContext: StartingContext =
streamScan.startingContext()
+
+ def getLatestOffset(
+ startOffset: PaimonSourceOffset,
+ endOffset: Option[PaimonSourceOffset],
+ limit: ReadLimit): Option[PaimonSourceOffset] = {
+ val indexedDataSplits = getBatch(startOffset, endOffset, Some(limit))
indexedDataSplits.lastOption
.map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot =
false))
- .orNull
}
def getBatch(
startOffset: PaimonSourceOffset,
- endOffset: PaimonSourceOffset): Array[IndexedDataSplit] = {
- val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+ endOffset: Option[PaimonSourceOffset],
+ limit: Option[ReadLimit]): Array[IndexedDataSplit] = {
if (startOffset != null) {
streamScan.restore(startOffset.snapshotId,
needToScanCurrentSnapshot(startOffset.snapshotId))
}
+
+ val readLimitGuard = limit.flatMap(PaimonReadLimits(_, lastTriggerMillis))
var hasSplits = true
- while (hasSplits && streamScan.checkpoint() <= endOffset.snapshotId) {
+ def continue: Boolean = {
+ hasSplits && readLimitGuard.forall(_.hasCapacity) && endOffset.forall(
+ streamScan.checkpoint() <= _.snapshotId)
+ }
+
+ val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+ while (continue) {
val plan = streamScan.plan()
if (plan.splits.isEmpty) {
hasSplits = false
} else {
indexedDataSplits ++= convertPlanToIndexedSplits(plan)
+ // Filter by (start, end]
+ .filter(ids => inRange(ids, startOffset, endOffset))
+ // Filter splits by read limits other than ReadMinRows.
+ .takeWhile(s => readLimitGuard.forall(_.admit(s)))
}
}
- indexedDataSplits.filter(ids => inRange(ids, startOffset,
endOffset)).toArray
+
+ // Filter splits by ReadMinRows read limit if exists.
+ // If this batch doesn't meet the condition of ReadMinRows, then nothing
will be returned.
+ if (readLimitGuard.exists(_.skipBatch)) {
+ Array.empty
+ } else {
+ indexedDataSplits.toArray
+ }
}
private def needToScanCurrentSnapshot(snapshotId: Long): Boolean = {
@@ -134,13 +144,9 @@ trait StreamHelper extends WithFileStoreTable {
private def inRange(
indexedDataSplit: IndexedDataSplit,
start: PaimonSourceOffset,
- end: PaimonSourceOffset): Boolean = {
- val startRange = indexedDataSplit.snapshotId > start.snapshotId ||
- (indexedDataSplit.snapshotId == start.snapshotId &&
indexedDataSplit.index > start.index)
- val endRange = indexedDataSplit.snapshotId < end.snapshotId ||
- (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index
<= end.index)
-
- startRange && endRange
+ end: Option[PaimonSourceOffset]): Boolean = {
+ PaimonSourceOffset.gt(indexedDataSplit, start) && end.forall(
+ PaimonSourceOffset.le(indexedDataSplit, _))
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index b99ed592b..bef1c4b60 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -18,9 +18,10 @@
package org.apache.paimon.spark
import org.apache.paimon.WriteMode
+import org.apache.paimon.spark.sources.PaimonSourceOffset
import org.apache.spark.sql.Row
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest,
Trigger}
import org.junit.jupiter.api.Assertions
import java.util.concurrent.TimeUnit
@@ -32,7 +33,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: default scan mode") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query = spark.readStream
@@ -70,7 +71,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: default and from-snapshot scan mode with
scan.snapshot-id") {
withTempDirs {
(checkpointDir1, checkpointDir2) =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
// set scan.snapshot-id = 3, this query can read the latest changes.
@@ -128,7 +129,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
(checkpointDir1, checkpointDir2) =>
// timestamp that is before this table is created and data is written.
val ts1 = System.currentTimeMillis()
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
// timestamp that is after this table is created and data is written.
val ts2 = System.currentTimeMillis()
@@ -182,7 +183,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: latest and latest-full scan mode") {
withTempDirs {
(checkpointDir1, checkpointDir2) =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query1 = spark.readStream
@@ -238,7 +239,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: from-snapshot and from-snapshot-full scan mode") {
withTempDirs {
(checkpointDir1, checkpointDir2) =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query1 = spark.readStream
@@ -296,7 +297,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: Trigger AvailableNow") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query = spark.readStream
@@ -329,10 +330,253 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
}
}
+ test("Paimon Source: Trigger AvailableNow + latest scan mode +
maxFilesPerTrigger read limit") {
+ withTempDir {
+ checkpointDir =>
+ val TableSnapshotState(_, location, snapshotData, latestChanges,
snapshotToDataSplitNum) =
+ prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+ val query = spark.readStream
+ .format("paimon")
+ .option("scan.mode", "latest")
+ .option("read.stream.maxFilesPerTrigger", "1")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .trigger(Trigger.AvailableNow())
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ Assertions.assertTrue(query.isActive)
+ query.processAllAvailable()
+          // 2 is the number of buckets, which is also the number of
DataSplits generated by planning.
+ Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ checkAnswer(currentResult(), latestChanges)
+
+ spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42,
'v_42')")
+ Assertions.assertFalse(query.isActive)
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("Paimon Source: Default Trigger + from-snapshot scan mode +
maxBytesPerTrigger read limit") {
+ withTempDir {
+ checkpointDir =>
+ val TableSnapshotState(_, location, snapshotData, latestChanges,
snapshotToDataSplitNum) =
+ prepareTableAndGetLocation(3, WriteMode.APPEND_ONLY)
+ val totalDataSplitNum = snapshotToDataSplitNum.values.sum
+
+ val query = spark.readStream
+ .format("paimon")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ // Set 1 byte to maxBytesPerTrigger, that will lead to return only
one data split.
+ .option("read.stream.maxBytesPerTrigger", "1")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ query.processAllAvailable()
+ // Each batch can consume one data split.
+ // Therefore it takes totalDataSplitNum batches to consume all the
data.
+ Assertions.assertEquals(
+ totalDataSplitNum,
+ query.recentProgress.count(_.numInputRows != 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ var totalStreamingData = snapshotData
+ checkAnswer(currentResult(), totalStreamingData)
+
+ spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42,
'v_42')")
+ query.processAllAvailable()
+ // Two additional batches are required to consume the new data.
+ Assertions.assertEquals(
+ totalDataSplitNum + 2,
+ query.recentProgress.count(_.numInputRows != 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(4L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ totalStreamingData =
+ totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") ::
Row(42, "v_42") :: Nil)
+ checkAnswer(currentResult(), totalStreamingData)
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("Paimon Source: default scan mode + maxRowsPerTrigger read limit") {
+ withTempDir {
+ checkpointDir =>
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+ prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+ val query = spark.readStream
+ .format("paimon")
+ .option("read.stream.maxRowsPerTrigger", "10")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ query.processAllAvailable()
+ // Only 9 rows in this table. Only one batch can consume all the
data.
+ Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ checkAnswer(currentResult(), snapshotData)
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test("Paimon Source: default scan mode + minRowsPerTrigger and
maxTriggerDelayMs read limits") {
+ withTempDirs {
+ (checkpointDir1, checkpointDir2) =>
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+ prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+ assertThrows[StreamingQueryException] {
+ spark.readStream
+ .format("paimon")
+ .option("read.stream.minRowsPerTrigger", "10")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+ .processAllAvailable()
+ }
+
+ val query = spark.readStream
+ .format("paimon")
+ .option("read.stream.minRowsPerTrigger", "5")
+ .option("read.stream.maxTriggerDelayMs", "5000")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ query.processAllAvailable()
+ // One batch can consume all the data.
+ Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
+ var totalStreamingData = snapshotData
+ checkAnswer(currentResult(), totalStreamingData)
+
+ spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42,
'v_42')")
+ query.processAllAvailable()
+          // There are only 3 new rows, but at least 5 rows are needed to
trigger a batch.
+          // So the latest committed offset is not changed.
+ Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ checkAnswer(currentResult(), totalStreamingData)
+
+ Thread.sleep(5000)
+ query.processAllAvailable()
+          // Now 5s have passed, so a batch can be triggered even though
there are only 3 rows, because the maximum delay time (5s) has been reached.
+ Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(
+ PaimonSourceOffset(4L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+ totalStreamingData =
+ totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") ::
Row(42, "v_42") :: Nil)
+ checkAnswer(currentResult(), totalStreamingData)
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
+ test(
+ "Paimon Source: from-snapshot scan mode + maxRowsPerTrigger,
minRowsPerTrigger and maxTriggerDelayMs read limits") {
+ withTempDir {
+ checkpointDir =>
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+ prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY)
+
+ val query = spark.readStream
+ .format("paimon")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ .option("read.stream.maxRowsPerTrigger", "6")
+ .option("read.stream.minRowsPerTrigger", "4")
+ .option("read.stream.maxTriggerDelayMs", "5000")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ spark.sql("INSERT INTO T VALUES (10, 'v_10'), (11, 'v_11'), (12,
'v_12')")
+ query.processAllAvailable()
+          // The first batch will always be triggered when using the
ReadMinRows read limit.
+ Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
+
+ spark.sql("INSERT INTO T VALUES (20, 'v_20'), (21, 'v_21'), (22,
'v_22')")
+ query.processAllAvailable()
+ Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(3L, query.lastProgress.numInputRows)
+
+ spark.sql(
+ """
+ |INSERT INTO T VALUES (30, 'v_30'), (31, 'v_31'), (32, 'v_32'),
(33, 'v_33'),
+ | (34, 'v_34'), (35, 'v_35'), (36, 'v_36'), (37, 'v_37'), (38,
'v_38'), (39, 'v_39')
+ | """.stripMargin)
+ query.processAllAvailable()
+          // Due to the minRowsPerTrigger and maxRowsPerTrigger limits, not
all data can be consumed in this batch.
+ Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertTrue(query.recentProgress.map(_.numInputRows).sum <
16)
+
+ Thread.sleep(5000)
+ // the rest rows can trigger a batch. Then all the data are consumed.
+ Assertions.assertEquals(3, query.recentProgress.count(_.numInputRows
!= 0))
+ Assertions.assertEquals(16L,
query.recentProgress.map(_.numInputRows).sum)
+ } finally {
+ query.stop()
+ }
+ }
+ }
+
test("Paimon Source: Trigger ProcessingTime 5s") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query = spark.readStream
@@ -365,7 +609,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: with error options") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, snapshotData, latestChanges) =
+ val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
assertThrows[IllegalArgumentException] {
@@ -388,7 +632,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source: not supports compacted-full scan mode in streaming
mode") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, _, _) =
+ val TableSnapshotState(_, location, _, _, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
val query = spark.readStream
@@ -412,10 +656,10 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
test("Paimon Source and Sink") {
withTempDir {
checkpointDir =>
- val TableSnapshotState(_, location, _, latestChanges) =
+ val TableSnapshotState(_, location, _, latestChanges, _) =
prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG, tableName = "T1")
- val TableSnapshotState(_, targetLocation, _, _) =
+ val TableSnapshotState(_, targetLocation, _, _, _) =
prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY, tableName =
"T2")
val df = spark.readStream
@@ -466,7 +710,8 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
table: String,
location: String,
snapshotData: Seq[Row],
- latestChanges: Seq[Row]
+ latestChanges: Seq[Row],
+ snapshotToDataSplitNum: Map[Long, Int]
)
/** Create a paimon table, insert some data, return the location of this
table. */
@@ -487,7 +732,8 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE $tableName (a INT, b STRING)
|TBLPROPERTIES ($primaryKeysProp
'write-mode'='${writeMode.toString}', 'bucket'='2', 'file.format'='parquet')
|""".stripMargin)
- val location = loadTable(tableName).location().getPath
+ val table = loadTable(tableName)
+ val location = table.location().getPath
val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int,
String)]
@@ -517,9 +763,11 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
}
}
+ val snapshotToDataSplitNum = scala.collection.mutable.Map.empty[Long, Int]
+ val streamScan = table.newStreamScan()
(1 to snapshotNum).foreach {
- round =>
- val startId = 10 * round
+ snapshotId =>
+ val startId = 10 * snapshotId
val data = (startId to startId + 2).map {
id =>
val row = (id, s"v_$id")
@@ -528,10 +776,19 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
}
latestChanges = data.toArray
data.toDF("a",
"b").write.format("paimon").mode("append").save(location)
+
+ streamScan.restore(snapshotId)
+ val dataSplitNum = streamScan.plan().splits().size()
+ snapshotToDataSplitNum += snapshotId.toLong -> dataSplitNum
}
val snapshotState = currentTableSnapshotState
- TableSnapshotState("T", location, snapshotState._1, snapshotState._2)
+ TableSnapshotState(
+ "T",
+ location,
+ snapshotState._1,
+ snapshotState._2,
+ snapshotToDataSplitNum.toMap)
}
}