This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 08556fc00 [spark] Refactor needToScanCurrentSnapshot in
PaimonSourceOffset (#2035)
08556fc00 is described below
commit 08556fc00021cbc19b1310f348fa8711346c16ba
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Sep 20 21:06:12 2023 +0800
[spark] Refactor needToScanCurrentSnapshot in PaimonSourceOffset (#2035)
---
.../paimon/spark/sources/PaimonMicroBatchStream.scala | 2 +-
.../apache/paimon/spark/sources/PaimonSourceOffset.scala | 16 +++++++++++++++-
.../org/apache/paimon/spark/sources/StreamHelper.scala | 15 ++++++++++-----
.../scala/org/apache/paimon/spark/PaimonSourceTest.scala | 6 +++---
4 files changed, 29 insertions(+), 10 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index ca730140e..f1f8f7e8c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -48,7 +48,7 @@ class PaimonMicroBatchStream(
} else {
false
}
- PaimonSourceOffset(initSnapshotId, -1L, scanSnapshot)
+ PaimonSourceOffset(initSnapshotId, PaimonSourceOffset.INIT_OFFSET_INDEX,
scanSnapshot)
}
// the committed offset this is used to detect the validity of subsequent
offsets
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
index 3946950fa..291982fe1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -22,6 +22,16 @@ import
org.apache.paimon.table.source.snapshot.StartingContext
import org.apache.spark.sql.connector.read.streaming.Offset
+/**
+ * The implementation of [[Offset]] for paimon spark streaming source.
+ *
+ * @param snapshotId
+ * the current snapshot id
+ * @param index
+ * the index position of the file in the current snapshot, starting from 0
+ * @param scanSnapshot
+ * whether to scan all files in the current snapshot
+ */
case class PaimonSourceOffset(snapshotId: Long, index: Long, scanSnapshot:
Boolean)
extends Offset
with Comparable[PaimonSourceOffset] {
@@ -44,6 +54,9 @@ case class PaimonSourceOffset(snapshotId: Long, index: Long,
scanSnapshot: Boole
}
object PaimonSourceOffset {
+ // index of the initial offset; it is -1 because offsets are filtered by the range (startOffset,
endOffset]
+ val INIT_OFFSET_INDEX: Long = -1L
+
def apply(version: Long, index: Long, scanSnapshot: Boolean):
PaimonSourceOffset = {
new PaimonSourceOffset(
version,
@@ -56,7 +69,8 @@ object PaimonSourceOffset {
offset match {
case o: PaimonSourceOffset => o
case json: String => JsonUtils.fromJson[PaimonSourceOffset](json)
- case sc: StartingContext => PaimonSourceOffset(sc.getSnapshotId, -1,
sc.getScanFullSnapshot)
+ case sc: StartingContext =>
+ PaimonSourceOffset(sc.getSnapshotId, INIT_OFFSET_INDEX,
sc.getScanFullSnapshot)
case _ => throw new IllegalArgumentException(s"Can't parse $offset to
PaimonSourceOffset.")
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 1ef678d83..13afc81a4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scala.collection.mutable
-case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit,
isLast: Boolean)
+case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)
trait StreamHelper extends WithFileStoreTable {
@@ -61,7 +61,13 @@ trait StreamHelper extends WithFileStoreTable {
limit: ReadLimit): Option[PaimonSourceOffset] = {
val indexedDataSplits = getBatch(startOffset, endOffset, Some(limit))
indexedDataSplits.lastOption
- .map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot =
false))
+ .map(
+ ids =>
+ PaimonSourceOffset(
+ ids.snapshotId,
+ ids.index,
+ scanSnapshot =
+ startOffset.scanSnapshot &&
ids.snapshotId.equals(startOffset.snapshotId)))
}
def getBatch(
@@ -69,7 +75,7 @@ trait StreamHelper extends WithFileStoreTable {
endOffset: Option[PaimonSourceOffset],
limit: Option[ReadLimit]): Array[IndexedDataSplit] = {
if (startOffset != null) {
- streamScan.restore(startOffset.snapshotId,
needToScanCurrentSnapshot(startOffset.snapshotId))
+ streamScan.restore(startOffset.snapshotId, startOffset.scanSnapshot)
}
val readLimitGuard = limit.flatMap(PaimonReadLimits(_, lastTriggerMillis))
@@ -111,14 +117,13 @@ trait StreamHelper extends WithFileStoreTable {
val dataSplits =
plan.splits().asScala.collect { case dataSplit: DataSplit => dataSplit
}.toArray
val snapshotId = dataSplits.head.snapshotId()
- val length = dataSplits.length
dataSplits
.sortWith((ds1, ds2) => compareByPartitionAndBucket(ds1, ds2) < 0)
.zipWithIndex
.map {
case (split, idx) =>
- IndexedDataSplit(snapshotId, idx, split, idx == length - 1)
+ IndexedDataSplit(snapshotId, idx, split)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index bef1c4b60..45ae4b7c5 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -444,7 +444,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
// Only 9 rows in this table. Only one batch can consume all the
data.
Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
Assertions.assertEquals(
- PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(3L, 1L, scanSnapshot = true),
PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
checkAnswer(currentResult(), snapshotData)
} finally {
@@ -499,7 +499,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
// So there latest committed offset is not changed.
Assertions.assertEquals(1, query.recentProgress.count(_.numInputRows
!= 0))
Assertions.assertEquals(
- PaimonSourceOffset(3L, 1L, scanSnapshot = false),
+ PaimonSourceOffset(3L, 1L, scanSnapshot = true),
PaimonSourceOffset(query.lastProgress.sources(0).endOffset))
checkAnswer(currentResult(), totalStreamingData)
@@ -563,7 +563,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
Assertions.assertEquals(2, query.recentProgress.count(_.numInputRows
!= 0))
Assertions.assertTrue(query.recentProgress.map(_.numInputRows).sum <
16)
- Thread.sleep(5000)
+ Thread.sleep(6000)
// the rest rows can trigger a batch. Then all the data are consumed.
Assertions.assertEquals(3, query.recentProgress.count(_.numInputRows
!= 0))
Assertions.assertEquals(16L,
query.recentProgress.map(_.numInputRows).sum)