This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8801b342d [spark] Supports Spark Streaming Source (#1842)
8801b342d is described below
commit 8801b342d1432298bea82bb892c19c9685f30633
Author: Yann Byron <[email protected]>
AuthorDate: Thu Sep 7 20:13:03 2023 +0800
[spark] Supports Spark Streaming Source (#1842)
---
.../java/org/apache/paimon/options/Options.java | 4 +
.../table/source/InnerStreamTableScanImpl.java | 12 +-
.../paimon/table/source/StreamTableScan.java | 2 +-
.../ContinuousFromTimestampStartingScanner.java | 9 -
.../snapshot/ContinuousLatestStartingScanner.java | 9 -
.../table/source/snapshot/StartingContext.java | 2 +-
.../apache/paimon/table/system/AuditLogTable.java | 4 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 2 +-
.../java/org/apache/paimon/spark/SparkScan.java | 13 +-
.../org/apache/paimon/spark/SparkScanBuilder.java | 1 +
.../java/org/apache/paimon/spark/SparkTable.java | 1 +
.../paimon/spark/PaimonPartitionManagement.scala | 12 +-
.../org/apache/paimon/spark/SparkSource.scala | 6 +-
.../paimon/spark/commands/PaimonCommand.scala | 4 +-
.../paimon/spark/commands/SchemaHelper.scala | 4 +-
...SchemaHelper.scala => WithFileStoreTable.scala} | 28 +-
.../spark/sources/PaimonMicroBatchStream.scala | 81 ++++
.../paimon/spark/sources/PaimonSourceOffset.scala | 63 +++
.../apache/paimon/spark/sources/StreamHelper.scala | 146 ++++++
.../org/apache/paimon/spark/util/JsonUtils.scala | 41 ++
.../org/apache/paimon/spark/PaimonSourceTest.scala | 537 +++++++++++++++++++++
.../apache/paimon/spark/PaimonSparkTestBase.scala | 4 +
22 files changed, 917 insertions(+), 68 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index a6b82fd37..ee8a249df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -152,6 +152,10 @@ public class Options implements Serializable {
return new Options(newData);
}
+ public synchronized void remove(String key) {
+ data.remove(key);
+ }
+
public synchronized boolean containsKey(String key) {
return data.containsKey(key);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 4ff0b3be0..0968ac361 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
+import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -238,9 +239,14 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
}
@Override
- public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
- restore(nextSnapshotId);
- snapshotReader.withMode(scanMode);
+ public void restore(@Nullable Long nextSnapshotId, boolean
scanAllSnapshot) {
+ if (nextSnapshotId != null && scanAllSnapshot) {
+ startingScanner =
+ new StaticFromSnapshotStartingScanner(snapshotManager,
nextSnapshotId);
+ restore(null);
+ } else {
+ restore(nextSnapshotId);
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 5c8932059..2df743428 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -44,7 +44,7 @@ public interface StreamTableScan extends TableScan,
Restorable<Long> {
void restore(@Nullable Long nextSnapshotId);
/** Restore from checkpoint next snapshot id with scan kind. */
- void restore(@Nullable Long nextSnapshotId, ScanMode scanMode);
+ void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
/** Checkpoint to return next snapshot id. */
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 6113773ff..7fc8cd6aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -42,15 +42,6 @@ public class ContinuousFromTimestampStartingScanner extends
AbstractStartingScan
this.startingSnapshotId =
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
}
- @Override
- public StartingContext startingContext() {
- if (startingSnapshotId == null) {
- return StartingContext.EMPTY;
- } else {
- return new StartingContext(startingSnapshotId + 1, false);
- }
- }
-
@Override
public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId =
snapshotManager.earlierThanTimeMills(startupMillis);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 423181b37..c1a6054ae 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -38,15 +38,6 @@ public class ContinuousLatestStartingScanner extends
AbstractStartingScanner {
this.startingSnapshotId = snapshotManager.latestSnapshotId();
}
- @Override
- public StartingContext startingContext() {
- if (startingSnapshotId == null) {
- return StartingContext.EMPTY;
- } else {
- return new StartingContext(startingSnapshotId + 1, false);
- }
- }
-
@Override
public Result scan(SnapshotReader snapshotReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
index 9204f4d47..dcafed348 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
@@ -43,5 +43,5 @@ public class StartingContext {
return this.scanFullSnapshot;
}
- public static final StartingContext EMPTY = new StartingContext(0L, false);
+ public static final StartingContext EMPTY = new StartingContext(1L, false);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 28bba8901..cd2390e6a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -335,8 +335,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
- streamScan.restore(nextSnapshotId, scanMode);
+ public void restore(@Nullable Long nextSnapshotId, boolean
scanAllSnapshot) {
+ streamScan.restore(nextSnapshotId, scanAllSnapshot);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index adf88ffe6..0a02ac512 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -867,7 +867,7 @@ public class ContinuousFileSplitEnumeratorTest {
public void restore(Long state) {}
@Override
- public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode)
{}
+ public void restore(@Nullable Long nextSnapshotId, boolean
scanAllSnapshot) {}
public void allowEnd(boolean allowEnd) {
this.allowEnd = allowEnd;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 1c5c7b50a..e277834b5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -18,6 +18,9 @@
package org.apache.paimon.spark;
+import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -27,6 +30,7 @@ import
org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import java.util.List;
@@ -39,11 +43,13 @@ import java.util.OptionalLong;
*/
public class SparkScan implements Scan, SupportsReportStatistics {
+ private final Table table;
private final ReadBuilder readBuilder;
private List<Split> splits;
- public SparkScan(ReadBuilder readBuilder) {
+ public SparkScan(Table table, ReadBuilder readBuilder) {
+ this.table = table;
this.readBuilder = readBuilder;
}
@@ -75,6 +81,11 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
};
}
+ @Override
+ public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+ return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder,
checkpointLocation);
+ }
+
protected List<Split> splits() {
if (splits == null) {
splits = readBuilder.newScan().plan().splits();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
index 8fe3868b3..ec7367ed2 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
@@ -81,6 +81,7 @@ public class SparkScanBuilder
@Override
public Scan build() {
return new SparkScan(
+ table,
table.newReadBuilder().withFilter(predicates).withProjection(projectedFields));
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 77a5909a4..674b4fbc2 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -89,6 +89,7 @@ public class SparkTable
capabilities.add(TableCapability.V1_BATCH_WRITE);
capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
+ capabilities.add(TableCapability.MICRO_BATCH_READ);
return capabilities;
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 16bd91145..d4461ff1c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -17,11 +17,9 @@
*/
package org.apache.paimon.spark
-import org.apache.paimon.data.BinaryRow
import org.apache.paimon.operation.FileStoreCommit
-import org.apache.paimon.table.{AbstractFileStoreTable, Table}
+import org.apache.paimon.table.AbstractFileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
-import org.apache.paimon.table.source.TableScan
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.{FileStorePathFactory, RowDataPartitionComputer}
@@ -30,8 +28,8 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters,
InternalRow}
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType
-import java.util
import java.util.{Collections, UUID}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
@@ -75,11 +73,11 @@ trait PaimonPartitionManagement extends
SupportsPartitionManagement {
override def replacePartitionMetadata(
ident: InternalRow,
- properties: util.Map[String, String]): Unit = {
+ properties: JMap[String, String]): Unit = {
throw new UnsupportedOperationException("Replace partition is not
supported")
}
- override def loadPartitionMetadata(ident: InternalRow): util.Map[String,
String] = {
+ override def loadPartitionMetadata(ident: InternalRow): JMap[String, String]
= {
throw new UnsupportedOperationException("Load partition is not supported")
}
@@ -123,7 +121,7 @@ trait PaimonPartitionManagement extends
SupportsPartitionManagement {
.toArray
}
- override def createPartition(ident: InternalRow, properties:
util.Map[String, String]): Unit = {
+ override def createPartition(ident: InternalRow, properties: JMap[String,
String]): Unit = {
throw new UnsupportedOperationException("Create partition is not
supported")
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index d9675ca64..9495cb0fe 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import java.util
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
@@ -61,7 +61,7 @@ class SparkSource
override def getTable(
schema: StructType,
partitioning: Array[Transform],
- properties: util.Map[String, String]): Table = {
+ properties: JMap[String, String]): Table = {
new SparkTable(loadTable(properties))
}
@@ -76,7 +76,7 @@ class SparkSource
SparkSource.toBaseRelation(table, sqlContext)
}
- private def loadTable(options: util.Map[String, String]): FileStoreTable = {
+ private def loadTable(options: JMap[String, String]): FileStoreTable = {
val catalogContext = CatalogContext.create(
Options.fromMap(options),
SparkSession.active.sessionState.newHadoopConf())
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 9738c5c83..6e88ec6c8 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -27,12 +27,10 @@ import org.apache.spark.sql.sources.{AlwaysTrue, And,
EqualNullSafe, Filter}
import java.io.IOException
/** Helper trait for all paimon commands. */
-trait PaimonCommand {
+trait PaimonCommand extends WithFileStoreTable {
val BUCKET_COL = "_bucket_"
- def table: FileStoreTable
-
lazy val bucketMode: BucketMode = table match {
case fileStoreTable: FileStoreTable =>
fileStoreTable.bucketMode
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index 8a8415d64..1fd2f2c58 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -26,13 +26,13 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
-trait SchemaHelper {
+trait SchemaHelper extends WithFileStoreTable {
val originTable: FileStoreTable
protected var newTable: Option[FileStoreTable] = None
- def table: FileStoreTable = newTable.getOrElse(originTable)
+ override def table: FileStoreTable = newTable.getOrElse(originTable)
def tableSchema: TableSchema = table.schema
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
similarity index 50%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 8a8415d64..03ca0e3d1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -17,34 +17,10 @@
*/
package org.apache.paimon.spark.commands
-import org.apache.paimon.schema.{SchemaMergingUtils, TableSchema}
-import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowType
-import org.apache.spark.sql.types.StructType
+private[spark] trait WithFileStoreTable {
-import scala.collection.JavaConverters._
-
-trait SchemaHelper {
-
- val originTable: FileStoreTable
-
- protected var newTable: Option[FileStoreTable] = None
-
- def table: FileStoreTable = newTable.getOrElse(originTable)
-
- def tableSchema: TableSchema = table.schema
-
- def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast:
Boolean): Unit = {
- val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
- if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
- newTable = Some(table.copyWithLatestSchema())
- }
- }
-
- def updateTableWithOptions(options: Map[String, String]): Unit = {
- newTable = Some(table.copy(options.asJava))
- }
+ def table: FileStoreTable
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
new file mode 100644
index 000000000..e8b573c2e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.spark.{SparkInputPartition, SparkReaderFactory}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.ReadBuilder
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+
+class PaimonMicroBatchStream(
+ originTable: FileStoreTable,
+ readBuilder: ReadBuilder,
+ checkpointLocation: String)
+ extends MicroBatchStream
+ with StreamHelper
+ with Logging {
+
+ private var committedOffset: Option[PaimonSourceOffset] = None
+
+ lazy val initOffset: PaimonSourceOffset =
PaimonSourceOffset(getStartingContext)
+
+ override def latestOffset(): Offset = {
+ getLatestOffset
+ }
+
+ override def planInputPartitions(start: Offset, end: Offset):
Array[InputPartition] = {
+ val startOffset = {
+ val startOffset0 = PaimonSourceOffset.apply(start)
+ if (startOffset0.compareTo(initOffset) < 0) {
+ initOffset
+ } else {
+ startOffset0
+ }
+ }
+ val endOffset = PaimonSourceOffset.apply(end)
+
+ getBatch(startOffset, endOffset)
+ .map(ids => new SparkInputPartition(ids.entry))
+ .toArray[InputPartition]
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ new SparkReaderFactory(readBuilder)
+ }
+
+ override def initialOffset(): Offset = {
+ initOffset
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ PaimonSourceOffset.apply(json)
+ }
+
+ override def commit(end: Offset): Unit = {
+ committedOffset = Some(PaimonSourceOffset.apply(end))
+ logInfo(s"$committedOffset is committed.")
+ }
+
+ override def stop(): Unit = {}
+
+ override def table: FileStoreTable = originTable
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
new file mode 100644
index 000000000..d64fe80f6
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.spark.util.JsonUtils
+import org.apache.paimon.table.source.snapshot.StartingContext
+
+import org.apache.spark.sql.connector.read.streaming.Offset
+
+case class PaimonSourceOffset(snapshotId: Long, index: Long, scanSnapshot:
Boolean)
+ extends Offset
+ with Comparable[PaimonSourceOffset] {
+
+ override def json(): String = {
+ JsonUtils.toJson(this)
+ }
+
+ override def compareTo(o: PaimonSourceOffset): Int = {
+ o match {
+ case PaimonSourceOffset(snapshotId, index, scanSnapshot) =>
+ val diff = this.snapshotId.compare(snapshotId)
+ if (diff == 0) {
+ this.index.compare(index)
+ } else {
+ diff
+ }
+ }
+ }
+}
+
+object PaimonSourceOffset {
+ def apply(version: Long, index: Long, scanSnapshot: Boolean):
PaimonSourceOffset = {
+ new PaimonSourceOffset(
+ version,
+ index,
+ scanSnapshot
+ )
+ }
+
+ def apply(offset: Any): PaimonSourceOffset = {
+ offset match {
+ case o: PaimonSourceOffset => o
+ case json: String => JsonUtils.fromJson[PaimonSourceOffset](json)
+ case sc: StartingContext => PaimonSourceOffset(sc.getSnapshotId, -1,
sc.getScanFullSnapshot)
+ case _ => throw new IllegalArgumentException(s"Can't parse $offset to
PaimonSourceOffset.")
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
new file mode 100644
index 000000000..0994bdc59
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.spark.commands.WithFileStoreTable
+import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan,
ScanMode}
+import org.apache.paimon.table.source.TableScan.Plan
+import org.apache.paimon.table.source.snapshot.StartingContext
+import org.apache.paimon.utils.RowDataPartitionComputer
+
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit,
isLast: Boolean)
+
+trait StreamHelper extends WithFileStoreTable {
+
+ val initOffset: PaimonSourceOffset
+
+ private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
+
+ private lazy val partitionSchema: StructType =
+ SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
+
+ private lazy val partitionComputer: RowDataPartitionComputer = new
RowDataPartitionComputer(
+ new CoreOptions(table.schema.options).partitionDefaultName,
+ table.schema.logicalPartitionType,
+ table.schema.partitionKeys.asScala.toArray
+ )
+
+ // Used to get the initial offset.
+ def getStartingContext: StartingContext = streamScan.startingContext()
+
+ def getLatestOffset: PaimonSourceOffset = {
+ val latestSnapshotId = table.snapshotManager().latestSnapshotId()
+ val plan = if (needToScanCurrentSnapshot(latestSnapshotId)) {
+ table
+ .newSnapshotReader()
+ .withSnapshot(latestSnapshotId)
+ .withMode(ScanMode.ALL)
+ .read()
+ } else {
+ table
+ .newSnapshotReader()
+ .withSnapshot(latestSnapshotId)
+ .withMode(ScanMode.DELTA)
+ .read()
+ }
+ val indexedDataSplits = convertPlanToIndexedSplits(plan)
+ indexedDataSplits.lastOption
+ .map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot =
false))
+ .orNull
+ }
+
+ def getBatch(
+ startOffset: PaimonSourceOffset,
+ endOffset: PaimonSourceOffset): Array[IndexedDataSplit] = {
+ val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+ if (startOffset != null) {
+ streamScan.restore(startOffset.snapshotId,
needToScanCurrentSnapshot(startOffset.snapshotId))
+ }
+ var hasSplits = true
+ while (hasSplits && streamScan.checkpoint() <= endOffset.snapshotId) {
+ val plan = streamScan.plan()
+ if (plan.splits.isEmpty) {
+ hasSplits = false
+ } else {
+ indexedDataSplits ++= convertPlanToIndexedSplits(plan)
+ }
+ }
+ indexedDataSplits.filter(ids => inRange(ids, startOffset,
endOffset)).toArray
+ }
+
+ private def needToScanCurrentSnapshot(snapshotId: Long): Boolean = {
+ snapshotId == initOffset.snapshotId && initOffset.scanSnapshot
+ }
+
+ /** Sort the [[DataSplit]] list and index them. */
+ private def convertPlanToIndexedSplits(plan: Plan): Array[IndexedDataSplit]
= {
+ val dataSplits =
+ plan.splits().asScala.collect { case dataSplit: DataSplit => dataSplit
}.toArray
+ val snapshotId = dataSplits.head.snapshotId()
+ val length = dataSplits.length
+
+ dataSplits
+ .sortWith((ds1, ds2) => compareByPartitionAndBucket(ds1, ds2) < 0)
+ .zipWithIndex
+ .map {
+ case (split, idx) =>
+ IndexedDataSplit(snapshotId, idx, split, idx == length - 1)
+ }
+ }
+
+ private def compareByPartitionAndBucket(dataSplit1: DataSplit, dataSplit2:
DataSplit): Int = {
+ val res = compareBinaryRow(dataSplit1.partition, dataSplit2.partition)
+ if (res == 0) {
+ dataSplit1.bucket - dataSplit2.bucket
+ } else {
+ res
+ }
+ }
+
+ private def compareBinaryRow(row1: BinaryRow, ror2: BinaryRow): Int = {
+ val partitionPath1 = PartitioningUtils.getPathFragment(
+ partitionComputer.generatePartValues(row1).asScala.toMap,
+ partitionSchema)
+ val partitionPath2 = PartitioningUtils.getPathFragment(
+ partitionComputer.generatePartValues(ror2).asScala.toMap,
+ partitionSchema)
+ partitionPath1.compareTo(partitionPath2)
+ }
+
+ private def inRange(
+ indexedDataSplit: IndexedDataSplit,
+ start: PaimonSourceOffset,
+ end: PaimonSourceOffset): Boolean = {
+ val startRange = indexedDataSplit.snapshotId > start.snapshotId ||
+ (indexedDataSplit.snapshotId == start.snapshotId &&
indexedDataSplit.index > start.index)
+ val endRange = indexedDataSplit.snapshotId < end.snapshotId ||
+ (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index
<= end.index)
+
+ startRange && endRange
+ }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
new file mode 100644
index 000000000..bb2a8b741
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule,
ScalaObjectMapper}
+
+object JsonUtils {
+
+ lazy val mapper = {
+ val _mapper = new ObjectMapper with ScalaObjectMapper {}
+ _mapper.setSerializationInclusion(Include.NON_ABSENT)
+ _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ _mapper.registerModule(DefaultScalaModule)
+ _mapper
+ }
+
+ def toJson[T: Manifest](obj: T): String = {
+ mapper.writeValueAsString(obj)
+ }
+
+ def fromJson[T: Manifest](json: String): T = {
+ mapper.readValue[T](json)
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
new file mode 100644
index 000000000..b99ed592b
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.WriteMode
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.junit.jupiter.api.Assertions
+
+import java.util.concurrent.TimeUnit
+
/**
 * Tests for the Paimon Spark streaming source: scan modes (default, from-snapshot[-full],
 * from-timestamp, latest[-full]), triggers, invalid option combinations, and a streaming
 * source-to-sink round trip.
 */
class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {

  import testImplicits._

  test("Paimon Source: default scan mode") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query = spark.readStream
          .format("paimon")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .start()

        val currentResult = () => spark.sql("SELECT * FROM mem_table")
        try {
          query.processAllAvailable()
          var totalStreamingData = snapshotData
          // In the default mode without any related configs, the first batch reads the
          // full latest snapshot, and subsequent batches read only new changes.
          checkAnswer(currentResult(), totalStreamingData)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query.processAllAvailable()
          totalStreamingData ++= (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          checkAnswer(currentResult(), totalStreamingData)

          spark.sql("INSERT INTO T VALUES (50, 'v_50'), (51, 'v_51'), (52, 'v_52')")
          query.processAllAvailable()
          totalStreamingData ++= (Row(50, "v_50") :: Row(51, "v_51") :: Row(52, "v_52") :: Nil)
          checkAnswer(currentResult(), totalStreamingData)
        } finally {
          query.stop()
        }
    }
  }

  test("Paimon Source: default and from-snapshot scan mode with scan.snapshot-id") {
    withTempDirs {
      (checkpointDir1, checkpointDir2) =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        // set scan.snapshot-id = 3, this query can read the latest changes.
        val query1 = spark.readStream
          .format("paimon")
          .option("scan.snapshot-id", 3)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
          .queryName("mem_table1")
          .outputMode("append")
          .start()

        // set scan.snapshot-id = 4, this query will read data from the next commit.
        val query2 = spark.readStream
          .format("paimon")
          .option("scan.snapshot-id", 4)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
          .queryName("mem_table2")
          .outputMode("append")
          .start()

        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
        try {
          query1.processAllAvailable()
          query2.processAllAvailable()
          var totalStreamingData1 = latestChanges
          var totalStreamingData2 = Seq.empty[Row]
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query1.processAllAvailable()
          query2.processAllAvailable()
          totalStreamingData1 =
            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          totalStreamingData2 =
            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)
        } finally {
          query1.stop()
          query2.stop()
        }
    }
  }

  test("Paimon Source: default and from-timestamp scan mode with scan.timestamp-millis") {
    withTempDirs {
      (checkpointDir1, checkpointDir2) =>
        // timestamp that is before this table is created and data is written.
        val ts1 = System.currentTimeMillis()
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
        // timestamp that is after this table is created and data is written.
        val ts2 = System.currentTimeMillis()

        val query1 = spark.readStream
          .format("paimon")
          .option("scan.timestamp-millis", ts1)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
          .queryName("mem_table1")
          .outputMode("append")
          .start()

        val query2 = spark.readStream
          .format("paimon")
          .option("scan.mode", "from-timestamp")
          .option("scan.timestamp-millis", ts2)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
          .queryName("mem_table2")
          .outputMode("append")
          .start()

        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
        try {
          query1.processAllAvailable()
          query2.processAllAvailable()
          // query1 starts before any snapshot, so it reads everything;
          // query2 starts after the last write, so it reads nothing yet.
          checkAnswer(currentResult1(), snapshotData)
          checkAnswer(currentResult2(), Seq.empty)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query1.processAllAvailable()
          query2.processAllAvailable()
          val totalStreamingData1 =
            snapshotData ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          val totalStreamingData2 = Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)
        } finally {
          query1.stop()
          query2.stop()
        }
    }
  }

  test("Paimon Source: latest and latest-full scan mode") {
    withTempDirs {
      (checkpointDir1, checkpointDir2) =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query1 = spark.readStream
          .format("paimon")
          .option("scan.mode", "latest-full")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
          .queryName("mem_table1")
          .outputMode("append")
          .start()

        val query2 = spark.readStream
          .format("paimon")
          .option("scan.mode", "latest")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
          .queryName("mem_table2")
          .outputMode("append")
          .start()

        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
        try {
          query1.processAllAvailable()
          query2.processAllAvailable()
          // query1 uses the latest-full mode, which will scan the whole snapshot, not just the changes.
          var totalStreamingData1 = snapshotData
          // query2 uses the latest mode, which will only scan the changes.
          var totalStreamingData2 = latestChanges
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query1.processAllAvailable()
          query2.processAllAvailable()
          totalStreamingData1 =
            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          totalStreamingData2 =
            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)
        } finally {
          query1.stop()
          query2.stop()
        }
    }
  }

  test("Paimon Source: from-snapshot and from-snapshot-full scan mode") {
    withTempDirs {
      (checkpointDir1, checkpointDir2) =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query1 = spark.readStream
          .format("paimon")
          .option("scan.mode", "from-snapshot-full")
          .option("scan.snapshot-id", 3)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
          .queryName("mem_table1")
          .outputMode("append")
          .start()

        val query2 = spark.readStream
          .format("paimon")
          .option("scan.mode", "from-snapshot")
          .option("scan.snapshot-id", 3)
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
          .queryName("mem_table2")
          .outputMode("append")
          .start()

        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
        try {
          query1.processAllAvailable()
          query2.processAllAvailable()
          // query1 uses the from-snapshot-full mode, which will scan the whole snapshot, not just the changes.
          var totalStreamingData1 = snapshotData
          // query2 uses the from-snapshot mode, which will only scan the changes.
          var totalStreamingData2 = latestChanges
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query1.processAllAvailable()
          query2.processAllAvailable()
          totalStreamingData1 =
            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          totalStreamingData2 =
            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          checkAnswer(currentResult1(), totalStreamingData1)
          checkAnswer(currentResult2(), totalStreamingData2)
        } finally {
          query1.stop()
          query2.stop()
        }
    }
  }

  test("Paimon Source: Trigger AvailableNow") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query = spark.readStream
          .format("paimon")
          .option("scan.mode", "latest")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .trigger(Trigger.AvailableNow())
          .start()

        val currentResult = () => spark.sql("SELECT * FROM mem_table")
        try {
          Assertions.assertTrue(query.isActive)
          query.processAllAvailable()
          checkAnswer(currentResult(), latestChanges)

          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          Assertions.assertFalse(query.isActive)
          query.processAllAvailable()
          // The query has been stopped after all available data at the start of the query have been read.
          // So no more data will be read.
          checkAnswer(currentResult(), latestChanges)
        } finally {
          query.stop()
        }
    }
  }

  test("Paimon Source: Trigger ProcessingTime 5s") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query = spark.readStream
          .format("paimon")
          .option("scan.mode", "latest")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
          .start()

        val currentResult = () => spark.sql("SELECT * FROM mem_table")
        try {
          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          spark.sql("INSERT INTO T VALUES (50, 'v_50'), (51, 'v_51'), (52, 'v_52')")
          val totalStreamingData = latestChanges ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(
            42,
            "v_42") :: Row(50, "v_50") :: Row(51, "v_51") :: Row(52, "v_52") :: Nil)
          // Wait one full trigger interval (5s) plus slack before checking.
          Thread.sleep(6 * 1000)
          checkAnswer(currentResult(), totalStreamingData)
        } finally {
          query.stop()
        }
    }
  }

  test("Paimon Source: with error options") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, snapshotData, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        // scan.snapshot-id and scan.timestamp-millis are mutually exclusive.
        assertThrows[IllegalArgumentException] {
          spark.readStream
            .format("paimon")
            .option("scan.snapshot-id", 3)
            .option("scan.timestamp-millis", System.currentTimeMillis())
            .load(location)
            .writeStream
            .format("memory")
            .option("checkpointLocation", checkpointDir.getCanonicalPath)
            .queryName("mem_table")
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
            .start()
        }
    }
  }

  test("Paimon Source: not supports incremental scan mode in streaming mode") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, _, _) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)

        val query = spark.readStream
          .format("paimon")
          .option("scan.mode", "incremental")
          .option("incremental-between", "3,5")
          .load(location)
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .start()

        assert(intercept[Exception] {
          query.processAllAvailable()
        }.getMessage.contains("Cannot read incremental in streaming mode"))
    }
  }

  test("Paimon Source and Sink") {
    withTempDir {
      checkpointDir =>
        val TableSnapshotState(_, location, _, latestChanges) =
          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG, tableName = "T1")

        val TableSnapshotState(_, targetLocation, _, _) =
          prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY, tableName = "T2")

        val df = spark.readStream
          .format("paimon")
          .option("scan.snapshot-id", "3")
          .load(location)
          .writeStream
          .format("paimon")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)

        val currentResult = () => spark.sql("SELECT * FROM T2")
        var totalStreamingData = Seq.empty[Row]

        val query1 = df.start(targetLocation)
        try {
          query1.processAllAvailable()
          totalStreamingData = latestChanges
          checkAnswer(currentResult(), totalStreamingData)

          spark.sql("INSERT INTO T1 VALUES (40, 'v_40'), (41, 'v_41'), (42, 'v_42')")
          query1.processAllAvailable()
          totalStreamingData =
            totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, "v_42") :: Nil)
          checkAnswer(currentResult(), totalStreamingData)
        } finally {
          query1.stop()
        }

        // scan.snapshot-id should be ignored when restarting from a checkpoint
        val query2 = df.start(targetLocation)
        try {
          query2.processAllAvailable()
          // no new data are queried, the target paimon table is not changed.
          checkAnswer(currentResult(), totalStreamingData)

          spark.sql("INSERT INTO T1 VALUES (50, 'v_50'), (51, 'v_51'), (52, 'v_52')")
          query2.processAllAvailable()
          totalStreamingData =
            totalStreamingData ++ (Row(50, "v_50") :: Row(51, "v_51") :: Row(52, "v_52") :: Nil)
          checkAnswer(currentResult(), totalStreamingData)
        } finally {
          query2.stop()
        }
    }
  }

  /**
   * Snapshot of a prepared table's state.
   *
   * @param table the table name
   * @param location the filesystem location of the table
   * @param snapshotData the full (merged for CHANGE_LOG) content of the table
   * @param latestChanges only the rows written by the most recent commit
   */
  case class TableSnapshotState(
      table: String,
      location: String,
      snapshotData: Seq[Row],
      latestChanges: Seq[Row]
  )

  /** Create a paimon table, insert some data, return the location of this table. */
  private def prepareTableAndGetLocation(
      snapshotNum: Int,
      writeMode: WriteMode,
      tableName: String = "T"): TableSnapshotState = {

    spark.sql(s"DROP TABLE IF EXISTS $tableName")

    val primaryKeysProp = if (writeMode == WriteMode.CHANGE_LOG) {
      "'primary-key'='a',"
    } else {
      ""
    }
    spark.sql(
      s"""
         |CREATE TABLE $tableName (a INT, b STRING)
         |TBLPROPERTIES ($primaryKeysProp 'write-mode'='${writeMode.toString}', 'bucket'='2', 'file.format'='parquet')
         |""".stripMargin)
    val location = loadTable(tableName).location().getPath

    // CHANGE_LOG tables merge rows by primary key; APPEND_ONLY tables keep every row.
    val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
    val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int, String)]
    var latestChanges = Array.empty[(Int, String)]

    // Track the expected table content according to the write mode.
    def updateData(row: (Int, String)): Unit = {
      writeMode match {
        case WriteMode.CHANGE_LOG =>
          mergedData += (row._1 -> row._2)
        case WriteMode.APPEND_ONLY =>
          unmergedData += row
        case _ =>
          throw new IllegalArgumentException("Please provide write mode explicitly.")
      }
    }

    // Materialize (full snapshot rows, latest-commit rows) for assertions.
    def currentTableSnapshotState: (Array[Row], Array[Row]) = {
      def toRow(data: (Int, String)) = Row(data._1, data._2)

      writeMode match {
        case WriteMode.CHANGE_LOG =>
          (mergedData.toArray[(Int, String)].map(toRow), latestChanges.map(toRow))
        case WriteMode.APPEND_ONLY =>
          (unmergedData.sorted.toArray.map(toRow), latestChanges.map(toRow))
        case _ =>
          throw new IllegalArgumentException("Please provide write mode explicitly.")
      }
    }

    // Each round writes three rows (ids 10*round .. 10*round+2), producing one snapshot.
    (1 to snapshotNum).foreach {
      round =>
        val startId = 10 * round
        val data = (startId to startId + 2).map {
          id =>
            val row = (id, s"v_$id")
            updateData(row)
            row
        }
        latestChanges = data.toArray
        data.toDF("a", "b").write.format("paimon").mode("append").save(location)
    }

    val snapshotState = currentTableSnapshotState
    // Was hard-coded to "T", which mislabeled the state for callers using T1/T2.
    TableSnapshotState(tableName, location, snapshotState._1, snapshotState._2)
  }

}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 10977ee7d..5ba073120 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -69,6 +69,10 @@ class PaimonSparkTestBase extends QueryTest with
SharedSparkSession with WithTab
spark.sql(s"DROP TABLE IF EXISTS $tableName0")
}
+ protected def withTempDirs(f: (File, File) => Unit): Unit = {
+ withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
+ }
+
override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
println(testName)