This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 08556fc00 [spark] Refactor needToScanCurrentSnapshot in PaimonSourceOffset (#2035)
08556fc00 is described below

commit 08556fc00021cbc19b1310f348fa8711346c16ba
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Sep 20 21:06:12 2023 +0800

    [spark] Refactor needToScanCurrentSnapshot in PaimonSourceOffset (#2035)
---
 .../paimon/spark/sources/PaimonMicroBatchStream.scala    |  2 +-
 .../apache/paimon/spark/sources/PaimonSourceOffset.scala | 16 +++++++++++++++-
 .../org/apache/paimon/spark/sources/StreamHelper.scala   | 15 ++++++++++-----
 .../scala/org/apache/paimon/spark/PaimonSourceTest.scala |  6 +++---
 4 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index ca730140e..f1f8f7e8c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -48,7 +48,7 @@ class PaimonMicroBatchStream(
     } else {
       false
     }
-    PaimonSourceOffset(initSnapshotId, -1L, scanSnapshot)
+    PaimonSourceOffset(initSnapshotId, PaimonSourceOffset.INIT_OFFSET_INDEX, scanSnapshot)
   }
 
   // the committed offset this is used to detect the validity of subsequent offsets
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
index 3946950fa..291982fe1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -22,6 +22,16 @@ import org.apache.paimon.table.source.snapshot.StartingContext
 
 import org.apache.spark.sql.connector.read.streaming.Offset
 
+/**
+ * The implementation of [[Offset]] for paimon spark streaming source.
+ *
+ * @param snapshotId
+ *   the current snapshot id
+ * @param index
+ *   the index position of file in the current snapshot, start from 0
+ * @param scanSnapshot
+ *   whether to scan all files in the current snapshot
+ */
 case class PaimonSourceOffset(snapshotId: Long, index: Long, scanSnapshot: Boolean)
   extends Offset
   with Comparable[PaimonSourceOffset] {
@@ -44,6 +54,9 @@ case class PaimonSourceOffset(snapshotId: Long, index: Long, scanSnapshot: Boole
 }
 
 object PaimonSourceOffset {
+  //  index of the init offset, for we filter offset by (startOffset, endOffset]
+  val INIT_OFFSET_INDEX: Long = -1L
+
   def apply(version: Long, index: Long, scanSnapshot: Boolean): PaimonSourceOffset = {
     new PaimonSourceOffset(
       version,
@@ -56,7 +69,8 @@ object PaimonSourceOffset {
     offset match {
       case o: PaimonSourceOffset => o
       case json: String => JsonUtils.fromJson[PaimonSourceOffset](json)
-      case sc: StartingContext => PaimonSourceOffset(sc.getSnapshotId, -1, sc.getScanFullSnapshot)
+      case sc: StartingContext =>
+        PaimonSourceOffset(sc.getSnapshotId, INIT_OFFSET_INDEX, sc.getScanFullSnapshot)
       case _ => throw new IllegalArgumentException(s"Can't parse $offset to PaimonSourceOffset.")
     }
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 1ef678d83..13afc81a4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit, isLast: Boolean)
+case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)
 
 trait StreamHelper extends WithFileStoreTable {
 
@@ -61,7 +61,13 @@ trait StreamHelper extends WithFileStoreTable {
       limit: ReadLimit): Option[PaimonSourceOffset] = {
     val indexedDataSplits = getBatch(startOffset, endOffset, Some(limit))
     indexedDataSplits.lastOption
-      .map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot = false))
+      .map(
+        ids =>
+          PaimonSourceOffset(
+            ids.snapshotId,
+            ids.index,
+            scanSnapshot =
+              startOffset.scanSnapshot && ids.snapshotId.equals(startOffset.snapshotId)))
   }
 
   def getBatch(
@@ -69,7 +75,7 @@ trait StreamHelper extends WithFileStoreTable {
       endOffset: Option[PaimonSourceOffset],
       limit: Option[ReadLimit]): Array[IndexedDataSplit] = {
     if (startOffset != null) {
-      streamScan.restore(startOffset.snapshotId, needToScanCurrentSnapshot(startOffset.snapshotId))
+      streamScan.restore(startOffset.snapshotId, startOffset.scanSnapshot)
     }
 
     val readLimitGuard = limit.flatMap(PaimonReadLimits(_, lastTriggerMillis))
@@ -111,14 +117,13 @@ trait StreamHelper extends WithFileStoreTable {
     val dataSplits =
       plan.splits().asScala.collect { case dataSplit: DataSplit => dataSplit }.toArray
     val snapshotId = dataSplits.head.snapshotId()
-    val length = dataSplits.length
 
     dataSplits
       .sortWith((ds1, ds2) => compareByPartitionAndBucket(ds1, ds2) < 0)
       .zipWithIndex
       .map {
         case (split, idx) =>
-          IndexedDataSplit(snapshotId, idx, split, idx == length - 1)
+          IndexedDataSplit(snapshotId, idx, split)
       }
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index bef1c4b60..45ae4b7c5 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -444,7 +444,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
           // Only 9 rows in this table. Only one batch can consume all the data.
           Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows != 0))
           Assertions.assertEquals(
-            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(3L, 1L, scanSnapshot = true),
             PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
           checkAnswer(currentResult(), snapshotData)
         } finally {
@@ -499,7 +499,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
           // So there latest committed offset is not changed.
           Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows != 0))
           Assertions.assertEquals(
-            PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+            PaimonSourceOffset(3L, 1L, scanSnapshot = true),
             PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
           checkAnswer(currentResult(), totalStreamingData)
 
@@ -563,7 +563,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
           Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows != 0))
           Assertions.assertTrue(query.recentProgress.map(_.numInputRows).sum < 16)
 
-          Thread.sleep(5000)
+          Thread.sleep(6000)
           // the rest rows can trigger a batch. Then all the data are consumed.
           Assertions.assertEquals(3, query.recentProgress.count(_.numInputRows != 0))
           Assertions.assertEquals(16L, query.recentProgress.map(_.numInputRows).sum)

Reply via email to