This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ff1380d5 [flink] Replace FileStoreTable to public Table in flink 
sources (#730)
7ff1380d5 is described below

commit 7ff1380d5b5332b145125139567e89632db47c33
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 29 13:40:53 2023 +0800

    [flink] Replace FileStoreTable to public Table in flink sources (#730)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   4 +
 .../paimon/table/source/BatchDataTableScan.java    |  12 ---
 .../paimon/table/source/StreamDataTableScan.java   |  52 ----------
 .../paimon/table/source/TableStreamingReader.java  |  32 +++----
 .../paimon/flink/AbstractFlinkTableFactory.java    |  30 +++---
 .../org/apache/paimon/flink/DataCatalogTable.java  |   7 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  19 ++--
 .../apache/paimon/flink/action/CompactAction.java  |   6 ++
 .../flink/lookup/FileStoreLookupFunction.java      |  37 ++++---
 .../apache/paimon/flink/sink/FlinkTableSink.java   |  14 +--
 .../flink/source/CompactorSourceBuilder.java       |  37 +++----
 .../source/ContinuousFileSplitEnumerator.java      |  10 +-
 .../flink/source/ContinuousFileStoreSource.java    |  40 ++++----
 .../paimon/flink/source/DataTableSource.java       |  21 ++--
 .../paimon/flink/source/FlinkSourceBuilder.java    |  26 +++--
 .../paimon/flink/source/FlinkTableSource.java      |   2 +-
 .../paimon/flink/source/SimpleSystemSource.java    |  59 ------------
 .../paimon/flink/source/StaticFileStoreSource.java |  55 ++++-------
 .../source/StaticFileStoreSplitEnumerator.java     |   2 +-
 .../paimon/flink/source/SystemTableSource.java     |  32 +++----
 .../apache/paimon/flink/utils/TableScanUtils.java  | 106 +++++++++++++++++++++
 .../org/apache/paimon/flink/LookupJoinITCase.java  |   5 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  |   4 +-
 23 files changed, 273 insertions(+), 339 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 8feb424a9..149ca6f24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -544,6 +544,10 @@ public class CoreOptions implements Serializable {
         this.options = options;
     }
 
+    public static CoreOptions fromMap(Map<String, String> options) {
+        return new CoreOptions(options);
+    }
+
     public Options toConfiguration() {
         return options;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
index e2839253e..a25f2dfae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
@@ -20,12 +20,9 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.Filter;
 
-import java.io.Serializable;
-
 /** {@link DataTableScan} for batch planning. */
 public interface BatchDataTableScan extends DataTableScan {
 
@@ -42,13 +39,4 @@ public interface BatchDataTableScan extends DataTableScan {
     BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
 
     BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
-
-    // ------------------------------------------------------------------------
-    //  factory interface
-    // ------------------------------------------------------------------------
-
-    /** Factory to create {@link BatchDataTableScan}. */
-    interface Factory extends Serializable {
-        BatchDataTableScan create(DataTable dataTable);
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index 471c8428d..4d60c4402 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -18,21 +18,13 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.Filter;
 
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
 /** {@link DataTableScan} for streaming planning. */
 public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan {
 
@@ -57,48 +49,4 @@ public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan
     StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
 
     StreamDataTableScan withSnapshotStarting();
-
-    static void validate(TableSchema schema) {
-        CoreOptions options = new CoreOptions(schema.options());
-        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-        HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
-                new HashMap<CoreOptions.MergeEngine, String>() {
-                    {
-                        put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial 
update");
-                        put(CoreOptions.MergeEngine.AGGREGATE, 
"Pre-aggregate");
-                    }
-                };
-        if (schema.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
-            switch (options.changelogProducer()) {
-                case NONE:
-                case INPUT:
-                    throw new RuntimeException(
-                            mergeEngineDesc.get(mergeEngine)
-                                    + " continuous reading is not supported. 
You can use "
-                                    + "'lookup' or 'full-compaction' changelog 
producer to support streaming reading.");
-                default:
-            }
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  factory interface
-    // ------------------------------------------------------------------------
-
-    /** Factory to create {@link StreamDataTableScan}. */
-    interface Factory extends Serializable {
-
-        StreamDataTableScan create(DataTable dataTable, @Nullable Long 
nextSnapshotId);
-    }
-
-    /** A default {@link Factory} to create {@link StreamDataTableScan}. */
-    class DefaultFactory implements Factory {
-
-        @Override
-        public StreamDataTableScan create(DataTable dataTable, @Nullable Long 
nextSnapshotId) {
-            StreamDataTableScan scan = dataTable.newStreamScan();
-            scan.restore(nextSnapshotId);
-            return scan;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
index 28c2d9fed..d1f2e1dff 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
@@ -23,7 +23,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
 import org.apache.paimon.reader.RecordReaderIterator;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
 import org.apache.paimon.utils.TypeUtils;
 
@@ -44,21 +44,16 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
 /** A streaming reader to read table. */
 public class TableStreamingReader {
 
-    private final FileStoreTable table;
-    private final int[] projection;
-    @Nullable private final Predicate predicate;
+    private final ReadBuilder readBuilder;
     @Nullable private final PredicateFilter recordFilter;
     private final StreamDataTableScan scan;
 
-    public TableStreamingReader(
-            FileStoreTable table, int[] projection, @Nullable Predicate 
predicate) {
-        this.table = table;
-        this.projection = projection;
-        this.predicate = predicate;
+    public TableStreamingReader(Table table, int[] projection, @Nullable 
Predicate predicate) {
+        this.readBuilder = 
table.newReadBuilder().withProjection(projection).withFilter(predicate);
 
         if (predicate != null) {
-            List<String> fieldNames = table.schema().fieldNames();
-            List<String> primaryKeys = table.schema().primaryKeys();
+            List<String> fieldNames = table.rowType().getFieldNames();
+            List<String> primaryKeys = table.primaryKeys();
 
             // for pk table: only filter by pk, the stream is upsert instead 
of changelog
             // for non-pk table: filter all
@@ -71,20 +66,18 @@ public class TableStreamingReader {
                     };
 
             int[] fieldIdxToProjectionIdx =
-                    IntStream.range(0, 
table.schema().fields().size()).map(operator).toArray();
+                    IntStream.range(0, 
table.rowType().getFieldCount()).map(operator).toArray();
 
             this.recordFilter =
                     new PredicateFilter(
-                            TypeUtils.project(table.schema().logicalRowType(), 
projection),
+                            TypeUtils.project(table.rowType(), projection),
                             transformFieldMapping(predicate, 
fieldIdxToProjectionIdx).orElse(null));
         } else {
             recordFilter = null;
         }
 
-        scan = table.newStreamScan().withSnapshotStarting();
-        if (predicate != null) {
-            scan.withFilter(predicate);
-        }
+        scan = (StreamDataTableScan) readBuilder.newStreamScan();
+        scan.withSnapshotStarting();
     }
 
     public Iterator<InternalRow> nextBatch() throws Exception {
@@ -97,10 +90,7 @@ public class TableStreamingReader {
     }
 
     private Iterator<InternalRow> read(DataFilePlan plan) throws IOException {
-        InnerTableRead read = table.newRead().withProjection(projection);
-        if (predicate != null) {
-            read.withFilter(predicate);
-        }
+        TableRead read = readBuilder.newRead();
 
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
         for (DataSplit split : plan.splits) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index c12a5b0b2..b7ea563ae 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -28,9 +28,8 @@ import org.apache.paimon.flink.source.DataTableSource;
 import org.apache.paimon.flink.source.SystemTableSource;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -70,15 +69,11 @@ public abstract class AbstractFlinkTableFactory
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
-            int splitBatchSize =
-                    Options.fromMap(origin.getOptions())
-                            
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
-            return new SystemTableSource(
-                    ((SystemCatalogTable) origin).table(), isStreamingMode, 
splitBatchSize);
+            return new SystemTableSource(((SystemCatalogTable) 
origin).table(), isStreamingMode);
         } else {
             return new DataTableSource(
                     context.getObjectIdentifier(),
-                    buildFileStoreTable(context),
+                    buildPaimonTable(context),
                     isStreamingMode,
                     context,
                     createOptionalLogStoreFactory(context).orElse(null));
@@ -89,7 +84,7 @@ public abstract class AbstractFlinkTableFactory
     public DynamicTableSink createDynamicTableSink(Context context) {
         return new FlinkTableSink(
                 context.getObjectIdentifier(),
-                buildFileStoreTable(context),
+                buildPaimonTable(context),
                 context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
@@ -144,16 +139,15 @@ public abstract class AbstractFlinkTableFactory
                 context.getCatalogTable().getOptions(), 
context.getConfiguration());
     }
 
-    static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context 
context) {
+    static Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
-        FileStoreTable table;
+        Table table;
         if (origin instanceof DataCatalogTable) {
             table = ((DataCatalogTable) 
origin).table().copy(origin.getOptions());
         } else {
             table = 
FileStoreTableFactory.create(createCatalogContext(context));
         }
 
-        TableSchema tableSchema = table.schema();
         Schema schema = 
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
 
         RowType rowType = toLogicalType(schema.rowType());
@@ -162,24 +156,24 @@ public abstract class AbstractFlinkTableFactory
 
         // compare fields to ignore the outside nullability and nested fields' 
comments
         Preconditions.checkArgument(
-                schemaEquals(toLogicalType(tableSchema.logicalRowType()), 
rowType),
+                schemaEquals(toLogicalType(table.rowType()), rowType),
                 "Flink schema and store schema are not the same, "
                         + "store schema is %s, Flink schema is %s",
-                tableSchema.logicalRowType(),
+                table.rowType(),
                 rowType);
 
         Preconditions.checkArgument(
-                tableSchema.partitionKeys().equals(partitionKeys),
+                table.partitionKeys().equals(partitionKeys),
                 "Flink partitionKeys and store partitionKeys are not the same, 
"
                         + "store partitionKeys is %s, Flink partitionKeys is 
%s",
-                tableSchema.partitionKeys(),
+                table.partitionKeys(),
                 partitionKeys);
 
         Preconditions.checkArgument(
-                tableSchema.primaryKeys().equals(primaryKeys),
+                table.primaryKeys().equals(primaryKeys),
                 "Flink primaryKeys and store primaryKeys are not the same, "
                         + "store primaryKeys is %s, Flink primaryKeys is %s",
-                tableSchema.primaryKeys(),
+                table.primaryKeys(),
                 primaryKeys);
 
         return table;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
index 6b33ac501..1dd62cb2c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -33,10 +34,10 @@ import java.util.Map;
 /** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
 public class DataCatalogTable extends CatalogTableImpl {
 
-    private final FileStoreTable table;
+    private final Table table;
 
     public DataCatalogTable(
-            FileStoreTable table,
+            Table table,
             TableSchema tableSchema,
             List<String> partitionKeys,
             Map<String, String> properties,
@@ -45,7 +46,7 @@ public class DataCatalogTable extends CatalogTableImpl {
         this.table = table;
     }
 
-    public FileStoreTable table() {
+    public Table table() {
         return table;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 618c4f735..01dc89d50 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -166,7 +166,7 @@ public class FlinkCatalog extends AbstractCatalog {
         }
 
         if (table instanceof FileStoreTable) {
-            return toCatalogTable((FileStoreTable) table);
+            return toCatalogTable(table);
         } else {
             return new SystemCatalogTable(table);
         }
@@ -314,9 +314,9 @@ public class FlinkCatalog extends AbstractCatalog {
         }
     }
 
-    private CatalogTableImpl toCatalogTable(FileStoreTable table) {
+    private CatalogTableImpl toCatalogTable(Table table) {
         TableSchema schema;
-        Map<String, String> newOptions = new 
HashMap<>(table.schema().options());
+        Map<String, String> newOptions = new HashMap<>(table.options());
 
         // try to read schema from options
         // in the case of virtual columns and watermark
@@ -334,23 +334,18 @@ public class FlinkCatalog extends AbstractCatalog {
             removeProperties.asMap().keySet().forEach(newOptions::remove);
         } else {
             TableSchema.Builder builder = TableSchema.builder();
-            for (RowType.RowField field :
-                    
toLogicalType(table.schema().logicalRowType()).getFields()) {
+            for (RowType.RowField field : 
toLogicalType(table.rowType()).getFields()) {
                 builder.field(field.getName(), 
fromLogicalToDataType(field.getType()));
             }
-            if (table.schema().primaryKeys().size() > 0) {
-                builder.primaryKey(table.schema().primaryKeys().toArray(new 
String[0]));
+            if (table.primaryKeys().size() > 0) {
+                builder.primaryKey(table.primaryKeys().toArray(new String[0]));
             }
 
             schema = builder.build();
         }
 
         return new DataCatalogTable(
-                table,
-                schema,
-                table.schema().partitionKeys(),
-                newOptions,
-                table.schema().comment());
+                table, schema, table.partitionKeys(), newOptions, 
table.comment().orElse(""));
     }
 
     public static Schema fromCatalogTable(CatalogTable catalogTable) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 5b7c329e1..d21d5e704 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -53,6 +53,12 @@ public class CompactAction extends ActionBase {
 
     CompactAction(String warehouse, String database, String tableName) {
         super(warehouse, database, tableName, new 
Options().set(CoreOptions.WRITE_ONLY, false));
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports compact action. The 
table type is '%s'.",
+                            table.getClass().getName()));
+        }
 
         sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), 
(FileStoreTable) table);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 748850284..629f6effd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -22,12 +22,11 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.TableStreamingReader;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
@@ -70,7 +69,7 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreLookupFunction.class);
 
-    private final FileStoreTable table;
+    private final Table table;
     private final List<String> projectFields;
     private final List<String> joinKeys;
     @Nullable private final Predicate predicate;
@@ -85,31 +84,27 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
     private transient TableStreamingReader streamingReader;
 
     public FileStoreLookupFunction(
-            FileStoreTable table,
-            int[] projection,
-            int[] joinKeyIndex,
-            @Nullable Predicate predicate) {
-        TableSchema schema = table.schema();
+            Table table, int[] projection, int[] joinKeyIndex, @Nullable 
Predicate predicate) {
         checkArgument(
-                schema.partitionKeys().isEmpty(), "Currently only support 
non-partitioned table.");
-        checkArgument(schema.primaryKeys().size() > 0, "Currently only support 
primary key table.");
-        StreamDataTableScan.validate(table.schema());
+                table.partitionKeys().isEmpty(), "Currently only support 
non-partitioned table.");
+        checkArgument(table.primaryKeys().size() > 0, "Currently only support 
primary key table.");
+        TableScanUtils.streamingReadingValidate(table);
 
         this.table = table;
 
         // join keys are based on projection fields
         this.joinKeys =
                 Arrays.stream(joinKeyIndex)
-                        .mapToObj(i -> schema.fieldNames().get(projection[i]))
+                        .mapToObj(i -> 
table.rowType().getFieldNames().get(projection[i]))
                         .collect(Collectors.toList());
 
         this.projectFields =
                 Arrays.stream(projection)
-                        .mapToObj(i -> schema.fieldNames().get(i))
+                        .mapToObj(i -> table.rowType().getFieldNames().get(i))
                         .collect(Collectors.toList());
 
         // add primary keys
-        for (String field : schema.primaryKeys()) {
+        for (String field : table.primaryKeys()) {
             if (!projectFields.contains(field)) {
                 projectFields.add(field);
             }
@@ -122,20 +117,20 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         String tmpDirectory = getTmpDirectory(context);
         this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
 
-        Options options = Options.fromMap(table.schema().options());
+        Options options = Options.fromMap(table.options());
         this.refreshInterval = 
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
         this.stateFactory = new RocksDBStateFactory(path.toString(), options);
 
-        List<String> fieldNames = 
table.schema().logicalRowType().getFieldNames();
+        List<String> fieldNames = table.rowType().getFieldNames();
         int[] projection = 
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
-        RowType rowType = TypeUtils.project(table.schema().logicalRowType(), 
projection);
+        RowType rowType = TypeUtils.project(table.rowType(), projection);
 
         PredicateFilter recordFilter = createRecordFilter(projection);
         this.lookupTable =
                 LookupTable.create(
                         stateFactory,
                         rowType,
-                        table.schema().primaryKeys(),
+                        table.primaryKeys(),
                         joinKeys,
                         recordFilter,
                         options.get(LOOKUP_CACHE_ROWS));
@@ -153,13 +148,13 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             adjustedPredicate =
                     transformFieldMapping(
                                     this.predicate,
-                                    IntStream.range(0, 
table.schema().fields().size())
+                                    IntStream.range(0, 
table.rowType().getFieldCount())
                                             .map(i -> Ints.indexOf(projection, 
i))
                                             .toArray())
                             .orElse(null);
         }
         return new PredicateFilter(
-                TypeUtils.project(table.schema().logicalRowType(), 
projection), adjustedPredicate);
+                TypeUtils.project(table.rowType(), projection), 
adjustedPredicate);
     }
 
     public Collection<RowData> lookup(RowData keyRow) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 4d20118f8..6b983265e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -56,7 +57,7 @@ import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, 
SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
-    private final FileStoreTable table;
+    private final Table table;
     private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
@@ -66,7 +67,7 @@ public class FlinkTableSink implements DynamicTableSink, 
SupportsOverwrite, Supp
 
     public FlinkTableSink(
             ObjectIdentifier tableIdentifier,
-            FileStoreTable table,
+            Table table,
             DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableIdentifier = tableIdentifier;
@@ -85,7 +86,7 @@ public class FlinkTableSink implements DynamicTableSink, 
SupportsOverwrite, Supp
             // no primary key, sink all changelogs
             return requestedMode;
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
-            Options options = Options.fromMap(table.schema().options());
+            Options options = Options.fromMap(table.options());
             if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
                 return requestedMode;
             }
@@ -124,13 +125,13 @@ public class FlinkTableSink implements DynamicTableSink, 
SupportsOverwrite, Supp
             logSinkProvider = 
logStoreTableFactory.createSinkProvider(this.context, context);
         }
 
-        Options conf = Options.fromMap(table.schema().options());
+        Options conf = Options.fromMap(table.options());
         // Do not sink to log store when overwrite mode
         final LogSinkFunction logSinkFunction =
                 overwrite ? null : (logSinkProvider == null ? null : 
logSinkProvider.createSink());
         return new PaimonDataStreamSinkProvider(
                 (dataStream) ->
-                        new FlinkSinkBuilder(table)
+                        new FlinkSinkBuilder((FileStoreTable) table)
                                 .withInput(
                                         new DataStream<>(
                                                 
dataStream.getExecutionEnvironment(),
@@ -163,8 +164,7 @@ public class FlinkTableSink implements DynamicTableSink, 
SupportsOverwrite, Supp
 
     @Override
     public void applyStaticPartition(Map<String, String> partition) {
-        table.schema()
-                .partitionKeys()
+        table.partitionKeys()
                 .forEach(
                         partitionKey -> {
                             if (partition.containsKey(partitionKey)) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 731d2d787..9761c4cbe 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -18,15 +18,13 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.snapshot.BoundedChecker;
-import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
-import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
-import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.BucketsTable;
 import org.apache.paimon.types.RowType;
 
@@ -93,31 +91,22 @@ public class CompactorSourceBuilder {
                                     .toArray(Predicate[]::new));
         }
 
+        ReadBuilder readBuilder = 
bucketsTable.newReadBuilder().withFilter(partitionPredicate);
         if (isContinuous) {
             return new ContinuousFileStoreSource(
-                    bucketsTable,
+                    readBuilder,
+                    bucketsTable.options(),
                     null,
-                    partitionPredicate,
-                    null,
-                    (table, nextSnapshotId) -> {
-                        StreamDataTableScan scan =
-                                table.newStreamScan()
-                                        .withStartingScanner(
-                                                new 
ContinuousCompactorStartingScanner())
-                                        .withFollowUpScanner(
-                                                new 
ContinuousCompactorFollowUpScanner())
-                                        
.withBoundedChecker(BoundedChecker.neverEnd());
-                        scan.restore(nextSnapshotId);
-                        return scan;
-                    });
+                    TableScanUtils.compactStreamScanFactory());
         } else {
             return new StaticFileStoreSource(
-                    bucketsTable,
-                    null,
-                    partitionPredicate,
+                    readBuilder,
                     null,
-                    // static compactor source will compact all current files
-                    table -> table.newScan().withStartingScanner(new 
FullStartingScanner()));
+                    bucketsTable
+                            .coreOptions()
+                            .toConfiguration()
+                            
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+                    TableScanUtils.compactBatchScanFactory());
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 0eae09631..771c5b57a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -19,9 +19,9 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
 import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -62,7 +62,7 @@ public class ContinuousFileSplitEnumerator
 
     private final FileStoreSourceSplitGenerator splitGenerator;
 
-    private final StreamDataTableScan scan;
+    private final StreamTableScan scan;
 
     @Nullable private Long nextSnapshotId;
 
@@ -73,7 +73,7 @@ public class ContinuousFileSplitEnumerator
             Collection<FileStoreSourceSplit> remainSplits,
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
-            StreamDataTableScan scan) {
+            StreamTableScan scan) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.bucketSplits = new HashMap<>();
@@ -150,7 +150,7 @@ public class ContinuousFileSplitEnumerator
 
     // ------------------------------------------------------------------------
 
-    private void processDiscoveredSplits(DataFilePlan plan, Throwable error) {
+    private void processDiscoveredSplits(TableScan.Plan plan, Throwable error) 
{
         if (error != null) {
             if (error instanceof EndOfScanException) {
                 // finished
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 909a736fe..09abb116d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
 
@@ -40,28 +42,22 @@ public class ContinuousFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 3L;
 
-    private final DataTable table;
-    private final StreamDataTableScan.Factory scanFactory;
-    private final Predicate predicate;
+    private final Map<String, String> options;
+    private final TableScanUtils.StreamTableScanFactory scanFactory;
 
     public ContinuousFileStoreSource(
-            DataTable table,
-            @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate,
-            @Nullable Long limit) {
-        this(table, projectedFields, predicate, limit, new 
StreamDataTableScan.DefaultFactory());
+            ReadBuilder readBuilder, Map<String, String> options, @Nullable 
Long limit) {
+        this(readBuilder, options, limit, 
TableScanUtils.defaultStreamScanFactory());
     }
 
     public ContinuousFileStoreSource(
-            DataTable table,
-            @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate,
+            ReadBuilder readBuilder,
+            Map<String, String> options,
             @Nullable Long limit,
-            StreamDataTableScan.Factory scanFactory) {
-        
super(table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
 limit);
-        this.table = table;
+            TableScanUtils.StreamTableScanFactory scanFactory) {
+        super(readBuilder, limit);
+        this.options = options;
         this.scanFactory = scanFactory;
-        this.predicate = predicate;
     }
 
     @Override
@@ -84,19 +80,19 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 context,
                 splits,
                 nextSnapshotId,
-                table.coreOptions().continuousDiscoveryInterval().toMillis(),
-                scanFactory.create(table, 
nextSnapshotId).withFilter(predicate));
+                
CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
+                scanFactory.create(readBuilder, nextSnapshotId));
     }
 
     @Override
     public FileStoreSourceReader<?> createSourceReader(
             SourceReaderContext context, TableRead read, @Nullable Long limit) 
{
-        return table.coreOptions().toConfiguration().get(STREAMING_READ_ATOMIC)
+        return Options.fromMap(options).get(STREAMING_READ_ATOMIC)
                 ? new FileStoreSourceReader<>(RecordsFunction.forSingle(), 
context, read, limit)
                 : new FileStoreSourceReader<>(RecordsFunction.forIterate(), 
context, read, limit);
     }
 
     private boolean isBounded() {
-        return table.coreOptions().scanBoundedWatermark() != null;
+        return CoreOptions.fromMap(options).scanBoundedWatermark() != null;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 121156dec..6529c8f04 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -33,7 +33,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -64,14 +64,14 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_
  * Table source to create {@link StaticFileStoreSource} or {@link 
ContinuousFileStoreSource} under
  * batch mode or change-tracking is disabled. For streaming mode with 
change-tracking enabled and
  * FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link 
StaticFileStoreSource} and
- * kafka log source created by {@link LogSourceProvider}.
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
+ * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created 
by {@link
+ * LogSourceProvider}.
  */
 public class DataTableSource extends FlinkTableSource
         implements LookupTableSource, SupportsWatermarkPushDown {
 
     private final ObjectIdentifier tableIdentifier;
-    private final FileStoreTable table;
     private final boolean streaming;
     private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
@@ -80,7 +80,7 @@ public class DataTableSource extends FlinkTableSource
 
     public DataTableSource(
             ObjectIdentifier tableIdentifier,
-            FileStoreTable table,
+            Table table,
             boolean streaming,
             DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
@@ -98,7 +98,7 @@ public class DataTableSource extends FlinkTableSource
 
     private DataTableSource(
             ObjectIdentifier tableIdentifier,
-            FileStoreTable table,
+            Table table,
             boolean streaming,
             DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory,
@@ -108,7 +108,6 @@ public class DataTableSource extends FlinkTableSource
             @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
         super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
-        this.table = table;
         this.streaming = streaming;
         this.context = context;
         this.logStoreTableFactory = logStoreTableFactory;
@@ -130,7 +129,7 @@ public class DataTableSource extends FlinkTableSource
         } else if (table instanceof ChangelogValueCountFileStoreTable) {
             return ChangelogMode.all();
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
-            Options options = Options.fromMap(table.schema().options());
+            Options options = Options.fromMap(table.options());
 
             if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
                 return ChangelogMode.all();
@@ -164,7 +163,7 @@ public class DataTableSource extends FlinkTableSource
         }
 
         WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
-        Options options = table.coreOptions().toConfiguration();
+        Options options = Options.fromMap(table.options());
         if (watermarkStrategy != null) {
             WatermarkEmitStrategy emitStrategy = 
options.get(SCAN_WATERMARK_EMIT_STRATEGY);
             if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
@@ -199,7 +198,7 @@ public class DataTableSource extends FlinkTableSource
                         .withPredicate(predicate)
                         .withLimit(limit)
                         .withParallelism(
-                                Options.fromMap(table.schema().options())
+                                Options.fromMap(table.options())
                                         
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
                         .withWatermarkStrategy(watermarkStrategy);
 
@@ -239,7 +238,7 @@ public class DataTableSource extends FlinkTableSource
         }
         int[] projection =
                 projectFields == null
-                        ? IntStream.range(0, 
table.schema().fields().size()).toArray()
+                        ? IntStream.range(0, 
table.rowType().getFieldCount()).toArray()
                         : Projection.of(projectFields).toTopLevelIndexes();
         int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
         return LookupRuntimeProviderFactory.create(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 0876926a5..7de7e3c2c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,12 +20,13 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -52,7 +53,7 @@ import static 
org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 public class FlinkSourceBuilder {
 
     private final ObjectIdentifier tableIdentifier;
-    private final FileStoreTable table;
+    private final Table table;
     private final Options conf;
 
     private boolean isContinuous = false;
@@ -64,10 +65,10 @@ public class FlinkSourceBuilder {
     @Nullable private Long limit;
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
-    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, FileStoreTable 
table) {
+    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
-        this.conf = Options.fromMap(table.schema().options());
+        this.conf = Options.fromMap(table.options());
     }
 
     public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
@@ -112,16 +113,23 @@ public class FlinkSourceBuilder {
     }
 
     private StaticFileStoreSource buildStaticFileSource() {
-        return new StaticFileStoreSource(table, projectedFields, predicate, 
limit);
+        return new StaticFileStoreSource(
+                
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+                limit,
+                Options.fromMap(table.options())
+                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
     }
 
     private ContinuousFileStoreSource buildContinuousFileSource() {
-        return new ContinuousFileStoreSource(table, projectedFields, 
predicate, limit);
+        return new ContinuousFileStoreSource(
+                
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+                table.options(),
+                limit);
     }
 
     public Source<RowData, ?, ?> buildSource() {
         if (isContinuous) {
-            StreamDataTableScan.validate(table.schema());
+            TableScanUtils.streamingReadingValidate(table);
 
             // TODO visit all options through CoreOptions
             StartupMode startupMode = CoreOptions.startupMode(conf);
@@ -149,7 +157,7 @@ public class FlinkSourceBuilder {
             throw new IllegalArgumentException("StreamExecutionEnvironment 
should not be null.");
         }
 
-        RowType rowType = toLogicalType(table.schema().logicalRowType());
+        RowType rowType = toLogicalType(table.rowType());
         LogicalType produceType =
                 Optional.ofNullable(projectedFields)
                         .map(Projection::of)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 541c1a26c..143fe0039 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -43,7 +43,7 @@ public abstract class FlinkTableSource
                 SupportsProjectionPushDown,
                 SupportsLimitPushDown {
 
-    private final Table table;
+    protected final Table table;
 
     @Nullable protected Predicate predicate;
     @Nullable protected int[][] projectFields;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
deleted file mode 100644
index 5fe27f25d..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSystemSource.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source;
-
-import org.apache.paimon.table.source.ReadBuilder;
-
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-
-/** A {@link FlinkSource} for system table. */
-public class SimpleSystemSource extends FlinkSource {
-
-    private static final long serialVersionUID = 2L;
-    private final int splitBatchSize;
-
-    public SimpleSystemSource(ReadBuilder readBuilder, @Nullable Long limit, 
int splitBatchSize) {
-        super(readBuilder, limit);
-        this.splitBatchSize = splitBatchSize;
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        return Boundedness.BOUNDED;
-    }
-
-    @Override
-    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> 
restoreEnumerator(
-            SplitEnumeratorContext<FileStoreSourceSplit> context,
-            PendingSplitsCheckpoint checkpoint) {
-        Collection<FileStoreSourceSplit> splits =
-                checkpoint == null
-                        ? new FileStoreSourceSplitGenerator()
-                                .createSplits(readBuilder.newScan().plan())
-                        : checkpoint.splits();
-
-        return new StaticFileStoreSplitEnumerator(context, null, splits, 
splitBatchSize);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 3af455a67..219f2c13a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,11 +18,8 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.flink.utils.TableScanUtils;
+import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -37,28 +34,22 @@ public class StaticFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 3L;
 
-    private final DataTable table;
-    private final BatchDataTableScan.Factory scanFactory;
-    private final Predicate predicate;
+    private final int splitBatchSize;
+    private final TableScanUtils.TableScanFactory scanFactory;
 
     public StaticFileStoreSource(
-            DataTable table,
-            @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate,
-            @Nullable Long limit) {
-        this(table, projectedFields, predicate, limit, DataTable::newScan);
+            ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize) 
{
+        this(readBuilder, limit, splitBatchSize, ReadBuilder::newScan);
     }
 
     public StaticFileStoreSource(
-            DataTable table,
-            @Nullable int[][] projectedFields,
-            @Nullable Predicate predicate,
+            ReadBuilder readBuilder,
             @Nullable Long limit,
-            BatchDataTableScan.Factory scanFactory) {
-        
super(table.newReadBuilder().withFilter(predicate).withProjection(projectedFields),
 limit);
-        this.table = table;
+            int splitBatchSize,
+            TableScanUtils.TableScanFactory scanFactory) {
+        super(readBuilder, limit);
+        this.splitBatchSize = splitBatchSize;
         this.scanFactory = scanFactory;
-        this.predicate = predicate;
     }
 
     @Override
@@ -70,24 +61,12 @@ public class StaticFileStoreSource extends FlinkSource {
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> 
restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
-        Collection<FileStoreSourceSplit> splits;
-        if (checkpoint == null) {
-            FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
-            // read all splits from scan
-            DataTableScan.DataFilePlan plan =
-                    scanFactory.create(table).withFilter(predicate).plan();
-            splits = splitGenerator.createSplits(plan);
-        } else {
-            // restore from checkpoint
-            splits = checkpoint.splits();
-        }
+        Collection<FileStoreSourceSplit> splits =
+                checkpoint == null
+                        ? new FileStoreSourceSplitGenerator()
+                                
.createSplits(scanFactory.create(readBuilder).plan())
+                        : checkpoint.splits();
 
-        return new StaticFileStoreSplitEnumerator(
-                context,
-                null,
-                splits,
-                table.coreOptions()
-                        .toConfiguration()
-                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+        return new StaticFileStoreSplitEnumerator(context, null, splits, 
splitBatchSize);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 89b1c0518..54361a622 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -91,7 +91,7 @@ public class StaticFileStoreSplitEnumerator
         while (taskSplits != null && !taskSplits.isEmpty() && 
assignment.size() < splitBatchSize) {
             assignment.add(taskSplits.poll());
         }
-        if (assignment != null && assignment.size() > 0) {
+        if (assignment.size() > 0) {
             context.assignSplits(
                     new SplitsAssignment<>(Collections.singletonMap(subtask, 
assignment)));
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e0a9a5502..a4d911540 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -18,9 +18,12 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -33,15 +36,15 @@ import javax.annotation.Nullable;
 /** A {@link FlinkTableSource} for system table. */
 public class SystemTableSource extends FlinkTableSource {
 
-    private final Table table;
     private final boolean isStreamingMode;
     private final int splitBatchSize;
 
-    public SystemTableSource(Table table, boolean isStreamingMode, int 
splitBatchSize) {
+    public SystemTableSource(Table table, boolean isStreamingMode) {
         super(table);
-        this.table = table;
         this.isStreamingMode = isStreamingMode;
-        this.splitBatchSize = splitBatchSize;
+        this.splitBatchSize =
+                Options.fromMap(table.options())
+                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
     }
 
     public SystemTableSource(
@@ -52,7 +55,6 @@ public class SystemTableSource extends FlinkTableSource {
             @Nullable Long limit,
             int splitBatchSize) {
         super(table, predicate, projectFields, limit);
-        this.table = table;
         this.isStreamingMode = isStreamingMode;
         this.splitBatchSize = splitBatchSize;
     }
@@ -65,21 +67,13 @@ public class SystemTableSource extends FlinkTableSource {
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
         Source<RowData, ?, ?> source;
-        if (table instanceof DataTable) {
-            DataTable dataTable = (DataTable) table;
-            source =
-                    isStreamingMode
-                            ? new ContinuousFileStoreSource(
-                                    dataTable, projectFields, predicate, limit)
-                            : new StaticFileStoreSource(dataTable, 
projectFields, predicate, limit);
+        ReadBuilder readBuilder =
+                
table.newReadBuilder().withProjection(projectFields).withFilter(predicate);
+
+        if (isStreamingMode && table instanceof DataTable) {
+            source = new ContinuousFileStoreSource(readBuilder, 
table.options(), limit);
         } else {
-            source =
-                    new SimpleSystemSource(
-                            table.newReadBuilder()
-                                    .withFilter(predicate)
-                                    .withProjection(projectFields),
-                            limit,
-                            splitBatchSize);
+            source = new StaticFileStoreSource(readBuilder, limit, 
splitBatchSize);
         }
         return SourceProvider.of(source);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
new file mode 100644
index 000000000..9ab22f00c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.BatchDataTableScan;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
+import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
+import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/** Utility methods for {@link TableScan}, such as validating and creating. */
+public class TableScanUtils {
+
+    public static void streamingReadingValidate(Table table) {
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+        HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
+                new HashMap<CoreOptions.MergeEngine, String>() {
+                    {
+                        put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial 
update");
+                        put(CoreOptions.MergeEngine.AGGREGATE, 
"Pre-aggregate");
+                    }
+                };
+        if (table.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
+            switch (options.changelogProducer()) {
+                case NONE:
+                case INPUT:
+                    throw new RuntimeException(
+                            mergeEngineDesc.get(mergeEngine)
+                                    + " streaming reading is not supported. 
You can use "
+                                    + "'lookup' or 'full-compaction' changelog 
producer to support streaming reading.");
+                default:
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // TableScan factories
+    // ------------------------------------------------------------------------
+
+    /** Factory to create batch {@link TableScan}. */
+    public interface TableScanFactory extends Serializable {
+        TableScan create(ReadBuilder readBuilder);
+    }
+
+    /** Factory to create {@link StreamTableScan}. */
+    public interface StreamTableScanFactory extends Serializable {
+
+        StreamTableScan create(ReadBuilder readBuilder, @Nullable Long 
nextSnapshotId);
+    }
+
+    public static StreamTableScanFactory defaultStreamScanFactory() {
+        return (builder, nextSnapshotId) -> {
+            StreamTableScan scan = builder.newStreamScan();
+            scan.restore(nextSnapshotId);
+            return scan;
+        };
+    }
+
+    public static TableScanFactory compactBatchScanFactory() {
+        return readBuilder -> {
+            BatchDataTableScan scan = (BatchDataTableScan) 
readBuilder.newScan();
+            // static compactor source will compact all current files
+            scan.withStartingScanner(new FullStartingScanner());
+            return scan;
+        };
+    }
+
+    public static StreamTableScanFactory compactStreamScanFactory() {
+        return (readBuilder, nextSnapshotId) -> {
+            StreamDataTableScan scan = (StreamDataTableScan) 
readBuilder.newStreamScan();
+            scan.withStartingScanner(new ContinuousCompactorStartingScanner())
+                    .withFollowUpScanner(new 
ContinuousCompactorFollowUpScanner())
+                    .withBoundedChecker(BoundedChecker.neverEnd());
+            scan.restore(nextSnapshotId);
+            return scan;
+        };
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 1abb25c4e..38ad31d40 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -457,7 +457,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testLookupPartialUpdateIllegal() throws Exception {
+    public void testLookupPartialUpdateIllegal() {
         sql(
                 "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
                         + " 
('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
@@ -465,7 +465,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for 
system_time as of T.proctime AS D ON T.i = D.i";
         assertThatThrownBy(() -> sEnv.executeSql(query))
                 .hasRootCauseMessage(
-                        "Partial update continuous reading is not supported. "
+                        "Partial update streaming"
+                                + " reading is not supported. "
                                 + "You can use 'lookup' or 'full-compaction' 
changelog producer to support streaming reading.");
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index b394a28cd..57d9fb118 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -57,7 +57,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static 
org.apache.paimon.flink.AbstractFlinkTableFactory.buildFileStoreTable;
+import static 
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
@@ -1155,7 +1155,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
 
         DynamicTableSink tableSink =
                 new FlinkTableSink(
-                        context.getObjectIdentifier(), 
buildFileStoreTable(context), context, null);
+                        context.getObjectIdentifier(), 
buildPaimonTable(context), context, null);
         assertThat(tableSink).isInstanceOf(FlinkTableSink.class);
 
         // 2. get sink provider

Reply via email to