This is an automated email from the ASF dual-hosted git repository.

qingwzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e14de22 feat: Paimon stream source. (#662)
69e14de22 is described below

commit 69e14de224361ddc585fd0626d20f60a54e1cdeb
Author: moses <[email protected]>
AuthorDate: Wed Nov 26 19:06:38 2025 +0800

    feat: Paimon stream source. (#662)
    
    * feat: Paimon stream source.
    
    * add licenses
    
    * check style
    
    * check style
    
    * minor fix
    
    * fix list partitions
    
    * throws exception
    
    * fix tests
---
 .../dsl/connector/paimon/IteratorWrapper.java      |  51 +++
 .../dsl/connector/paimon/PaimonConfigKeys.java     |  14 +
 .../connector/paimon/PaimonRecordDeserializer.java |  24 +-
 .../dsl/connector/paimon/PaimonTableSource.java    | 384 ++++++++++++++++-----
 .../geaflow/dsl/connector/paimon/SourceMode.java   |  26 ++
 .../geaflow/dsl/connector/paimon/StartupMode.java  |  56 +++
 .../connector/paimon/PaimonTableConnectorTest.java | 307 +++++++++++++---
 7 files changed, 712 insertions(+), 150 deletions(-)
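
A minimal usage sketch of the new stream mode, condensed from the test changes
below. The warehouse path, schema variable, and window arguments are
illustrative assumptions; the classes and config keys are the ones this patch
adds or exercises (imports mirror PaimonTableConnectorTest; fetch throws
IOException):

    // Configure the connector with the new source-mode key.
    Map<String, String> conf = new HashMap<>();
    conf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), "/tmp/warehouse");
    conf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), "paimon_db");
    conf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), "paimon_table");
    conf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.STREAM.name());
    Configuration tableConf = new Configuration(conf);

    TableReadableConnector connector =
        (TableReadableConnector) ConnectorFactory.loadConnector("PAIMON");
    TableSource source = connector.createSource(tableConf);
    source.init(tableConf, tableSchema); // tableSchema: the table's TableSchema
    source.open(new DefaultRuntimeContext(tableConf));

    // Each fetch drains up to the window size and returns the next offset;
    // in STREAM mode isFinish() stays false and new snapshots are picked up.
    for (Partition partition : source.listPartitions(1)) {
        FetchData<Object> rows =
            source.fetch(partition, Optional.empty(), new SizeFetchWindow(0L, 4));
    }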

diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java
new file mode 100644
index 000000000..3ce921708
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.CloseableIterator;
+
+public class IteratorWrapper implements CloseableIterator<Row> {
+
+    private final CloseableIterator<InternalRow> iterator;
+    private final PaimonRecordDeserializer deserializer;
+
+    public IteratorWrapper(CloseableIterator<InternalRow> iterator,
+                           PaimonRecordDeserializer deserializer) {
+        this.iterator = iterator;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public void close() throws Exception {
+        iterator.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public Row next() {
+        return deserializer.deserialize(iterator.next());
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
index 106ef026b..7218ed73b 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
@@ -49,4 +49,18 @@ public class PaimonConfigKeys {
         .noDefaultValue()
         .description("The paimon table name to read.");
 
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_SOURCE_MODE = ConfigKeys
+            .key("geaflow.dsl.paimon.source.mode")
+            .defaultValue(SourceMode.BATCH.name())
+            .description("The paimon source mode, if stream, will continue to 
read data from paimon.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID = 
ConfigKeys
+            .key("geaflow.dsl.paimon.scan.snapshot.id")
+            .defaultValue(null)
+            .description("If scan mode is from-snapshot, this parameter is 
required.");
+
+    public static final ConfigKey GEAFLOW_DSL_PAIMON_SCAN_MODE = ConfigKeys
+            .key("geaflow.dsl.paimon.scan.mode")
+            .defaultValue(StartupMode.LATEST.getValue())
+            .description("Determines the scan mode for paimon source, 'latest' 
or 'from-snapshot'.");
 }
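
Combined with the source-mode key, a from-snapshot read is configured like the
following sketch (mirroring testReadPaimonFromSnapshot further down; the
literal "1" is an example snapshot id, and the scan-mode match is
case-insensitive):

    tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.BATCH.name());
    tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_MODE.getKey(), StartupMode.FROM_SNAPSHOT.getValue());
    tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID.getKey(), "1");
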
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
index d38c360d6..1cbd523c6 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
@@ -19,9 +19,6 @@
 
 package org.apache.geaflow.dsl.connector.paimon;
 
-import java.util.Collections;
-import java.util.List;
-import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.type.Types;
 import org.apache.geaflow.dsl.common.data.Row;
 import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
@@ -29,22 +26,21 @@ import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
 import org.apache.geaflow.dsl.common.types.StructType;
 import org.apache.geaflow.dsl.common.types.TableField;
 import org.apache.geaflow.dsl.common.types.TableSchema;
-import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
 import org.apache.paimon.data.InternalRow;
 
-public class PaimonRecordDeserializer implements TableDeserializer<Object> {
+public class PaimonRecordDeserializer {
 
     private StructType schema;
-    private TableSchema tableSchema;
 
-    @Override
-    public void init(Configuration conf, StructType schema) {
-        this.tableSchema = (TableSchema) schema;
-        this.schema = this.tableSchema.getDataSchema();
+    public void init(StructType schema) {
+        TableSchema tableSchema = (TableSchema) schema;
+        this.schema = tableSchema.getDataSchema();
     }
 
-    @Override
-    public List<Row> deserialize(Object record) {
+    public Row deserialize(Object record) {
+        if (record == null) {
+            return null;
+        }
         InternalRow internalRow = (InternalRow) record;
         assert internalRow.getFieldCount() == schema.size();
         Object[] values = new Object[schema.size()];
@@ -70,8 +66,6 @@ public class PaimonRecordDeserializer implements TableDeserializer<Object> {
                     values[i] = internalRow.getLong(i);
                     break;
                 case Types.TYPE_NAME_STRING:
-                    values[i] = internalRow.getString(i);
-                    break;
                 case Types.TYPE_NAME_BINARY_STRING:
                     values[i] = internalRow.getString(i);
                     break;
@@ -80,6 +74,6 @@ public class PaimonRecordDeserializer implements TableDeserializer<Object> {
                         field.getType().getName());
             }
         }
-        return Collections.singletonList(ObjectRow.create(values));
+        return ObjectRow.create(values);
     }
 }
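
Note on the deserializer rework above: deserialize() now maps one Paimon
InternalRow to a single GeaFlow Row instead of a singleton list, which lets the
new IteratorWrapper adapt a Paimon reader lazily. A sketch of that pairing,
assuming an open CloseableIterator<InternalRow> named paimonIterator and the
table's TableSchema in scope:

    PaimonRecordDeserializer deserializer = new PaimonRecordDeserializer();
    deserializer.init(tableSchema); // no Configuration argument anymore
    IteratorWrapper rows = new IteratorWrapper(paimonIterator, deserializer);
    try {
        while (rows.hasNext()) {
            Row row = rows.next(); // one Row per InternalRow
        }
    } finally {
        rows.close(); // declared to throw Exception
    }
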
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
index b980db11a..f5ba0eb59 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
@@ -21,20 +21,18 @@ package org.apache.geaflow.dsl.connector.paimon;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;
 import org.apache.geaflow.api.context.RuntimeContext;
 import org.apache.geaflow.common.config.Configuration;
-import org.apache.geaflow.common.config.keys.DSLConfigKeys;
 import org.apache.geaflow.common.utils.GsonUtil;
 import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
 import org.apache.geaflow.dsl.common.types.TableSchema;
-import org.apache.geaflow.dsl.common.util.Windows;
 import org.apache.geaflow.dsl.connector.api.FetchData;
 import org.apache.geaflow.dsl.connector.api.Offset;
 import org.apache.geaflow.dsl.connector.api.Partition;
@@ -50,19 +48,21 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.utils.CloseableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** Paimon table source. */
 public class PaimonTableSource implements TableSource {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableSource.class);
 
     private Configuration tableConf;
     private TableSchema tableSchema;
-    private boolean isAllWindow;
 
     private String path;
     private Map<String, String> options;
@@ -70,17 +70,18 @@ public class PaimonTableSource implements TableSource {
     private Map<String, String> configs;
     private String database;
     private String table;
+    private SourceMode sourceMode;
+    private final PaimonRecordDeserializer deserializer = new PaimonRecordDeserializer();
 
-    private transient CatalogContext catalogContext;
-    private transient Catalog catalog;
+    private transient long fromSnapshot;
     private transient ReadBuilder readBuilder;
-    private transient Map<PaimonPartition, RecordReader<InternalRow>> partition2Reader;
-    private transient Map<PaimonPartition, PaimonOffset> partition2InnerOffset;
+    private transient Map<Split, CloseableIterator<InternalRow>> iterators;
+    private transient Map<Split, RecordReader<InternalRow>> readers;
+    private transient Map<PaimonPartition, PaimonOffset> offsets;
 
     @Override
     public void init(Configuration tableConf, TableSchema tableSchema) {
         this.tableConf = tableConf;
-        this.isAllWindow = tableConf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE) == Windows.SIZE_OF_ALL_WINDOW;
         this.tableSchema = tableSchema;
         this.path = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE, "");
         this.options = new HashMap<>();
@@ -89,71 +90,65 @@ public class PaimonTableSource implements TableSource {
             String optionJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_OPTIONS_JSON);
             Map<String, String> userOptions = GsonUtil.parse(optionJson);
             if (userOptions != null) {
-                for (Map.Entry<String, String> entry : userOptions.entrySet()) {
-                    options.put(entry.getKey(), entry.getValue());
-                }
+                options.putAll(userOptions);
             }
             this.configJson =
                 tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON, "");
             if (!StringUtils.isBlank(configJson)) {
                 Map<String, String> userConfig = GsonUtil.parse(configJson);
                 if (userConfig != null) {
-                    for (Map.Entry<String, String> entry : userConfig.entrySet()) {
-                        configs.put(entry.getKey(), entry.getValue());
-                    }
+                    configs.putAll(userConfig);
                 }
             }
         }
         this.database = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME);
         this.table = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME);
+        this.sourceMode = SourceMode.valueOf(tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE).toUpperCase());
     }
 
     @Override
     public List<Partition> listPartitions(int parallelism) {
-        return listPartitions();
+        List<Partition> partitions = new ArrayList<>(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            partitions.add(new PaimonPartition(database, table));
+        }
+        return partitions;
+    }
+
+    @Override
+    public List<Partition> listPartitions() {
+        throw new UnsupportedOperationException("Please use listPartitions(int parallelism) instead");
     }
 
     @Override
     public void open(RuntimeContext context) {
-        if (StringUtils.isBlank(this.path)) {
-            if (StringUtils.isBlank(this.configJson)) {
-                this.catalogContext =
-                    Objects.requireNonNull(CatalogContext.create(new Options(options)));
-            } else {
-                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
-                for (Map.Entry<String, String> entry : configs.entrySet()) {
-                    hadoopConf.set(entry.getKey(), entry.getValue());
-                }
-                this.catalogContext =
-                    Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
-            }
-        } else {
-            this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
-        }
-        this.catalog = Objects.requireNonNull(CatalogFactory.createCatalog(this.catalogContext));
+        Catalog catalog = getPaimonCatalog();
         Identifier identifier = Identifier.create(database, table);
         try {
             this.readBuilder = Objects.requireNonNull(catalog.getTable(identifier).newReadBuilder());
+            if (tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_MODE)
+                    .equalsIgnoreCase(StartupMode.FROM_SNAPSHOT.getValue())
+                && tableConf.contains(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID)) {
+                this.fromSnapshot = tableConf.getLong(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID);
+            } else {
+                this.fromSnapshot = catalog.getTable(identifier).latestSnapshotId().orElse(1L);
+            }
+            LOGGER.info("New partition will start from snapshot: {}", this.fromSnapshot);
         } catch (TableNotExistException e) {
             throw new GeaFlowDSLException("Table: {} in db: {} not exists.", table, database);
         }
-        this.partition2Reader = new HashMap<>();
-        this.partition2InnerOffset = new HashMap<>();
+        this.iterators = new HashMap<>();
+        this.readers = new HashMap<>();
+        this.offsets = new HashMap<>();
         LOGGER.info("Open paimon source, tableConf: {}, tableSchema: {}, path: {}, options: "
             + "{}, configs: {}, database: {}, tableName: {}", tableConf, tableSchema, path,
             options, configs, database, table);
-    }
-
-    @Override
-    public List<Partition> listPartitions() {
-        List<Split> splits = isAllWindow ? readBuilder.newScan().plan().splits() :
-                             readBuilder.newStreamScan().plan().splits();
-        return splits.stream().map(split -> new PaimonPartition(database, table, split)).collect(Collectors.toList());
+        this.deserializer.init(tableSchema);
     }
 
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new PaimonRecordDeserializer();
+        return null;
     }
 
     @Override
@@ -162,64 +157,220 @@ public class PaimonTableSource implements TableSource {
         PaimonPartition paimonPartition = (PaimonPartition) partition;
         assert paimonPartition.getDatabase().equals(this.database)
             && paimonPartition.getTable().equals(this.table);
-        RecordReader reader = partition2Reader.getOrDefault(partition,
-            readBuilder.newRead().createReader(paimonPartition.getSplit()));
-        partition2Reader.put(paimonPartition, reader);
 
-        PaimonOffset innerOffset = partition2InnerOffset.getOrDefault(partition,
-            new PaimonOffset());
-        partition2InnerOffset.put(paimonPartition, innerOffset);
+        long skip = 0;
+        PaimonOffset innerOffset = offsets.get(partition);
+        if (innerOffset == null) {
+            // first fetch, use custom specified snapshot id
+            innerOffset = new PaimonOffset(fromSnapshot);
+        }
+
+        // if startOffset is specified, use it and try reset innerOffset
+        if ((startOffset.isPresent() && !startOffset.get().equals(innerOffset))) {
+            skip = Math.abs(startOffset.get().getOffset() - innerOffset.getOffset());
+            innerOffset = PaimonOffset.from((PaimonOffset) startOffset.get());
+        }
+        if (paimonPartition.getCurrentSnapshot() != innerOffset.getSnapshotId()) {
+            paimonPartition.reset(loadSplitsFrom(innerOffset.getSnapshotId()));
+        }
+
+        Split split = paimonPartition.seek(innerOffset.getSplitIndex());
+        if (split == null) {
+            if (sourceMode == SourceMode.BATCH) {
+                LOGGER.info("No more split to fetch");
+                return FetchData.createBatchFetch(Collections.emptyIterator(), innerOffset);
+            } else {
+                LOGGER.debug("Snapshot {} not ready now", innerOffset.getSnapshotId());
+                return FetchData.createStreamFetch(Collections.emptyList(), innerOffset, false);
+            }
+        }
+        offsets.put(paimonPartition, innerOffset);
 
-        if (startOffset.isPresent() && !startOffset.get().equals(innerOffset)) {
-            throw new GeaFlowDSLException("Paimon connector not support reset offset.");
+        CloseableIterator<InternalRow> iterator = createRecordIterator(split);
+        if (skip > 0) {
+            while (iterator.hasNext() && skip > 0) {
+                iterator.next();
+                skip--;
+            }
         }
-        CloseableIterator iterator = reader.toCloseableIterator();
         switch (windowInfo.getType()) {
             case ALL_WINDOW:
-                return FetchData.createBatchFetch(iterator, new PaimonOffset());
+                return FetchData.createBatchFetch(new IteratorWrapper(iterator, deserializer), new PaimonOffset());
             case SIZE_TUMBLING_WINDOW:
                 List<Object> readContents = new ArrayList<>();
-                long i = 0;
-                for (; i < windowInfo.windowSize(); i++) {
+                long advance = 0;
+                for (long i = 0; i < windowInfo.windowSize(); i++) {
                     if (iterator.hasNext()) {
-                        readContents.add(iterator.next());
+                        advance++;
+                        readContents.add(deserializer.deserialize(iterator.next()));
                     } else {
-                        break;
+                        if (sourceMode == SourceMode.BATCH) {
+                            break;
+                        }
+                        try {
+                            removeRecordIterator(split);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                        innerOffset = innerOffset.nextSplit();
+                        Split seek = paimonPartition.seek(innerOffset.getSplitIndex());
+                        if (seek == null) {
+                            // all split finished, try read next snapshot
+                            innerOffset = innerOffset.nextSnapshot();
+                            paimonPartition.reset(loadSplitsFrom(innerOffset.getSnapshotId()));
+                        }
+                        offsets.put(paimonPartition, innerOffset);
+                        advance = 0L;
+                        seek = paimonPartition.seek(innerOffset.getSplitIndex());
+                        if (seek == null) {
+                            // no new snapshot discovered, retry next turn
+                            break;
+                        }
+                        iterator = createRecordIterator(seek);
+                        i--;
                     }
                 }
-                long nextOffset = innerOffset.getOffset() + i;
-                boolean isFinished = !iterator.hasNext();
-                return FetchData.createStreamFetch(readContents, new PaimonOffset(nextOffset), isFinished);
+                PaimonOffset next = innerOffset.advance(advance);
+                offsets.put(paimonPartition, next);
+                boolean isFinished = sourceMode != SourceMode.STREAM && !iterator.hasNext();
+                return FetchData.createStreamFetch(readContents, next, isFinished);
             default:
                 throw new GeaFlowDSLException("Paimon not support window:{}", windowInfo.getType());
         }
     }
 
+    /**
+     * Create a paimon record iterator.
+     * @param split the paimon data split
+     * @return a paimon record iterator
+     * @throws IOException if error occurs
+     */
+    private CloseableIterator<InternalRow> createRecordIterator(Split split) throws IOException {
+        CloseableIterator<InternalRow> iterator = iterators.get(split);
+        if (iterator != null) {
+            return iterator;
+        }
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split);
+        CloseableIterator<InternalRow> closeableIterator = reader.toCloseableIterator();
+        readers.put(split, reader);
+        iterators.put(split, closeableIterator);
+        return closeableIterator;
+    }
+
+    /**
+     * Remove the paimon record iterator and reader.
+     * @param split the paimon data split
+     * @throws Exception if error occurs
+     */
+    private void removeRecordIterator(Split split) throws Exception {
+        CloseableIterator<InternalRow> removed = iterators.remove(split);
+        if (removed != null) {
+            removed.close();
+        }
+        RecordReader<InternalRow> reader = readers.remove(split);
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /**
+     * Load splits from snapshot.
+     * @param snapshotId the snapshot id
+     * @return all splits
+     */
+    private List<Split> loadSplitsFrom(long snapshotId) {
+        StreamTableScan streamTableScan = readBuilder.newStreamScan();
+        streamTableScan.restore(snapshotId);
+        long start = System.currentTimeMillis();
+        List<Split> splits = streamTableScan.plan().splits();
+        LOGGER.debug("Load splits from snapshot: {}, cost: {}ms", snapshotId, 
System.currentTimeMillis() - start);
+        return splits;
+    }
+
+    /**
+     * Get paimon catalog.
+     * @return the paimon catalog.
+     */
+    private Catalog getPaimonCatalog() {
+        CatalogContext catalogContext;
+        if (StringUtils.isBlank(this.path)) {
+            if (StringUtils.isBlank(this.configJson)) {
+                catalogContext =
+                        Objects.requireNonNull(CatalogContext.create(new Options(options)));
+            } else {
+                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+                for (Map.Entry<String, String> entry : configs.entrySet()) {
+                    hadoopConf.set(entry.getKey(), entry.getValue());
+                }
+                catalogContext =
+                        Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
+            }
+        } else {
+            catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
+        }
+        return Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
+    }
+
     @Override
     public void close() {
-        for (RecordReader reader : partition2Reader.values()) {
+        for (CloseableIterator<InternalRow> reader : iterators.values()) {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (Exception e) {
+                    throw new GeaFlowDSLException("Error occurs when close 
paimon iterator.", e);
+                }
+            }
+        }
+        for ( RecordReader<InternalRow> reader : readers.values()) {
             if (reader != null) {
                 try {
                     reader.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     throw new GeaFlowDSLException("Error occurs when close 
paimon reader.", e);
                 }
             }
         }
-        partition2Reader.clear();
-        partition2InnerOffset.clear();
+        iterators.clear();
+        offsets.clear();
     }
 
     public static class PaimonPartition implements Partition {
 
         private final String database;
         private final String table;
-        private final Split split;
 
-        public PaimonPartition(String database, String table, Split split) {
+        private int index;
+        private int parallel;
+
+        // current snapshot id
+        private transient long currentSnapshot;
+        // assigned splits for this partition
+        private final transient List<Split> splits;
+
+        public PaimonPartition(String database, String table) {
             this.database = Objects.requireNonNull(database);
             this.table = Objects.requireNonNull(table);
-            this.split = Objects.requireNonNull(split);
+            this.splits = new ArrayList<>();
+            this.currentSnapshot = -1L;
+            this.index = 0;
+            this.parallel = 1;
+        }
+
+        public void reset(List<Split> splits) {
+            this.splits.clear();
+            for (Split split : splits) {
+                DataSplit dataSplit = (DataSplit) split;
+                int hash = Objects.hash(dataSplit.bucket(), dataSplit.partition());
+                if (hash % parallel == index) {
+                    this.splits.add(split);
+                    this.currentSnapshot = dataSplit.snapshotId();
+                }
+            }
+            if (!this.splits.isEmpty()) {
+                LOGGER.info("Assign paimon split(s) for table {}.{}, snapshot: {}, split size: {}",
+                        database, table, this.currentSnapshot, this.splits.size());
+            }
         }
 
         @Override
@@ -227,9 +378,39 @@ public class PaimonTableSource implements TableSource {
             return database + "-" + table;
         }
 
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public long getCurrentSnapshot() {
+            return currentSnapshot;
+        }
+
+        /**
+         * Seek the split by index.
+         * @param splitIndex the split index
+         * @return the split to read
+         */
+        public Split seek(int splitIndex) {
+            if (splitIndex >= splits.size()) {
+                return null;
+            }
+            return splits.get(splitIndex);
+        }
+
+        @Override
+        public void setIndex(int index, int parallel) {
+            this.parallel = parallel;
+            this.index = index;
+        }
+
         @Override
         public int hashCode() {
-            return Objects.hash(database, table, split);
+            return Objects.hash(database, table, index);
         }
 
         @Override
@@ -242,44 +423,50 @@ public class PaimonTableSource implements TableSource {
             }
             PaimonPartition that = (PaimonPartition) o;
             return Objects.equals(database, that.database) && Objects.equals(
-                table, that.table) && Objects.equals(
-                split, that.split);
-        }
-
-        public String getDatabase() {
-            return database;
-        }
-
-        public String getTable() {
-            return table;
-        }
-
-        public Split getSplit() {
-            return split;
-        }
-
-        @Override
-        public void setIndex(int index, int parallel) {
+                    table, that.table) && Objects.equals(
+                    index, that.index);
         }
     }
 
-
     public static class PaimonOffset implements Offset {
 
+        // if partition snapshot id is not equal to current snapshot id,
+        // need to load splits
+        private final long snapshotId;
+        // the index of current partition assigned splits
+        private final int splitIndex;
+        // the offset of current split
         private final long offset;
 
+        public static PaimonOffset from(PaimonOffset offset) {
+            return new PaimonOffset(offset.snapshotId, offset.splitIndex, offset.offset);
+        }
 
         public PaimonOffset() {
-            this.offset = 0L;
+            this(1L);
+        }
+
+        public PaimonOffset(long snapshotId) {
+            this(snapshotId, 0, 0);
         }
 
-        public PaimonOffset(long offset) {
+        public PaimonOffset(long snapshotId, int splitIndex, long offset) {
+            this.snapshotId = snapshotId;
+            this.splitIndex = splitIndex;
             this.offset = offset;
         }
 
         @Override
         public String humanReadable() {
-            return String.valueOf(offset);
+            return String.format("snapshot %d, split index %d, offset %d", 
snapshotId, splitIndex, offset);
+        }
+
+        public long getSnapshotId() {
+            return snapshotId;
+        }
+
+        public int getSplitIndex() {
+            return splitIndex;
         }
 
         @Override
@@ -287,6 +474,21 @@ public class PaimonTableSource implements TableSource {
             return offset;
         }
 
+        public PaimonOffset advance(long rows) {
+            if (rows == 0) {
+                return this;
+            }
+            return new PaimonOffset(snapshotId, splitIndex, offset + rows);
+        }
+
+        public PaimonOffset nextSplit() {
+            return new PaimonOffset(snapshotId, splitIndex + 1, 0);
+        }
+
+        public PaimonOffset nextSnapshot() {
+            return new PaimonOffset(snapshotId + 1, 0, 0);
+        }
+
         @Override
         public boolean isTimestamp() {
             return false;
@@ -301,12 +503,12 @@ public class PaimonTableSource implements TableSource {
                 return false;
             }
             PaimonOffset that = (PaimonOffset) o;
-            return offset == that.offset;
+            return snapshotId == that.snapshotId && splitIndex == that.splitIndex && offset == that.offset;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(offset);
+            return Objects.hash(snapshotId, splitIndex, offset);
         }
     }
 }
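
The reworked offset above is a (snapshotId, splitIndex, offset) triple rather
than a bare row count. An illustrative walk-through using only methods added in
this commit (the snapshot ids are made up):

    PaimonTableSource.PaimonOffset cursor = new PaimonTableSource.PaimonOffset(7L, 0, 0L);
    cursor = cursor.advance(4);     // read 4 rows        -> (7, 0, 4)
    cursor = cursor.nextSplit();    // split exhausted    -> (7, 1, 0)
    cursor = cursor.nextSnapshot(); // snapshot exhausted -> (8, 0, 0)
    cursor.humanReadable();         // "snapshot 8, split index 0, offset 0"
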
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java
new file mode 100644
index 000000000..637de0b0e
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+/** Paimon source mode. */
+public enum SourceMode {
+    BATCH,
+    STREAM
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java
new file mode 100644
index 000000000..56182aab7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+public enum StartupMode {
+    LATEST(
+            "latest",
+            "Read changes starting from the latest snapshot."),
+
+    FROM_SNAPSHOT(
+            "from-snapshot",
+            "For streaming sources, continuously reads changes starting from 
snapshot "
+                    + "specified by \"scan.snapshot-id\", without producing a 
snapshot at the beginning. "
+                    + "For batch sources, produces a snapshot specified by 
\"scan.snapshot-id\" "
+                    + "but does not read new changes.");
+
+    private final String value;
+    private final String description;
+
+    StartupMode(String value, String description) {
+        this.value = value;
+        this.description = description;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+
+
+}
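
PaimonTableSource#open matches the configured scan mode against these values
with equalsIgnoreCase, so the tests' "From-Snapshot" resolves to FROM_SNAPSHOT.
A small lookup helper sketch (ours, not part of the commit) that makes this
explicit:

    static StartupMode resolveScanMode(String configured) {
        for (StartupMode mode : StartupMode.values()) {
            if (mode.getValue().equalsIgnoreCase(configured)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown scan mode: " + configured);
    }
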
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
index 84746b465..edd36ab51 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
@@ -32,20 +32,21 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ConnectorConfigKeys;
+import org.apache.geaflow.common.tuple.Tuple;
 import org.apache.geaflow.common.type.Types;
-import org.apache.geaflow.dsl.common.data.Row;
 import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
 import org.apache.geaflow.dsl.common.types.StructType;
 import org.apache.geaflow.dsl.common.types.TableField;
 import org.apache.geaflow.dsl.common.types.TableSchema;
 import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
 import org.apache.geaflow.dsl.connector.api.Partition;
 import org.apache.geaflow.dsl.connector.api.TableConnector;
 import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
 import org.apache.geaflow.dsl.connector.api.TableSource;
-import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
 import org.apache.geaflow.dsl.connector.api.util.ConnectorFactory;
 import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow;
 import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -64,11 +65,28 @@ import org.apache.paimon.types.DoubleType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.VarCharType;
 import org.testng.Assert;
+import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
+import org.testng.collections.Lists;
 
 public class PaimonTableConnectorTest {
 
+    String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
+    String db = "paimon_db";
+
+    GenericRow record1 = GenericRow.of(1, BinaryString.fromString("a1"), 10.0);
+    GenericRow record2 = GenericRow.of(2, BinaryString.fromString("ab"), 12.0);
+    GenericRow record3 = GenericRow.of(3, BinaryString.fromString("a3"), 12.0);
+    GenericRow record4 = GenericRow.of(4, BinaryString.fromString("bcd"), 15.0);
+    GenericRow record5 = GenericRow.of(5, BinaryString.fromString("a5"), 10.0);
+    GenericRow record6 = GenericRow.of(6, BinaryString.fromString("s1"), 9.0);
+    GenericRow record7 = GenericRow.of(7, BinaryString.fromString("sb"), 20.0);
+    GenericRow record8 = GenericRow.of(8, BinaryString.fromString("s3"), 16.0);
+    GenericRow record9 = GenericRow.of(9, BinaryString.fromString("bad"), 12.0);
+    GenericRow record10 = GenericRow.of(10, BinaryString.fromString("aa5"), 11.0);
+    GenericRow record11 = GenericRow.of(11, BinaryString.fromString("x11"), 11.2);
+
     private final StructType dataSchema = new StructType(
         new TableField("id", Types.INTEGER, false),
         new TableField("name", Types.BINARY_STRING),
@@ -84,39 +102,37 @@ public class PaimonTableConnectorTest {
 
     @BeforeTest
     public void prepare() {
-        String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
         FileUtils.deleteQuietly(new File(tmpDir));
-        String db = "paimon_db";
-        String tableName = "paimon_table";
+    }
+
+    @AfterTest
+    public void clean() {
+        FileUtils.deleteQuietly(new File(tmpDir));
+    }
+
+    public void createSnapshot(String tableName, List<GenericRow> rows) {
         CatalogContext catalogContext =
-            Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
+                Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
         Catalog catalog = Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
         try {
-            catalog.createDatabase(db, false);
+            catalog.createDatabase(db, true);
             List<String> dbs = catalog.listDatabases();
             assert dbs.get(0).equals(db);
             Identifier identifier = new Identifier(db, tableName);
             catalog.createTable(identifier,
-                Schema.newBuilder()
-                    .column("id", new IntType())
-                    .column("name", new VarCharType(256))
-                    .column("price", new DoubleType())
-                    .build(), false);
+                    Schema.newBuilder()
+                            .column("id", new IntType())
+                            .column("name", new VarCharType(256))
+                            .column("price", new DoubleType())
+                            .build(), true);
             List<String> tables = catalog.listTables(dbs.get(0));
-            assert tables.get(0).equals(tableName);
+            assert tables.contains(tableName);
             Table table = catalog.getTable(identifier);
             BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
             BatchTableWrite write = writeBuilder.newWrite();
-            GenericRow record1 = GenericRow.of(1, BinaryString.fromString("a1"), 10.0);
-            GenericRow record2 = GenericRow.of(2, BinaryString.fromString("ab"), 12.0);
-            GenericRow record3 = GenericRow.of(3, BinaryString.fromString("a3"), 12.0);
-            GenericRow record4 = GenericRow.of(4, BinaryString.fromString("bcd"), 15.0);
-            GenericRow record5 = GenericRow.of(5, BinaryString.fromString("a5"), 10.0);
-            write.write(record1);
-            write.write(record2);
-            write.write(record3);
-            write.write(record4);
-            write.write(record5);
+            for (GenericRow row : rows) {
+                write.write(row);
+            }
             List<CommitMessage> messages = write.prepareCommit();
             BatchTableCommit commit = writeBuilder.newCommit();
             commit.commit(messages);
@@ -126,43 +142,246 @@ public class PaimonTableConnectorTest {
     }
 
     @Test
-    public void testReadPaimon() throws IOException {
-        String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
-        String db = "paimon_db";
-        String table = "paimon_table";
+    public void testReadPaimonStreamMode() throws IOException {
+        String table = "paimon_stream_table";
+
+        Tuple<TableSource, Configuration> tuple = createTableSource(table, true);
+        TableSource tableSource = tuple.f0;
+        Configuration tableConf = tuple.f1;
+
+        tableSource.init(tableConf, tableSchema);
+
+        // create snapshot 1
+        createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+
+        tableSource.open(new DefaultRuntimeContext(tableConf));
+
+        List<Partition> partitions = tableSource.listPartitions(1);
+
+        Offset nextOffset = null;
+
+        List<Object> readRows = new ArrayList<>();
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(0L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+            nextOffset = rows.getNextOffset();
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[1, a1, 10.0]\n"
+                        + "[2, ab, 12.0]\n"
+                        + "[3, a3, 12.0]\n"
+                        + "[4, bcd, 15.0]");
+        readRows.clear();
+
+        // create snapshot 2
+        createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(1L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[5, a5, 10.0]\n"
+                        + "[6, s1, 9.0]\n"
+                        + "[7, sb, 20.0]\n"
+                        + "[8, s3, 16.0]");
+        readRows.clear();
+
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(2L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                        "[9, bad, 12.0]\n"
+                                + "[10, aa5, 11.0]");
+        readRows.clear();
+
+        // no new snapshot
+        for (int i = 0; i < 2; i++) {
+            for (Partition partition : partitions) {
+                FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(3L, 4));
+                while (rows.getDataIterator().hasNext()) {
+                    readRows.add(rows.getDataIterator().next());
+                }
+                Assert.assertFalse(rows.isFinish());
+                Assert.assertTrue(readRows.isEmpty());
+            }
+        }
+
+        // create snapshot 3
+        createSnapshot(table, Lists.newArrayList(record11));
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(3L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+            Assert.assertFalse(readRows.isEmpty());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[11, x11, 11.2]");
+        readRows.clear();
+
+        // test restore from offset
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.of(nextOffset), new SizeFetchWindow(4L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[5, a5, 10.0]\n"
+                        + "[6, s1, 9.0]\n"
+                        + "[7, sb, 20.0]\n"
+                        + "[8, s3, 16.0]");
+    }
+
+    @Test
+    public void testReadPaimonBatchMode() throws IOException {
+        String table = "paimon_batch_table";
+
+        Tuple<TableSource, Configuration> tuple = createTableSource(table, false);
+        TableSource tableSource = tuple.f0;
+        Configuration tableConf = tuple.f1;
+        tableSource.init(tableConf, tableSchema);
+
+        createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+
+        tableSource.open(new DefaultRuntimeContext(tableConf));
+
+        List<Partition> partitions = tableSource.listPartitions(1);
+
+        List<Object> readRows = new ArrayList<>();
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(0L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertFalse(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[1, a1, 10.0]\n"
+                        + "[2, ab, 12.0]\n"
+                        + "[3, a3, 12.0]\n"
+                        + "[4, bcd, 15.0]");
+        readRows.clear();
+
+        createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(1L, 4));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertTrue(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[5, a5, 10.0]");
+        readRows.clear();
+    }
+
+    @Test
+    public void testReadPaimonFromSnapshot() throws IOException {
+        String table = "paimon_batch_table_2";
 
         TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
         Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
         TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
-
-        Map<String, String> tableConfMap = new HashMap<>();
-        tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir);
-        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir);
-        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db);
-        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table);
+        Map<String, String> tableConfMap = getTableConf(table).getConfigMap();
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.BATCH.name());
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_MODE.getKey(), "From-Snapshot");
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID.getKey(), "1");
         Configuration tableConf = new Configuration(tableConfMap);
         TableSource tableSource = readableConnector.createSource(tableConf);
         tableSource.init(tableConf, tableSchema);
 
+        createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+        createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
         tableSource.open(new DefaultRuntimeContext(tableConf));
 
-        List<Partition> partitions = tableSource.listPartitions();
+        List<Partition> partitions = tableSource.listPartitions(1);
 
-        TableDeserializer deserializer = tableSource.getDeserializer(tableConf);
-        deserializer.init(tableConf, tableSchema);
-        List<Row> readRows = new ArrayList<>();
+        List<Object> readRows = new ArrayList<>();
         for (Partition partition : partitions) {
-            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(-1L));
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(0L));
             while (rows.getDataIterator().hasNext()) {
-                readRows.addAll(deserializer.deserialize(rows.getDataIterator().next()));
+                readRows.add(rows.getDataIterator().next());
             }
+            Assert.assertTrue(rows.isFinish());
         }
         Assert.assertEquals(StringUtils.join(readRows, "\n"),
-            "[1, a1, 10.0]\n"
-                + "[2, ab, 12.0]\n"
-                + "[3, a3, 12.0]\n"
-                + "[4, bcd, 15.0]\n"
-                + "[5, a5, 10.0]");
+                "[1, a1, 10.0]\n"
+                        + "[2, ab, 12.0]\n"
+                        + "[3, a3, 12.0]\n"
+                        + "[4, bcd, 15.0]\n"
+                        + "[5, a5, 10.0]");
+    }
+
+    @Test
+    public void testReadPaimonFromLastesSnapshot() throws IOException {
+        String table = "paimon_batch_table_3";
+
+        TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
+        Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
+        TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
+        Configuration tableConf = getTableConf(table);
+        tableConf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.BATCH.name());
+        TableSource tableSource = readableConnector.createSource(tableConf);
+        tableSource.init(tableConf, tableSchema);
+
+        createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+        createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+        tableSource.open(new DefaultRuntimeContext(tableConf));
+
+        List<Partition> partitions = tableSource.listPartitions(1);
+
+        List<Object> readRows = new ArrayList<>();
+        for (Partition partition : partitions) {
+            FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(0L));
+            while (rows.getDataIterator().hasNext()) {
+                readRows.add(rows.getDataIterator().next());
+            }
+            Assert.assertTrue(rows.isFinish());
+        }
+        Assert.assertEquals(StringUtils.join(readRows, "\n"),
+                "[6, s1, 9.0]\n"
+                    + "[7, sb, 20.0]\n"
+                    + "[8, s3, 16.0]\n"
+                    + "[9, bad, 12.0]\n"
+                    + "[10, aa5, 11.0]");
+    }
+
+    private Configuration getTableConf(String table) {
+        Map<String, String> tableConfMap = new HashMap<>();
+        tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db);
+        tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table);
+        return new Configuration(tableConfMap);
+    }
+
+    private Tuple<TableSource, Configuration> createTableSource(String table, boolean streamMode) {
+        TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
+        Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
+        TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
+        Map<String, String> tableConfMap = getTableConf(table).getConfigMap();
+        if (streamMode) {
+            tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.STREAM.name());
+        }
+        Configuration tableConf = new Configuration(tableConfMap);
+        return Tuple.of(readableConnector.createSource(tableConf), tableConf);
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
