This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 378599175 [spark] Supports read limit for streaming source (#1973)
378599175 is described below

commit 378599175b80a6eb265148952f064a73cfa07ae1
Author: Yann Byron <[email protected]>
AuthorDate: Mon Sep 11 17:47:43 2023 +0800

    [spark] Supports read limit for streaming source (#1973)
---
 .../generated/spark_connector_configuration.html   |  30 +++
 .../ContinuousFromTimestampStartingScanner.java    |   9 +
 .../apache/paimon/spark/SparkConnectorOptions.java |  32 +++
 .../org/apache/paimon/spark/PaimonImplicits.scala  |  33 +++
 .../spark/sources/PaimonMicroBatchStream.scala     |  86 +++++-
 .../paimon/spark/sources/PaimonReadLimits.scala    | 119 +++++++++
 .../paimon/spark/sources/PaimonSourceOffset.scala  |  10 +
 .../apache/paimon/spark/sources/StreamHelper.scala |  66 ++---
 .../org/apache/paimon/spark/PaimonSourceTest.scala | 291 +++++++++++++++++++--
 9 files changed, 620 insertions(+), 56 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index db9c1d889..84358454e 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -38,5 +38,35 @@ under the License.
             <td>Boolean</td>
             <td>If true, allow to merge data types if the two types meet the 
rules for explicit casting.</td>
         </tr>
+        <tr>
+            <td><h5>read.stream.maxFilesPerTrigger</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The maximum number of files returned in a single batch.</td>
+        </tr>
+        <tr>
+            <td><h5>read.stream.maxBytesPerTrigger</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The maximum number of bytes returned in a single batch.</td>
+        </tr>
+        <tr>
+            <td><h5>read.stream.maxRowsPerTrigger</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The maximum number of rows returned in a single batch.</td>
+        </tr>
+        <tr>
+            <td><h5>read.stream.minRowsPerTrigger</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The minimum number of rows returned in a single batch, which 
used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs 
together.</td>
+        </tr>
+        <tr>
+            <td><h5>read.stream.maxTriggerDelayMs</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The maximum delay between two adjacent batches, which used to 
create MinRowsReadLimit with read.stream.minRowsPerTrigger together.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 7fc8cd6aa..6113773ff 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -42,6 +42,15 @@ public class ContinuousFromTimestampStartingScanner extends 
AbstractStartingScan
         this.startingSnapshotId = 
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
     }
 
+    @Override
+    public StartingContext startingContext() {
+        if (startingSnapshotId == null) {
+            return StartingContext.EMPTY;
+        } else {
+            return new StartingContext(startingSnapshotId + 1, false);
+        }
+    }
+
     @Override
     public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = 
snapshotManager.earlierThanTimeMills(startupMillis);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 54950fdd9..2f9e9297c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -37,4 +37,36 @@ public class SparkConnectorOptions {
                     .defaultValue(false)
                     .withDescription(
                             "If true, allow to merge data types if the two 
types meet the rules for explicit casting.");
+
+    public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
+            key("read.stream.maxFilesPerTrigger")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("The maximum number of files returned in 
a single batch.");
+
+    public static final ConfigOption<Long> MAX_BYTES_PER_TRIGGER =
+            key("read.stream.maxBytesPerTrigger")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The maximum number of bytes returned in 
a single batch.");
+
+    public static final ConfigOption<Long> MAX_ROWS_PER_TRIGGER =
+            key("read.stream.maxRowsPerTrigger")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The maximum number of rows returned in a 
single batch.");
+
+    public static final ConfigOption<Long> MIN_ROWS_PER_TRIGGER =
+            key("read.stream.minRowsPerTrigger")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The minimum number of rows returned in a single 
batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs 
together.");
+
+    public static final ConfigOption<Long> MAX_DELAY_MS_PER_TRIGGER =
+            key("read.stream.maxTriggerDelayMs")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The maximum delay between two adjacent batches, 
which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger 
together.");
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
new file mode 100644
index 000000000..8b7942596
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonImplicits.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import java.util.Optional
+
+import scala.language.implicitConversions
+
+object PaimonImplicits {
+  implicit def toScalaOption[T](o: Optional[T]): Option[T] = {
+    if (o.isPresent) {
+      Option.apply(o.get())
+    } else {
+      None
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index e8b573c2e..ca730140e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -17,42 +17,110 @@
  */
 package org.apache.paimon.spark.sources
 
-import org.apache.paimon.spark.{SparkInputPartition, SparkReaderFactory}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, 
SparkInputPartition, SparkReaderFactory}
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.source.ReadBuilder
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
Offset, ReadLimit, SupportsTriggerAvailableNow}
+
+import scala.collection.mutable
 
 class PaimonMicroBatchStream(
     originTable: FileStoreTable,
     readBuilder: ReadBuilder,
     checkpointLocation: String)
   extends MicroBatchStream
+  with SupportsTriggerAvailableNow
   with StreamHelper
   with Logging {
 
+  private val options = Options.fromMap(table.options())
+
+  lazy val initOffset: PaimonSourceOffset = {
+    val initSnapshotId = Math.max(
+      table.snapshotManager().earliestSnapshotId(),
+      streamScanStartingContext.getSnapshotId)
+    val scanSnapshot = if (initSnapshotId == 
streamScanStartingContext.getSnapshotId) {
+      streamScanStartingContext.getScanFullSnapshot.booleanValue()
+    } else {
+      false
+    }
+    PaimonSourceOffset(initSnapshotId, -1L, scanSnapshot)
+  }
+
+  // the committed offset, which is used to detect the validity of subsequent 
offsets
   private var committedOffset: Option[PaimonSourceOffset] = None
 
-  lazy val initOffset: PaimonSourceOffset = 
PaimonSourceOffset(getStartingContext)
+  // the timestamp at which a batch was last triggered.
+  // It is updated to the current time whenever calling "latestOffset" returns 
a non-empty PaimonSourceOffset.
+  var lastTriggerMillis = 0L
+
+  // the latest offset captured when "prepareForTriggerAvailableNow" is called.
+  // In "TriggerAvailableNow" mode, the query terminates once data has been 
consumed up to this offset.
+  private var offsetForTriggerAvailableNow: Option[PaimonSourceOffset] = None
+
+  private lazy val defaultReadLimit: ReadLimit = {
+    import PaimonImplicits._
+
+    val readLimits = mutable.ArrayBuffer.empty[ReadLimit]
+    options.getOptional(SparkConnectorOptions.MAX_BYTES_PER_TRIGGER).foreach {
+      bytes => readLimits += ReadMaxBytes(bytes)
+    }
+    options.getOptional(SparkConnectorOptions.MAX_FILES_PER_TRIGGER).foreach {
+      files => readLimits += ReadLimit.maxFiles(files)
+    }
+    options.getOptional(SparkConnectorOptions.MAX_ROWS_PER_TRIGGER).foreach {
+      rows => readLimits += ReadLimit.maxRows(rows)
+    }
+    val minRowsOptional = 
options.getOptional(SparkConnectorOptions.MIN_ROWS_PER_TRIGGER)
+    val maxDelayMSOptional = 
options.getOptional(SparkConnectorOptions.MAX_DELAY_MS_PER_TRIGGER)
+    if (minRowsOptional.isPresent && maxDelayMSOptional.isPresent) {
+      readLimits += ReadLimit.minRows(minRowsOptional.get(), 
maxDelayMSOptional.get())
+    } else if (minRowsOptional.isPresent || maxDelayMSOptional.isPresent) {
+      throw new IllegalArgumentException(
+        "Can't provide only one of read.stream.minRowsPerTrigger and 
read.stream.maxTriggerDelayMs.")
+    }
+
+    PaimonReadLimits(ReadLimit.compositeLimit(readLimits.toArray), 
lastTriggerMillis)
+      .map(_.toReadLimit)
+      .getOrElse(ReadLimit.allAvailable())
+  }
+
+  override def getDefaultReadLimit: ReadLimit = defaultReadLimit
+
+  override def prepareForTriggerAvailableNow(): Unit = {
+    offsetForTriggerAvailableNow = getLatestOffset(initOffset, None, 
ReadLimit.allAvailable())
+  }
 
   override def latestOffset(): Offset = {
-    getLatestOffset
+    throw new UnsupportedOperationException(
+      "That latestOffset(Offset, ReadLimit) method should be called instead of 
this method.")
+  }
+
+  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
+    val startOffset = PaimonSourceOffset(start)
+    getLatestOffset(startOffset, offsetForTriggerAvailableNow, limit).map {
+      offset =>
+        lastTriggerMillis = System.currentTimeMillis()
+        offset
+    }.orNull
   }
 
   override def planInputPartitions(start: Offset, end: Offset): 
Array[InputPartition] = {
     val startOffset = {
-      val startOffset0 = PaimonSourceOffset.apply(start)
+      val startOffset0 = PaimonSourceOffset(start)
       if (startOffset0.compareTo(initOffset) < 0) {
         initOffset
       } else {
         startOffset0
       }
     }
-    val endOffset = PaimonSourceOffset.apply(end)
+    val endOffset = PaimonSourceOffset(end)
 
-    getBatch(startOffset, endOffset)
+    getBatch(startOffset, Some(endOffset), None)
       .map(ids => new SparkInputPartition(ids.entry))
       .toArray[InputPartition]
   }
@@ -66,11 +134,11 @@ class PaimonMicroBatchStream(
   }
 
   override def deserializeOffset(json: String): Offset = {
-    PaimonSourceOffset.apply(json)
+    PaimonSourceOffset(json)
   }
 
   override def commit(end: Offset): Unit = {
-    committedOffset = Some(PaimonSourceOffset.apply(end))
+    committedOffset = Some(PaimonSourceOffset(end))
     logInfo(s"$committedOffset is committed.")
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
new file mode 100644
index 000000000..0c64886be
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonReadLimits.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.spark.sql.connector.read.streaming.{CompositeReadLimit, 
ReadAllAvailable, ReadLimit, ReadMaxFiles, ReadMaxRows, ReadMinRows}
+
+import scala.collection.JavaConverters._
+
+case class ReadMaxBytes(bytes: Long) extends ReadLimit
+
+object PaimonReadLimits {
+
+  def apply(limit: ReadLimit, lastTriggerMillis: Long): 
Option[PaimonReadLimitGuard] = limit match {
+    case _: ReadAllAvailable => None
+    case composite: CompositeReadLimit if composite.getReadLimits.isEmpty => 
None
+    case composite: CompositeReadLimit if 
composite.getReadLimits.forall(supportedReadLimit) =>
+      Some(PaimonReadLimitGuard(composite.getReadLimits, lastTriggerMillis))
+    case limit: ReadLimit if supportedReadLimit(limit) =>
+      Some(PaimonReadLimitGuard(Array(limit), lastTriggerMillis))
+    case other =>
+      throw new UnsupportedOperationException(s"Not supported read limit: 
${other.toString}")
+  }
+
+  private def supportedReadLimit(limit: ReadLimit): Boolean = {
+    limit match {
+      case _: ReadAllAvailable => true
+      case _: ReadMaxFiles => true
+      case _: ReadMinRows => true
+      case _: ReadMaxRows => true
+      case _: ReadMaxBytes => true
+      case _ => false
+    }
+  }
+}
+
+case class PaimonReadLimitGuard(limits: Array[ReadLimit], lastTriggerMillis: 
Long) {
+
+  private val (minRowsReadLimits, otherReadLimits) = 
limits.partition(_.isInstanceOf[ReadMinRows])
+
+  assert(minRowsReadLimits.length <= 1, "Paimon supports one ReadMinRows Read 
Limit at most.")
+
+  private var acceptedFiles: Int = 0
+
+  private var acceptedBytes: Long = 0
+
+  private var acceptedRows: Long = 0
+
+  private val minRowsReadLimit = minRowsReadLimits.collectFirst { case limit: 
ReadMinRows => limit }
+
+  // the batch may be skipped if the max trigger delay has not yet elapsed since the last trigger
+  // and the number of accepted rows is still below the minRows threshold.
+  private var _maySkipBatch =
+    minRowsReadLimit.exists(System.currentTimeMillis() - lastTriggerMillis <= 
_.maxTriggerDelayMs)
+
+  private var _hasCapacity = true
+
+  def toReadLimit: ReadLimit = {
+    ReadLimit.compositeLimit(limits)
+  }
+
+  def admit(indexedDataSplit: IndexedDataSplit): Boolean = {
+    if (!_hasCapacity) {
+      return false
+    }
+
+    val (rows, bytes) = getBytesAndRows(indexedDataSplit)
+
+    _hasCapacity = otherReadLimits.forall {
+      case maxFiles: ReadMaxFiles =>
+        acceptedFiles < maxFiles.maxFiles
+      case maxRows: ReadMaxRows =>
+        acceptedRows < maxRows.maxRows
+      case maxBytes: ReadMaxBytes =>
+        acceptedBytes < maxBytes.bytes
+      case limit =>
+        throw new UnsupportedOperationException(s"Not supported read limit: 
${limit.toString}")
+    }
+
+    acceptedFiles += 1
+    acceptedRows += rows
+    acceptedBytes += bytes
+
+    if (_maySkipBatch) {
+      _maySkipBatch = minRowsReadLimit.exists(acceptedRows < _.minRows)
+    }
+
+    _hasCapacity
+  }
+
+  def skipBatch: Boolean = _maySkipBatch
+
+  def hasCapacity: Boolean = _hasCapacity
+
+  private def getBytesAndRows(indexedDataSplit: IndexedDataSplit): (Long, 
Long) = {
+    var rows = 0L
+    var bytes = 0L
+    indexedDataSplit.entry.dataFiles().asScala.foreach {
+      file =>
+        rows += file.rowCount()
+        bytes += file.fileSize()
+    }
+    (rows, bytes)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
index d64fe80f6..3946950fa 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -60,4 +60,14 @@ object PaimonSourceOffset {
       case _ => throw new IllegalArgumentException(s"Can't parse $offset to 
PaimonSourceOffset.")
     }
   }
+
+  def gt(indexedDataSplit: IndexedDataSplit, start: PaimonSourceOffset): 
Boolean = {
+    indexedDataSplit.snapshotId > start.snapshotId ||
+    (indexedDataSplit.snapshotId == start.snapshotId && indexedDataSplit.index 
> start.index)
+  }
+
+  def le(indexedDataSplit: IndexedDataSplit, end: PaimonSourceOffset): Boolean 
= {
+    indexedDataSplit.snapshotId < end.snapshotId ||
+    (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index 
<= end.index)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 0994bdc59..1ef678d83 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.source.TableScan.Plan
 import org.apache.paimon.table.source.snapshot.StartingContext
 import org.apache.paimon.utils.RowDataPartitionComputer
 
+import org.apache.spark.sql.connector.read.streaming.ReadLimit
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types.StructType
 
@@ -38,6 +39,8 @@ trait StreamHelper extends WithFileStoreTable {
 
   val initOffset: PaimonSourceOffset
 
+  var lastTriggerMillis: Long
+
   private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
 
   private lazy val partitionSchema: StructType =
@@ -50,46 +53,53 @@ trait StreamHelper extends WithFileStoreTable {
   )
 
   // Used to get the initial offset.
-  def getStartingContext: StartingContext = streamScan.startingContext()
-
-  def getLatestOffset: PaimonSourceOffset = {
-    val latestSnapshotId = table.snapshotManager().latestSnapshotId()
-    val plan = if (needToScanCurrentSnapshot(latestSnapshotId)) {
-      table
-        .newSnapshotReader()
-        .withSnapshot(latestSnapshotId)
-        .withMode(ScanMode.ALL)
-        .read()
-    } else {
-      table
-        .newSnapshotReader()
-        .withSnapshot(latestSnapshotId)
-        .withMode(ScanMode.DELTA)
-        .read()
-    }
-    val indexedDataSplits = convertPlanToIndexedSplits(plan)
+  lazy val streamScanStartingContext: StartingContext = 
streamScan.startingContext()
+
+  def getLatestOffset(
+      startOffset: PaimonSourceOffset,
+      endOffset: Option[PaimonSourceOffset],
+      limit: ReadLimit): Option[PaimonSourceOffset] = {
+    val indexedDataSplits = getBatch(startOffset, endOffset, Some(limit))
     indexedDataSplits.lastOption
       .map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot = 
false))
-      .orNull
   }
 
   def getBatch(
       startOffset: PaimonSourceOffset,
-      endOffset: PaimonSourceOffset): Array[IndexedDataSplit] = {
-    val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+      endOffset: Option[PaimonSourceOffset],
+      limit: Option[ReadLimit]): Array[IndexedDataSplit] = {
     if (startOffset != null) {
       streamScan.restore(startOffset.snapshotId, 
needToScanCurrentSnapshot(startOffset.snapshotId))
     }
+
+    val readLimitGuard = limit.flatMap(PaimonReadLimits(_, lastTriggerMillis))
     var hasSplits = true
-    while (hasSplits && streamScan.checkpoint() <= endOffset.snapshotId) {
+    def continue: Boolean = {
+      hasSplits && readLimitGuard.forall(_.hasCapacity) && endOffset.forall(
+        streamScan.checkpoint() <= _.snapshotId)
+    }
+
+    val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+    while (continue) {
       val plan = streamScan.plan()
       if (plan.splits.isEmpty) {
         hasSplits = false
       } else {
         indexedDataSplits ++= convertPlanToIndexedSplits(plan)
+          // Filter by (start, end]
+          .filter(ids => inRange(ids, startOffset, endOffset))
+          // Filter splits by read limits other than ReadMinRows.
+          .takeWhile(s => readLimitGuard.forall(_.admit(s)))
       }
     }
-    indexedDataSplits.filter(ids => inRange(ids, startOffset, 
endOffset)).toArray
+
+    // Apply the ReadMinRows read limit, if one exists.
+    // If this batch doesn't meet the condition of ReadMinRows, then nothing 
will be returned.
+    if (readLimitGuard.exists(_.skipBatch)) {
+      Array.empty
+    } else {
+      indexedDataSplits.toArray
+    }
   }
 
   private def needToScanCurrentSnapshot(snapshotId: Long): Boolean = {
@@ -134,13 +144,9 @@ trait StreamHelper extends WithFileStoreTable {
   private def inRange(
       indexedDataSplit: IndexedDataSplit,
       start: PaimonSourceOffset,
-      end: PaimonSourceOffset): Boolean = {
-    val startRange = indexedDataSplit.snapshotId > start.snapshotId ||
-      (indexedDataSplit.snapshotId == start.snapshotId && 
indexedDataSplit.index > start.index)
-    val endRange = indexedDataSplit.snapshotId < end.snapshotId ||
-      (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index 
<= end.index)
-
-    startRange && endRange
+      end: Option[PaimonSourceOffset]): Boolean = {
+    PaimonSourceOffset.gt(indexedDataSplit, start) && end.forall(
+      PaimonSourceOffset.le(indexedDataSplit, _))
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index b99ed592b..bef1c4b60 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -18,9 +18,10 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.WriteMode
+import org.apache.paimon.spark.sources.PaimonSourceOffset
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest, 
Trigger}
 import org.junit.jupiter.api.Assertions
 
 import java.util.concurrent.TimeUnit
@@ -32,7 +33,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: default scan mode") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query = spark.readStream
@@ -70,7 +71,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: default and from-snapshot scan mode with 
scan.snapshot-id") {
     withTempDirs {
       (checkpointDir1, checkpointDir2) =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         // set scan.snapshot-id = 3, this query can read the latest changes.
@@ -128,7 +129,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
       (checkpointDir1, checkpointDir2) =>
         // timestamp that is before this table is created and data is written.
         val ts1 = System.currentTimeMillis()
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
         // timestamp that is after this table is created and data is written.
         val ts2 = System.currentTimeMillis()
@@ -182,7 +183,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: latest and latest-full scan mode") {
     withTempDirs {
       (checkpointDir1, checkpointDir2) =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query1 = spark.readStream
@@ -238,7 +239,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: from-snapshot and from-snapshot-full scan mode") {
     withTempDirs {
       (checkpointDir1, checkpointDir2) =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query1 = spark.readStream
@@ -296,7 +297,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: Trigger AvailableNow") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query = spark.readStream
@@ -329,10 +330,253 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
     }
   }
 
+  test("Paimon Source: Trigger AvailableNow + latest scan mode + 
maxFilesPerTrigger read limit") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges, 
snapshotToDataSplitNum) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "latest")
+          .option("read.stream.maxFilesPerTrigger", "1")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .trigger(Trigger.AvailableNow())
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          Assertions.assertTrue(query.isActive)
+          query.processAllAvailable()
+          // 2 is the number of buckets, which is also the number of DataSplits 
generated by planning.
+          Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          checkAnswer(currentResult(), latestChanges)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          Assertions.assertFalse(query.isActive)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: Default Trigger + from-snapshot scan mode + 
maxBytesPerTrigger read limit") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges, 
snapshotToDataSplitNum) =
+          prepareTableAndGetLocation(3, WriteMode.APPEND_ONLY)
+        val totalDataSplitNum = snapshotToDataSplitNum.values.sum
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "from-snapshot")
+          .option("scan.snapshot-id", 1)
+          // Set maxBytesPerTrigger to 1 byte, so that each batch returns only 
one data split.
+          .option("read.stream.maxBytesPerTrigger", "1")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          query.processAllAvailable()
+          // Each batch can consume one data split.
+          // Therefore it takes totalDataSplitNum batches to consume all the 
data.
+          Assertions.assertEquals(
+            totalDataSplitNum,
+            query.recentProgress.count(_.numInputRows != 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          var totalStreamingData = snapshotData
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query.processAllAvailable()
+          // Two additional batches are required to consume the new data.
+          Assertions.assertEquals(
+            totalDataSplitNum + 2,
+            query.recentProgress.count(_.numInputRows != 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(4L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          totalStreamingData =
+            totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: default scan mode + maxRowsPerTrigger read limit") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("read.stream.maxRowsPerTrigger", "10")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          query.processAllAvailable()
+          // There are only 9 rows in this table, so a single batch can consume 
all the data.
+          Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          checkAnswer(currentResult(), snapshotData)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: default scan mode + minRowsPerTrigger and 
maxTriggerDelayMs read limits") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        assertThrows[StreamingQueryException] {
+          spark.readStream
+            .format("paimon")
+            .option("read.stream.minRowsPerTrigger", "10")
+            .load(location)
+            .writeStream
+            .format("memory")
+            .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+            .queryName("mem_table")
+            .outputMode("append")
+            .start()
+            .processAllAvailable()
+        }
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("read.stream.minRowsPerTrigger", "5")
+          .option("read.stream.maxTriggerDelayMs", "5000")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          query.processAllAvailable()
+          // One batch can consume all the data.
+          Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows 
!= 0))
+          var totalStreamingData = snapshotData
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query.processAllAvailable()
+          // There are only 3 new rows, but at least 5 rows are needed to trigger a batch.
+          // So the latest committed offset is not changed.
+          Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          checkAnswer(currentResult(), totalStreamingData)
+
+          Thread.sleep(5000)
+          query.processAllAvailable()
+          // Now that 5s have passed, a batch can be triggered even though there are only 3 
rows, because the maximum delay time (5s) has been reached.
+          Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(
+            PaimonSourceOffset(4L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
+          totalStreamingData =
+            totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test(
+    "Paimon Source: from-snapshot scan mode + maxRowsPerTrigger, 
minRowsPerTrigger and maxTriggerDelayMs read limits") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
+          prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "from-snapshot")
+          .option("scan.snapshot-id", 1)
+          .option("read.stream.maxRowsPerTrigger", "6")
+          .option("read.stream.minRowsPerTrigger", "4")
+          .option("read.stream.maxTriggerDelayMs", "5000")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          spark.sql("INSERT INTO T VALUES (10, 'v_10'), (11, 'v_11'), (12, 
'v_12')")
+          query.processAllAvailable()
+          // The first batch will always be triggered when using the ReadMinRows 
read limit.
+          Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows 
!= 0))
+
+          spark.sql("INSERT INTO T VALUES (20, 'v_20'), (21, 'v_21'), (22, 
'v_22')")
+          query.processAllAvailable()
+          Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(3L, query.lastProgress.numInputRows)
+
+          spark.sql(
+            """
+              |INSERT INTO T VALUES (30, 'v_30'), (31, 'v_31'), (32, 'v_32'), 
(33, 'v_33'),
+              | (34, 'v_34'), (35, 'v_35'), (36, 'v_36'), (37, 'v_37'), (38, 
'v_38'), (39, 'v_39')
+              | """.stripMargin)
+          query.processAllAvailable()
+          // Due to the limits of minRowsPerTrigger and maxRowsPerTrigger, not 
all data can be consumed in this batch.
+          Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertTrue(query.recentProgress.map(_.numInputRows).sum < 
16)
+
+          Thread.sleep(5000)
+          // The remaining rows can now trigger a batch, after which all the data is consumed.
+          Assertions.assertEquals(3, query.recentProgress.count(_.numInputRows 
!= 0))
+          Assertions.assertEquals(16L, 
query.recentProgress.map(_.numInputRows).sum)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
   test("Paimon Source: Trigger ProcessingTime 5s") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query = spark.readStream
@@ -365,7 +609,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: with error options") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+        val TableSnapshotState(_, location, snapshotData, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         assertThrows[IllegalArgumentException] {
@@ -388,7 +632,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source: not supports compacted-full scan mode in streaming 
mode") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, _, _) =
+        val TableSnapshotState(_, location, _, _, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
 
         val query = spark.readStream
@@ -412,10 +656,10 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
   test("Paimon Source and Sink") {
     withTempDir {
       checkpointDir =>
-        val TableSnapshotState(_, location, _, latestChanges) =
+        val TableSnapshotState(_, location, _, latestChanges, _) =
           prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG, tableName = "T1")
 
-        val TableSnapshotState(_, targetLocation, _, _) =
+        val TableSnapshotState(_, targetLocation, _, _, _) =
           prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY, tableName = 
"T2")
 
         val df = spark.readStream
@@ -466,7 +710,8 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
       table: String,
       location: String,
       snapshotData: Seq[Row],
-      latestChanges: Seq[Row]
+      latestChanges: Seq[Row],
+      snapshotToDataSplitNum: Map[Long, Int]
   )
 
   /** Create a paimon table, insert some data, return the location of this 
table. */
@@ -487,7 +732,8 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
          |CREATE TABLE $tableName (a INT, b STRING)
          |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='2', 'file.format'='parquet')
          |""".stripMargin)
-    val location = loadTable(tableName).location().getPath
+    val table = loadTable(tableName)
+    val location = table.location().getPath
 
     val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
     val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int, 
String)]
@@ -517,9 +763,11 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
       }
     }
 
+    val snapshotToDataSplitNum = scala.collection.mutable.Map.empty[Long, Int]
+    val streamScan = table.newStreamScan()
     (1 to snapshotNum).foreach {
-      round =>
-        val startId = 10 * round
+      snapshotId =>
+        val startId = 10 * snapshotId
         val data = (startId to startId + 2).map {
           id =>
             val row = (id, s"v_$id")
@@ -528,10 +776,19 @@ class PaimonSourceTest extends PaimonSparkTestBase with 
StreamTest {
         }
         latestChanges = data.toArray
         data.toDF("a", 
"b").write.format("paimon").mode("append").save(location)
+
+        streamScan.restore(snapshotId)
+        val dataSplitNum = streamScan.plan().splits().size()
+        snapshotToDataSplitNum += snapshotId.toLong -> dataSplitNum
     }
 
     val snapshotState = currentTableSnapshotState
-    TableSnapshotState("T", location, snapshotState._1, snapshotState._2)
+    TableSnapshotState(
+      "T",
+      location,
+      snapshotState._1,
+      snapshotState._2,
+      snapshotToDataSplitNum.toMap)
   }
 
 }


Reply via email to