This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8801b342d [spark] Supports Spark Streaming Source (#1842)
8801b342d is described below

commit 8801b342d1432298bea82bb892c19c9685f30633
Author: Yann Byron <[email protected]>
AuthorDate: Thu Sep 7 20:13:03 2023 +0800

    [spark] Supports Spark Streaming Source (#1842)
---
 .../java/org/apache/paimon/options/Options.java    |   4 +
 .../table/source/InnerStreamTableScanImpl.java     |  12 +-
 .../paimon/table/source/StreamTableScan.java       |   2 +-
 .../ContinuousFromTimestampStartingScanner.java    |   9 -
 .../snapshot/ContinuousLatestStartingScanner.java  |   9 -
 .../table/source/snapshot/StartingContext.java     |   2 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   4 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  |   2 +-
 .../java/org/apache/paimon/spark/SparkScan.java    |  13 +-
 .../org/apache/paimon/spark/SparkScanBuilder.java  |   1 +
 .../java/org/apache/paimon/spark/SparkTable.java   |   1 +
 .../paimon/spark/PaimonPartitionManagement.scala   |  12 +-
 .../org/apache/paimon/spark/SparkSource.scala      |   6 +-
 .../paimon/spark/commands/PaimonCommand.scala      |   4 +-
 .../paimon/spark/commands/SchemaHelper.scala       |   4 +-
 ...SchemaHelper.scala => WithFileStoreTable.scala} |  28 +-
 .../spark/sources/PaimonMicroBatchStream.scala     |  81 ++++
 .../paimon/spark/sources/PaimonSourceOffset.scala  |  63 +++
 .../apache/paimon/spark/sources/StreamHelper.scala | 146 ++++++
 .../org/apache/paimon/spark/util/JsonUtils.scala   |  41 ++
 .../org/apache/paimon/spark/PaimonSourceTest.scala | 537 +++++++++++++++++++++
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |   4 +
 22 files changed, 917 insertions(+), 68 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java 
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index a6b82fd37..ee8a249df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -152,6 +152,10 @@ public class Options implements Serializable {
         return new Options(newData);
     }
 
+    public synchronized void remove(String key) {
+        data.remove(key);
+    }
+
     public synchronized boolean containsKey(String key) {
         return data.containsKey(key);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 4ff0b3be0..0968ac361 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
+import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -238,9 +239,14 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     }
 
     @Override
-    public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
-        restore(nextSnapshotId);
-        snapshotReader.withMode(scanMode);
+    public void restore(@Nullable Long nextSnapshotId, boolean 
scanAllSnapshot) {
+        if (nextSnapshotId != null && scanAllSnapshot) {
+            startingScanner =
+                    new StaticFromSnapshotStartingScanner(snapshotManager, 
nextSnapshotId);
+            restore(null);
+        } else {
+            restore(nextSnapshotId);
+        }
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 5c8932059..2df743428 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -44,7 +44,7 @@ public interface StreamTableScan extends TableScan, 
Restorable<Long> {
     void restore(@Nullable Long nextSnapshotId);
 
     /** Restore from checkpoint next snapshot id with scan kind. */
-    void restore(@Nullable Long nextSnapshotId, ScanMode scanMode);
+    void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
 
     /** Checkpoint to return next snapshot id. */
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 6113773ff..7fc8cd6aa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -42,15 +42,6 @@ public class ContinuousFromTimestampStartingScanner extends 
AbstractStartingScan
         this.startingSnapshotId = 
this.snapshotManager.earlierThanTimeMills(this.startupMillis);
     }
 
-    @Override
-    public StartingContext startingContext() {
-        if (startingSnapshotId == null) {
-            return StartingContext.EMPTY;
-        } else {
-            return new StartingContext(startingSnapshotId + 1, false);
-        }
-    }
-
     @Override
     public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = 
snapshotManager.earlierThanTimeMills(startupMillis);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 423181b37..c1a6054ae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -38,15 +38,6 @@ public class ContinuousLatestStartingScanner extends 
AbstractStartingScanner {
         this.startingSnapshotId = snapshotManager.latestSnapshotId();
     }
 
-    @Override
-    public StartingContext startingContext() {
-        if (startingSnapshotId == null) {
-            return StartingContext.EMPTY;
-        } else {
-            return new StartingContext(startingSnapshotId + 1, false);
-        }
-    }
-
     @Override
     public Result scan(SnapshotReader snapshotReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
index 9204f4d47..dcafed348 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingContext.java
@@ -43,5 +43,5 @@ public class StartingContext {
         return this.scanFullSnapshot;
     }
 
-    public static final StartingContext EMPTY = new StartingContext(0L, false);
+    public static final StartingContext EMPTY = new StartingContext(1L, false);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 28bba8901..cd2390e6a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -335,8 +335,8 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
 
         @Override
-        public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) {
-            streamScan.restore(nextSnapshotId, scanMode);
+        public void restore(@Nullable Long nextSnapshotId, boolean 
scanAllSnapshot) {
+            streamScan.restore(nextSnapshotId, scanAllSnapshot);
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index adf88ffe6..0a02ac512 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -867,7 +867,7 @@ public class ContinuousFileSplitEnumeratorTest {
         public void restore(Long state) {}
 
         @Override
-        public void restore(@Nullable Long nextSnapshotId, ScanMode scanMode) 
{}
+        public void restore(@Nullable Long nextSnapshotId, boolean 
scanAllSnapshot) {}
 
         public void allowEnd(boolean allowEnd) {
             this.allowEnd = allowEnd;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 1c5c7b50a..e277834b5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 
@@ -27,6 +30,7 @@ import 
org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.List;
@@ -39,11 +43,13 @@ import java.util.OptionalLong;
  */
 public class SparkScan implements Scan, SupportsReportStatistics {
 
+    private final Table table;
     private final ReadBuilder readBuilder;
 
     private List<Split> splits;
 
-    public SparkScan(ReadBuilder readBuilder) {
+    public SparkScan(Table table, ReadBuilder readBuilder) {
+        this.table = table;
         this.readBuilder = readBuilder;
     }
 
@@ -75,6 +81,11 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
         };
     }
 
+    @Override
+    public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+        return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder, 
checkpointLocation);
+    }
+
     protected List<Split> splits() {
         if (splits == null) {
             splits = readBuilder.newScan().plan().splits();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
index 8fe3868b3..ec7367ed2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
@@ -81,6 +81,7 @@ public class SparkScanBuilder
     @Override
     public Scan build() {
         return new SparkScan(
+                table,
                 
table.newReadBuilder().withFilter(predicates).withProjection(projectedFields));
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 77a5909a4..674b4fbc2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -89,6 +89,7 @@ public class SparkTable
         capabilities.add(TableCapability.V1_BATCH_WRITE);
         capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
         capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
+        capabilities.add(TableCapability.MICRO_BATCH_READ);
         return capabilities;
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 16bd91145..d4461ff1c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -17,11 +17,9 @@
  */
 package org.apache.paimon.spark
 
-import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.operation.FileStoreCommit
-import org.apache.paimon.table.{AbstractFileStoreTable, Table}
+import org.apache.paimon.table.AbstractFileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
-import org.apache.paimon.table.source.TableScan
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.{FileStorePathFactory, RowDataPartitionComputer}
 
@@ -30,8 +28,8 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, 
InternalRow}
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types.StructType
 
-import java.util
 import java.util.{Collections, UUID}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 
@@ -75,11 +73,11 @@ trait PaimonPartitionManagement extends 
SupportsPartitionManagement {
 
   override def replacePartitionMetadata(
       ident: InternalRow,
-      properties: util.Map[String, String]): Unit = {
+      properties: JMap[String, String]): Unit = {
     throw new UnsupportedOperationException("Replace partition is not 
supported")
   }
 
-  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, 
String] = {
+  override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] 
= {
     throw new UnsupportedOperationException("Load partition is not supported")
   }
 
@@ -123,7 +121,7 @@ trait PaimonPartitionManagement extends 
SupportsPartitionManagement {
       .toArray
   }
 
-  override def createPartition(ident: InternalRow, properties: 
util.Map[String, String]): Unit = {
+  override def createPartition(ident: InternalRow, properties: JMap[String, 
String]): Unit = {
     throw new UnsupportedOperationException("Create partition is not 
supported")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index d9675ca64..9495cb0fe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import java.util
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 
@@ -61,7 +61,7 @@ class SparkSource
   override def getTable(
       schema: StructType,
       partitioning: Array[Transform],
-      properties: util.Map[String, String]): Table = {
+      properties: JMap[String, String]): Table = {
     new SparkTable(loadTable(properties))
   }
 
@@ -76,7 +76,7 @@ class SparkSource
     SparkSource.toBaseRelation(table, sqlContext)
   }
 
-  private def loadTable(options: util.Map[String, String]): FileStoreTable = {
+  private def loadTable(options: JMap[String, String]): FileStoreTable = {
     val catalogContext = CatalogContext.create(
       Options.fromMap(options),
       SparkSession.active.sessionState.newHadoopConf())
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 9738c5c83..6e88ec6c8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -27,12 +27,10 @@ import org.apache.spark.sql.sources.{AlwaysTrue, And, 
EqualNullSafe, Filter}
 import java.io.IOException
 
 /** Helper trait for all paimon commands. */
-trait PaimonCommand {
+trait PaimonCommand extends WithFileStoreTable {
 
   val BUCKET_COL = "_bucket_"
 
-  def table: FileStoreTable
-
   lazy val bucketMode: BucketMode = table match {
     case fileStoreTable: FileStoreTable =>
       fileStoreTable.bucketMode
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index 8a8415d64..1fd2f2c58 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -26,13 +26,13 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-trait SchemaHelper {
+trait SchemaHelper extends WithFileStoreTable {
 
   val originTable: FileStoreTable
 
   protected var newTable: Option[FileStoreTable] = None
 
-  def table: FileStoreTable = newTable.getOrElse(originTable)
+  override def table: FileStoreTable = newTable.getOrElse(originTable)
 
   def tableSchema: TableSchema = table.schema
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
similarity index 50%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 8a8415d64..03ca0e3d1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -17,34 +17,10 @@
  */
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.schema.{SchemaMergingUtils, TableSchema}
-import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.types.StructType
+private[spark] trait WithFileStoreTable {
 
-import scala.collection.JavaConverters._
-
-trait SchemaHelper {
-
-  val originTable: FileStoreTable
-
-  protected var newTable: Option[FileStoreTable] = None
-
-  def table: FileStoreTable = newTable.getOrElse(originTable)
-
-  def tableSchema: TableSchema = table.schema
-
-  def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast: 
Boolean): Unit = {
-    val dataRowType = 
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
-    if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
-      newTable = Some(table.copyWithLatestSchema())
-    }
-  }
-
-  def updateTableWithOptions(options: Map[String, String]): Unit = {
-    newTable = Some(table.copy(options.asJava))
-  }
+  def table: FileStoreTable
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
new file mode 100644
index 000000000..e8b573c2e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.spark.{SparkInputPartition, SparkReaderFactory}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.ReadBuilder
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+
+class PaimonMicroBatchStream(
+    originTable: FileStoreTable,
+    readBuilder: ReadBuilder,
+    checkpointLocation: String)
+  extends MicroBatchStream
+  with StreamHelper
+  with Logging {
+
+  private var committedOffset: Option[PaimonSourceOffset] = None
+
+  lazy val initOffset: PaimonSourceOffset = 
PaimonSourceOffset(getStartingContext)
+
+  override def latestOffset(): Offset = {
+    getLatestOffset
+  }
+
+  override def planInputPartitions(start: Offset, end: Offset): 
Array[InputPartition] = {
+    val startOffset = {
+      val startOffset0 = PaimonSourceOffset.apply(start)
+      if (startOffset0.compareTo(initOffset) < 0) {
+        initOffset
+      } else {
+        startOffset0
+      }
+    }
+    val endOffset = PaimonSourceOffset.apply(end)
+
+    getBatch(startOffset, endOffset)
+      .map(ids => new SparkInputPartition(ids.entry))
+      .toArray[InputPartition]
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new SparkReaderFactory(readBuilder)
+  }
+
+  override def initialOffset(): Offset = {
+    initOffset
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    PaimonSourceOffset.apply(json)
+  }
+
+  override def commit(end: Offset): Unit = {
+    committedOffset = Some(PaimonSourceOffset.apply(end))
+    logInfo(s"$committedOffset is committed.")
+  }
+
+  override def stop(): Unit = {}
+
+  override def table: FileStoreTable = originTable
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
new file mode 100644
index 000000000..d64fe80f6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSourceOffset.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.spark.util.JsonUtils
+import org.apache.paimon.table.source.snapshot.StartingContext
+
+import org.apache.spark.sql.connector.read.streaming.Offset
+
+case class PaimonSourceOffset(snapshotId: Long, index: Long, scanSnapshot: 
Boolean)
+  extends Offset
+  with Comparable[PaimonSourceOffset] {
+
+  override def json(): String = {
+    JsonUtils.toJson(this)
+  }
+
+  override def compareTo(o: PaimonSourceOffset): Int = {
+    o match {
+      case PaimonSourceOffset(snapshotId, index, scanSnapshot) =>
+        val diff = this.snapshotId.compare(snapshotId)
+        if (diff == 0) {
+          this.index.compare(index)
+        } else {
+          diff
+        }
+    }
+  }
+}
+
+object PaimonSourceOffset {
+  def apply(version: Long, index: Long, scanSnapshot: Boolean): 
PaimonSourceOffset = {
+    new PaimonSourceOffset(
+      version,
+      index,
+      scanSnapshot
+    )
+  }
+
+  def apply(offset: Any): PaimonSourceOffset = {
+    offset match {
+      case o: PaimonSourceOffset => o
+      case json: String => JsonUtils.fromJson[PaimonSourceOffset](json)
+      case sc: StartingContext => PaimonSourceOffset(sc.getSnapshotId, -1, 
sc.getScanFullSnapshot)
+      case _ => throw new IllegalArgumentException(s"Can't parse $offset to 
PaimonSourceOffset.")
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
new file mode 100644
index 000000000..0994bdc59
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sources
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.spark.commands.WithFileStoreTable
+import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan, 
ScanMode}
+import org.apache.paimon.table.source.TableScan.Plan
+import org.apache.paimon.table.source.snapshot.StartingContext
+import org.apache.paimon.utils.RowDataPartitionComputer
+
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit, 
isLast: Boolean)
+
+trait StreamHelper extends WithFileStoreTable {
+
+  val initOffset: PaimonSourceOffset
+
+  private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
+
+  private lazy val partitionSchema: StructType =
+    SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
+
+  private lazy val partitionComputer: RowDataPartitionComputer = new 
RowDataPartitionComputer(
+    new CoreOptions(table.schema.options).partitionDefaultName,
+    table.schema.logicalPartitionType,
+    table.schema.partitionKeys.asScala.toArray
+  )
+
+  // Used to get the initial offset.
+  def getStartingContext: StartingContext = streamScan.startingContext()
+
+  def getLatestOffset: PaimonSourceOffset = {
+    val latestSnapshotId = table.snapshotManager().latestSnapshotId()
+    val plan = if (needToScanCurrentSnapshot(latestSnapshotId)) {
+      table
+        .newSnapshotReader()
+        .withSnapshot(latestSnapshotId)
+        .withMode(ScanMode.ALL)
+        .read()
+    } else {
+      table
+        .newSnapshotReader()
+        .withSnapshot(latestSnapshotId)
+        .withMode(ScanMode.DELTA)
+        .read()
+    }
+    val indexedDataSplits = convertPlanToIndexedSplits(plan)
+    indexedDataSplits.lastOption
+      .map(ids => PaimonSourceOffset(ids.snapshotId, ids.index, scanSnapshot = 
false))
+      .orNull
+  }
+
+  def getBatch(
+      startOffset: PaimonSourceOffset,
+      endOffset: PaimonSourceOffset): Array[IndexedDataSplit] = {
+    val indexedDataSplits = mutable.ArrayBuffer.empty[IndexedDataSplit]
+    if (startOffset != null) {
+      streamScan.restore(startOffset.snapshotId, 
needToScanCurrentSnapshot(startOffset.snapshotId))
+    }
+    var hasSplits = true
+    while (hasSplits && streamScan.checkpoint() <= endOffset.snapshotId) {
+      val plan = streamScan.plan()
+      if (plan.splits.isEmpty) {
+        hasSplits = false
+      } else {
+        indexedDataSplits ++= convertPlanToIndexedSplits(plan)
+      }
+    }
+    indexedDataSplits.filter(ids => inRange(ids, startOffset, 
endOffset)).toArray
+  }
+
+  private def needToScanCurrentSnapshot(snapshotId: Long): Boolean = {
+    snapshotId == initOffset.snapshotId && initOffset.scanSnapshot
+  }
+
+  /** Sort the [[DataSplit]] list and index them. */
+  private def convertPlanToIndexedSplits(plan: Plan): Array[IndexedDataSplit] 
= {
+    val dataSplits =
+      plan.splits().asScala.collect { case dataSplit: DataSplit => dataSplit 
}.toArray
+    val snapshotId = dataSplits.head.snapshotId()
+    val length = dataSplits.length
+
+    dataSplits
+      .sortWith((ds1, ds2) => compareByPartitionAndBucket(ds1, ds2) < 0)
+      .zipWithIndex
+      .map {
+        case (split, idx) =>
+          IndexedDataSplit(snapshotId, idx, split, idx == length - 1)
+      }
+  }
+
+  private def compareByPartitionAndBucket(dataSplit1: DataSplit, dataSplit2: 
DataSplit): Int = {
+    val res = compareBinaryRow(dataSplit1.partition, dataSplit2.partition)
+    if (res == 0) {
+      dataSplit1.bucket - dataSplit2.bucket
+    } else {
+      res
+    }
+  }
+
+  private def compareBinaryRow(row1: BinaryRow, ror2: BinaryRow): Int = {
+    val partitionPath1 = PartitioningUtils.getPathFragment(
+      partitionComputer.generatePartValues(row1).asScala.toMap,
+      partitionSchema)
+    val partitionPath2 = PartitioningUtils.getPathFragment(
+      partitionComputer.generatePartValues(ror2).asScala.toMap,
+      partitionSchema)
+    partitionPath1.compareTo(partitionPath2)
+  }
+
+  private def inRange(
+      indexedDataSplit: IndexedDataSplit,
+      start: PaimonSourceOffset,
+      end: PaimonSourceOffset): Boolean = {
+    val startRange = indexedDataSplit.snapshotId > start.snapshotId ||
+      (indexedDataSplit.snapshotId == start.snapshotId && 
indexedDataSplit.index > start.index)
+    val endRange = indexedDataSplit.snapshotId < end.snapshotId ||
+      (indexedDataSplit.snapshotId == end.snapshotId && indexedDataSplit.index 
<= end.index)
+
+    startRange && endRange
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
new file mode 100644
index 000000000..bb2a8b741
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/JsonUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, 
ScalaObjectMapper}
+
object JsonUtils {

  /**
   * Shared Jackson mapper configured for Scala types; built lazily on first
   * use. Absent values are dropped on write and unknown properties are
   * tolerated on read, so offsets stay (de)serializable across versions.
   */
  lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val om = new ObjectMapper with ScalaObjectMapper {}
    om.setSerializationInclusion(Include.NON_ABSENT)
    om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    om.registerModule(DefaultScalaModule)
    om
  }

  /** Serializes `obj` to its JSON string representation. */
  def toJson[T: Manifest](obj: T): String = mapper.writeValueAsString(obj)

  /** Deserializes `json` into an instance of `T`. */
  def fromJson[T: Manifest](json: String): T = mapper.readValue[T](json)
}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
new file mode 100644
index 000000000..b99ed592b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.WriteMode
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.junit.jupiter.api.Assertions
+
+import java.util.concurrent.TimeUnit
+
+class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Source: default scan mode") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          query.processAllAvailable()
+          var totalStreamingData = snapshotData
+          // in the default mode without any related configs, only data 
written in the last time will be read.
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query.processAllAvailable()
+          totalStreamingData ++= (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T VALUES (50, 'v_50'), (51, 'v_51'), (52, 
'v_52')")
+          query.processAllAvailable()
+          totalStreamingData ++= (Row(50, "v_50") :: Row(51, "v_51") :: 
Row(52, "v_52") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: default and from-snapshot scan mode with 
scan.snapshot-id") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        // set scan.snapshot-id = 3, this query can read the latest changes.
+        val query1 = spark.readStream
+          .format("paimon")
+          .option("scan.snapshot-id", 3)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+          .queryName("mem_table1")
+          .outputMode("append")
+          .start()
+
+        // set scan.snapshot-id = 4, this query will read data from the next 
commit.
+        val query2 = spark.readStream
+          .format("paimon")
+          .option("scan.snapshot-id", 4)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table2")
+          .outputMode("append")
+          .start()
+
+        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
+        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
+        try {
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          var totalStreamingData1 = latestChanges
+          var totalStreamingData2 = Seq.empty[Row]
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          totalStreamingData1 =
+            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          totalStreamingData2 =
+            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+        } finally {
+          query1.stop()
+          query2.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: default and from-timestamp scan mode with 
scan.timestamp-millis") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        // timestamp that is before this table is created and data is written.
+        val ts1 = System.currentTimeMillis()
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+        // timestamp that is after this table is created and data is written.
+        val ts2 = System.currentTimeMillis()
+
+        val query1 = spark.readStream
+          .format("paimon")
+          .option("scan.timestamp-millis", ts1)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+          .queryName("mem_table1")
+          .outputMode("append")
+          .start()
+
+        val query2 = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "from-timestamp")
+          .option("scan.timestamp-millis", ts2)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table2")
+          .outputMode("append")
+          .start()
+
+        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
+        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
+        try {
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          checkAnswer(currentResult1(), snapshotData)
+          checkAnswer(currentResult2(), Seq.empty)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          val totalStreamingData1 =
+            snapshotData ++ (Row(40, "v_40") :: Row(41, "v_41") :: Row(42, 
"v_42") :: Nil)
+          val totalStreamingData2 = Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+        } finally {
+          query1.stop()
+          query2.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: latest and latest-full scan mode") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query1 = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "latest-full")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+          .queryName("mem_table1")
+          .outputMode("append")
+          .start()
+
+        val query2 = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "latest")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table2")
+          .outputMode("append")
+          .start()
+
+        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
+        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
+        try {
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          // query1 uses the latest-full mode, which will scan the whole 
snapshot, not just the changes.
+          var totalStreamingData1 = snapshotData
+          // query2 uses the latest mode, which will only scan the changes.
+          var totalStreamingData2 = latestChanges
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          totalStreamingData1 =
+            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          totalStreamingData2 =
+            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+        } finally {
+          query1.stop()
+          query2.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: from-snapshot and from-snapshot-full scan mode") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query1 = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "from-snapshot-full")
+          .option("scan.snapshot-id", 3)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+          .queryName("mem_table1")
+          .outputMode("append")
+          .start()
+
+        val query2 = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "from-snapshot")
+          .option("scan.snapshot-id", 3)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table2")
+          .outputMode("append")
+          .start()
+
+        val currentResult1 = () => spark.sql("SELECT * FROM mem_table1")
+        val currentResult2 = () => spark.sql("SELECT * FROM mem_table2")
+        try {
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          // query1 uses the from-snapshot-full mode, which will scan the 
whole snapshot, not just the changes.
+          var totalStreamingData1 = snapshotData
+          // query2 uses the from-snapshot mode, which will only scan the 
changes.
+          var totalStreamingData2 = latestChanges
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query1.processAllAvailable()
+          query2.processAllAvailable()
+          totalStreamingData1 =
+            totalStreamingData1 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          totalStreamingData2 =
+            totalStreamingData2 ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult1(), totalStreamingData1)
+          checkAnswer(currentResult2(), totalStreamingData2)
+        } finally {
+          query1.stop()
+          query2.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: Trigger AvailableNow") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "latest")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .trigger(Trigger.AvailableNow())
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          Assertions.assertTrue(query.isActive)
+          query.processAllAvailable()
+          checkAnswer(currentResult(), latestChanges)
+
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          Assertions.assertFalse(query.isActive)
+          query.processAllAvailable()
+          // The query has been stopped after all available data at the start 
of the query have been read.
+          // So no more data will be read.
+          checkAnswer(currentResult(), latestChanges)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: Trigger ProcessingTime 5s") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "latest")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          spark.sql("INSERT INTO T VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          spark.sql("INSERT INTO T VALUES (50, 'v_50'), (51, 'v_51'), (52, 
'v_52')")
+          var totalStreamingData = latestChanges ++ (Row(40, "v_40") :: 
Row(41, "v_41") :: Row(
+            42,
+            "v_42") :: Row(50, "v_50") :: Row(51, "v_51") :: Row(52, "v_52") 
:: Nil)
+          Thread.sleep(6 * 1000)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query.stop()
+        }
+    }
+  }
+
+  test("Paimon Source: with error options") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, snapshotData, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        assertThrows[IllegalArgumentException] {
+          spark.readStream
+            .format("paimon")
+            .option("scan.snapshot-id", 3)
+            .option("scan.timestamp-millis", System.currentTimeMillis())
+            .load(location)
+            .writeStream
+            .format("memory")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .queryName("mem_table")
+            .outputMode("append")
+            .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
+            .start()
+        }
+    }
+  }
+
+  test("Paimon Source: not supports compacted-full scan mode in streaming 
mode") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, _, _) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG)
+
+        val query = spark.readStream
+          .format("paimon")
+          .option("scan.mode", "incremental")
+          .option("incremental-between", "3,5")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        assert(intercept[Exception] {
+          query.processAllAvailable()
+        }.getMessage.contains("Cannot read incremental in streaming mode"))
+    }
+  }
+
+  test("Paimon Source and Sink") {
+    withTempDir {
+      checkpointDir =>
+        val TableSnapshotState(_, location, _, latestChanges) =
+          prepareTableAndGetLocation(3, WriteMode.CHANGE_LOG, tableName = "T1")
+
+        val TableSnapshotState(_, targetLocation, _, _) =
+          prepareTableAndGetLocation(0, WriteMode.APPEND_ONLY, tableName = 
"T2")
+
+        val df = spark.readStream
+          .format("paimon")
+          .option("scan.snapshot-id", "3")
+          .load(location)
+          .writeStream
+          .format("paimon")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+
+        val currentResult = () => spark.sql("SELECT * FROM T2")
+        var totalStreamingData = Seq.empty[Row]
+
+        val query1 = df.start(targetLocation)
+        try {
+          query1.processAllAvailable()
+          totalStreamingData = latestChanges
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T1 VALUES (40, 'v_40'), (41, 'v_41'), (42, 
'v_42')")
+          query1.processAllAvailable()
+          totalStreamingData =
+            totalStreamingData ++ (Row(40, "v_40") :: Row(41, "v_41") :: 
Row(42, "v_42") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query1.stop()
+        }
+
+        // scan.snapshot-id should be ignored when restarting from a checkpoint
+        val query2 = df.start(targetLocation)
+        try {
+          query2.processAllAvailable()
+          // no new data are queried, the target paimon table is not changed.
+          checkAnswer(currentResult(), totalStreamingData)
+
+          spark.sql("INSERT INTO T1 VALUES (50, 'v_50'), (51, 'v_51'), (52, 
'v_52')")
+          query2.processAllAvailable()
+          totalStreamingData =
+            totalStreamingData ++ (Row(50, "v_50") :: Row(51, "v_51") :: 
Row(52, "v_52") :: Nil)
+          checkAnswer(currentResult(), totalStreamingData)
+        } finally {
+          query2.stop()
+        }
+    }
+  }
+
  /**
   * Snapshot of a test table's state after preparation.
   *
   * @param table the table name
   * @param location filesystem location of the table
   * @param snapshotData all rows visible in the latest snapshot (merged by
   *                     primary key for change-log tables, sorted for
   *                     append-only tables)
   * @param latestChanges only the rows written by the most recent commit
   */
  case class TableSnapshotState(
      table: String,
      location: String,
      snapshotData: Seq[Row],
      latestChanges: Seq[Row]
  )
+
+  /** Create a paimon table, insert some data, return the location of this 
table. */
+  private def prepareTableAndGetLocation(
+      snapshotNum: Int,
+      writeMode: WriteMode,
+      tableName: String = "T"): TableSnapshotState = {
+
+    spark.sql(s"DROP TABLE IF EXISTS $tableName")
+
+    val primaryKeysProp = if (writeMode == WriteMode.CHANGE_LOG) {
+      "'primary-key'='a',"
+    } else {
+      ""
+    }
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (a INT, b STRING)
+         |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='2', 'file.format'='parquet')
+         |""".stripMargin)
+    val location = loadTable(tableName).location().getPath
+
+    val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
+    val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int, 
String)]
+    var latestChanges = Array.empty[(Int, String)]
+
+    def updateData(row: (Int, String)): Unit = {
+      writeMode match {
+        case WriteMode.CHANGE_LOG =>
+          mergedData += (row._1 -> row._2)
+        case WriteMode.APPEND_ONLY =>
+          unmergedData += row
+        case _ =>
+          throw new IllegalArgumentException("Please provide write mode 
explicitly.")
+      }
+    }
+
+    def currentTableSnapshotState: (Array[Row], Array[Row]) = {
+      def toRow(data: (Int, String)) = Row(data._1, data._2)
+
+      writeMode match {
+        case WriteMode.CHANGE_LOG =>
+          (mergedData.toArray[(Int, String)].map(toRow), 
latestChanges.map(toRow))
+        case WriteMode.APPEND_ONLY =>
+          (unmergedData.sorted.toArray.map(toRow), latestChanges.map(toRow))
+        case _ =>
+          throw new IllegalArgumentException("Please provide write mode 
explicitly.")
+      }
+    }
+
+    (1 to snapshotNum).foreach {
+      round =>
+        val startId = 10 * round
+        val data = (startId to startId + 2).map {
+          id =>
+            val row = (id, s"v_$id")
+            updateData(row)
+            row
+        }
+        latestChanges = data.toArray
+        data.toDF("a", 
"b").write.format("paimon").mode("append").save(location)
+    }
+
+    val snapshotState = currentTableSnapshotState
+    TableSnapshotState("T", location, snapshotState._1, snapshotState._2)
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 10977ee7d..5ba073120 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -69,6 +69,10 @@ class PaimonSparkTestBase extends QueryTest with 
SharedSparkSession with WithTab
     spark.sql(s"DROP TABLE IF EXISTS $tableName0")
   }
 
+  protected def withTempDirs(f: (File, File) => Unit): Unit = {
+    withTempDir(file1 => withTempDir(file2 => f(file1, file2)))
+  }
+
   override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     println(testName)

Reply via email to