This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a710041f0 [flink] Remove scan.push-down support which should be supported by Flink (#4224)
a710041f0 is described below

commit a710041f0ecddc9a0c22244a3e24f0fb7c06450d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Sep 23 14:24:56 2024 +0800

    [flink] Remove scan.push-down support which should be supported by Flink (#4224)
---
 .../generated/flink_connector_configuration.html   |   6 -
 .../paimon/flink/source/DataTableSource.java       |  96 +++++++++++
 .../paimon/flink/source/table/RichTableSource.java |  53 ------
 .../paimon/flink/AbstractFlinkTableFactory.java    |  29 ++--
 .../apache/paimon/flink/FlinkConnectorOptions.java |   9 -
 ...taTableSource.java => BaseDataTableSource.java} | 101 ++---------
 .../paimon/flink/source/DataTableSource.java       | 186 ++-------------------
 .../paimon/flink/source/FlinkTableSource.java      |  52 +++---
 .../paimon/flink/source/SystemTableSource.java     |  35 ----
 .../paimon/flink/source/table/BaseTableSource.java |  49 ------
 .../flink/source/table/PushedRichTableSource.java  |  65 -------
 .../flink/source/table/PushedTableSource.java      |  65 -------
 .../paimon/flink/source/table/RichTableSource.java |  76 ---------
 .../paimon/flink/ContinuousFileStoreITCase.java    |  27 ---
 .../paimon/flink/source/FlinkTableSourceTest.java  |  30 ++--
 15 files changed, 176 insertions(+), 703 deletions(-)
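
A note on the refactoring below: rather than gating push-down behind the scan.push-down option and the wrapper classes (PushedTableSource, PushedRichTableSource, RichTableSource), the Paimon source classes now implement Flink's push-down ability interfaces themselves, so the planner decides what to push into the source. The following is a minimal, illustrative sketch of that contract only, not code from this patch; the class name PushDownSketch and the canConsume() helper are placeholders, and the rest of the ScanTableSource plumbing (changelog mode, scan runtime provider) is omitted.

    import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
    import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
    import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative sketch of the ability interfaces used by the patch. */
    public class PushDownSketch
            implements SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {

        private final List<ResolvedExpression> acceptedFilters = new ArrayList<>();
        private int[][] projectedFields;
        private long limit = -1L;

        @Override
        public Result applyFilters(List<ResolvedExpression> filters) {
            // A real source converts each expression into its own predicate format;
            // anything it cannot convert must be returned as "remaining" so the
            // planner still evaluates it after the scan.
            List<ResolvedExpression> remaining = new ArrayList<>();
            for (ResolvedExpression filter : filters) {
                if (canConsume(filter)) {
                    acceptedFilters.add(filter);
                } else {
                    remaining.add(filter);
                }
            }
            return Result.of(new ArrayList<>(acceptedFilters), remaining);
        }

        @Override
        public boolean supportsNestedProjection() {
            // Only top-level fields are pushed, matching the patch.
            return false;
        }

        @Override
        public void applyProjection(int[][] projectedFields) {
            this.projectedFields = projectedFields;
        }

        @Override
        public void applyLimit(long limit) {
            this.limit = limit;
        }

        private boolean canConsume(ResolvedExpression filter) {
            // Placeholder decision; Paimon converts filters via its PredicateBuilder here.
            return false;
        }
    }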

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index e96f2f65e..cf4c7c1da 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -116,12 +116,6 @@ under the License.
             <td>Integer</td>
             <td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.</td>
         </tr>
-        <tr>
-            <td><h5>scan.push-down</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>If true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down.</td>
-        </tr>
         <tr>
             <td><h5>scan.remove-normalize</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
new file mode 100644
index 000000000..f870d8370
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/** A {@link BaseDataTableSource} for Flink 1.15. */
+public class DataTableSource extends BaseDataTableSource {
+
+    public DataTableSource(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            boolean streaming,
+            DynamicTableFactory.Context context,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        this(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                null,
+                null,
+                null,
+                null);
+    }
+
+    public DataTableSource(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            boolean streaming,
+            DynamicTableFactory.Context context,
+            @Nullable LogStoreTableFactory logStoreTableFactory,
+            @Nullable Predicate predicate,
+            @Nullable int[][] projectFields,
+            @Nullable Long limit,
+            @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+        super(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                predicate,
+                projectFields,
+                limit,
+                watermarkStrategy);
+    }
+
+    @Override
+    public DataTableSource copy() {
+        return new DataTableSource(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                predicate,
+                projectFields,
+                limit,
+                watermarkStrategy);
+    }
+
+    @Override
+    protected List<String> dynamicPartitionFilteringFields() {
+        return Collections.emptyList();
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
deleted file mode 100644
index 50c8dc5ff..000000000
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source.table;
-
-import org.apache.paimon.flink.source.FlinkTableSource;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
-import org.apache.flink.table.data.RowData;
-
-/** The {@link BaseTableSource} with Lookup and watermark. */
-public class RichTableSource extends BaseTableSource
-        implements LookupTableSource, SupportsWatermarkPushDown {
-
-    private final FlinkTableSource source;
-
-    public RichTableSource(FlinkTableSource source) {
-        super(source);
-        this.source = source;
-    }
-
-    @Override
-    public RichTableSource copy() {
-        return new RichTableSource(source.copy());
-    }
-
-    @Override
-    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
-        return source.getLookupRuntimeProvider(context);
-    }
-
-    @Override
-    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        source.pushWatermark(watermarkStrategy);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 86a22dc07..e469044f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -28,9 +28,6 @@ import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.source.DataTableSource;
 import org.apache.paimon.flink.source.SystemTableSource;
-import org.apache.paimon.flink.source.table.PushedRichTableSource;
-import org.apache.paimon.flink.source.table.PushedTableSource;
-import org.apache.paimon.flink.source.table.RichTableSource;
 import org.apache.paimon.lineage.LineageMeta;
 import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.lineage.TableLineageEntity;
@@ -83,7 +80,6 @@ import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
 import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PUSH_DOWN;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;
 
@@ -100,11 +96,10 @@ public abstract class AbstractFlinkTableFactory
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
-            return new PushedTableSource(
-                    new SystemTableSource(
-                            ((SystemCatalogTable) origin).table(),
-                            isStreamingMode,
-                            context.getObjectIdentifier()));
+            return new SystemTableSource(
+                    ((SystemCatalogTable) origin).table(),
+                    isStreamingMode,
+                    context.getObjectIdentifier());
         } else {
             Table table = buildPaimonTable(context);
             if (table instanceof FileStoreTable) {
@@ -120,16 +115,12 @@ public abstract class AbstractFlinkTableFactory
                             }
                         });
             }
-            DataTableSource source =
-                    new DataTableSource(
-                            context.getObjectIdentifier(),
-                            table,
-                            isStreamingMode,
-                            context,
-                            createOptionalLogStoreFactory(context).orElse(null));
-            return new Options(table.options()).get(SCAN_PUSH_DOWN)
-                    ? new PushedRichTableSource(source)
-                    : new RichTableSource(source);
+            return new DataTableSource(
+                    context.getObjectIdentifier(),
+                    table,
+                    isStreamingMode,
+                    context,
+                    createOptionalLogStoreFactory(context).orElse(null));
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2a8f5f4cd..d181d7b5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -244,15 +244,6 @@ public class FlinkConnectorOptions {
                             "Weight of managed memory for RocksDB in 
cross-partition update, Flink will compute the memory size "
                                     + "according to the weight, the actual 
memory used depends on the running environment.");
 
-    public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
-            ConfigOptions.key("scan.push-down")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "If true, flink will push down projection, filters, limit to the source. The cost is that it "
-                                    + "is difficult to reuse the source in a job. With flink 1.18 or higher version, it "
-                                    + "is possible to reuse the source even with projection push down.");
-
     public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
             ConfigOptions.key("source.checkpoint-align.enabled")
                     .booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
similarity index 74%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index c4544426f..8775ab8f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -36,18 +36,18 @@ import org.apache.paimon.utils.Projection;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.IntStream;
 
@@ -62,47 +62,22 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
  * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
- * batch mode or change-tracking is disabled. For streaming mode with change-tracking enabled and
- * FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
- * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created by {@link
- * LogSourceProvider}.
+ * batch mode or streaming mode.
  */
-public class DataTableSource extends FlinkTableSource {
+public abstract class BaseDataTableSource extends FlinkTableSource
+        implements LookupTableSource, SupportsWatermarkPushDown {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final boolean streaming;
-    private final DynamicTableFactory.Context context;
-    @Nullable private final LogStoreTableFactory logStoreTableFactory;
+    protected final ObjectIdentifier tableIdentifier;
+    protected final boolean streaming;
+    protected final DynamicTableFactory.Context context;
+    @Nullable protected final LogStoreTableFactory logStoreTableFactory;
 
-    @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+    @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
 
-    @Nullable private List<String> dynamicPartitionFilteringFields;
-
-    public DataTableSource(
-            ObjectIdentifier tableIdentifier,
-            Table table,
-            boolean streaming,
-            DynamicTableFactory.Context context,
-            @Nullable LogStoreTableFactory logStoreTableFactory) {
-        this(
-                tableIdentifier,
-                table,
-                streaming,
-                context,
-                logStoreTableFactory,
-                null,
-                null,
-                null,
-                null,
-                null);
-    }
-
-    public DataTableSource(
+    public BaseDataTableSource(
             ObjectIdentifier tableIdentifier,
             Table table,
             boolean streaming,
@@ -111,8 +86,7 @@ public class DataTableSource extends FlinkTableSource {
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
             @Nullable Long limit,
-            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            @Nullable List<String> dynamicPartitionFilteringFields) {
+            @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
         super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
         this.streaming = streaming;
@@ -122,7 +96,6 @@ public class DataTableSource extends FlinkTableSource {
         this.projectFields = projectFields;
         this.limit = limit;
         this.watermarkStrategy = watermarkStrategy;
-        this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }
 
     @Override
@@ -198,7 +171,7 @@ public class DataTableSource extends FlinkTableSource {
                         .predicate(predicate)
                         .limit(limit)
                         .watermarkStrategy(watermarkStrategy)
-                        .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
+                        .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
 
         return new PaimonDataStreamScanProvider(
                 !streaming,
@@ -209,23 +182,10 @@ public class DataTableSource extends FlinkTableSource {
                                 .build());
     }
 
-    @Override
-    public DataTableSource copy() {
-        return new DataTableSource(
-                tableIdentifier,
-                table,
-                streaming,
-                context,
-                logStoreTableFactory,
-                predicate,
-                projectFields,
-                limit,
-                watermarkStrategy,
-                dynamicPartitionFilteringFields);
-    }
+    protected abstract List<String> dynamicPartitionFilteringFields();
 
     @Override
-    public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
 
@@ -249,42 +209,11 @@ public class DataTableSource extends FlinkTableSource {
                 asyncThreadNumber);
     }
 
-    @Override
-    public TableStats reportStatistics() {
-        if (streaming) {
-            return TableStats.UNKNOWN;
-        }
-
-        scanSplitsForInference();
-        return new TableStats(splitStatistics.totalRowCount());
-    }
-
     @Override
     public String asSummaryString() {
         return "Paimon-DataSource";
     }
 
-    @Override
-    public List<String> listAcceptedFilterFields() {
-        // note that streaming query doesn't support dynamic filtering
-        return streaming ? Collections.emptyList() : table.partitionKeys();
-    }
-
-    @Override
-    public void applyDynamicFiltering(List<String> candidateFilterFields) {
-        checkState(
-                !streaming,
-                "Cannot apply dynamic filtering to Paimon table '%s' when streaming reading.",
-                table.name());
-
-        checkState(
-                !table.partitionKeys().isEmpty(),
-                "Cannot apply dynamic filtering to non-partitioned Paimon table '%s'.",
-                table.name());
-
-        this.dynamicPartitionFilteringFields = candidateFilterFields;
-    }
-
     @Override
     public boolean isStreaming() {
         return streaming;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index c4544426f..200550c88 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -18,68 +18,31 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
-import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
-import org.apache.paimon.flink.PaimonDataStreamScanProvider;
-import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
-import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
-import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
-import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
-import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.plan.stats.TableStats;
 
 import javax.annotation.Nullable;
 
-import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.IntStream;
 
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
-import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
-import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
- * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
- * batch mode or change-tracking is disabled. For streaming mode with change-tracking enabled and
- * FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
- * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created by {@link
- * LogSourceProvider}.
+ * A {@link BaseDataTableSource} implements {@link SupportsStatisticReport} and {@link
+ * SupportsDynamicFiltering}.
  */
-public class DataTableSource extends FlinkTableSource {
-
-    private final ObjectIdentifier tableIdentifier;
-    private final boolean streaming;
-    private final DynamicTableFactory.Context context;
-    @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
-    @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+public class DataTableSource extends BaseDataTableSource
+        implements SupportsStatisticReport, SupportsDynamicFiltering {
 
     @Nullable private List<String> dynamicPartitionFilteringFields;
 
@@ -113,102 +76,19 @@ public class DataTableSource extends FlinkTableSource {
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
             @Nullable List<String> dynamicPartitionFilteringFields) {
-        super(table, predicate, projectFields, limit);
-        this.tableIdentifier = tableIdentifier;
-        this.streaming = streaming;
-        this.context = context;
-        this.logStoreTableFactory = logStoreTableFactory;
-        this.predicate = predicate;
-        this.projectFields = projectFields;
-        this.limit = limit;
-        this.watermarkStrategy = watermarkStrategy;
+        super(
+                tableIdentifier,
+                table,
+                streaming,
+                context,
+                logStoreTableFactory,
+                predicate,
+                projectFields,
+                limit,
+                watermarkStrategy);
         this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }
 
-    @Override
-    public ChangelogMode getChangelogMode() {
-        if (!streaming) {
-            // batch merge all, return insert only
-            return ChangelogMode.insertOnly();
-        }
-
-        if (table.primaryKeys().isEmpty()) {
-            return ChangelogMode.insertOnly();
-        } else {
-            Options options = Options.fromMap(table.options());
-
-            if (new CoreOptions(options).mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
-                return ChangelogMode.insertOnly();
-            }
-
-            if (options.get(SCAN_REMOVE_NORMALIZE)) {
-                return ChangelogMode.all();
-            }
-
-            if (logStoreTableFactory == null
-                    && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
-                return ChangelogMode.all();
-            }
-
-            // optimization: transaction consistency and all changelog mode avoid the generation of
-            // normalized nodes. See FlinkTableSink.getChangelogMode validation.
-            return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
-                            && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
-                    ? ChangelogMode.all()
-                    : ChangelogMode.upsert();
-        }
-    }
-
-    @Override
-    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        LogSourceProvider logSourceProvider = null;
-        if (logStoreTableFactory != null) {
-            logSourceProvider =
-                    logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
-        }
-
-        WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
-        Options options = Options.fromMap(table.options());
-        if (watermarkStrategy != null) {
-            WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY);
-            if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
-                watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
-            }
-            Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
-            if (idleTimeout != null) {
-                watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
-            }
-            String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
-            if (watermarkAlignGroup != null) {
-                watermarkStrategy =
-                        WatermarkAlignUtils.withWatermarkAlignment(
-                                watermarkStrategy,
-                                watermarkAlignGroup,
-                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
-                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
-            }
-        }
-
-        FlinkSourceBuilder sourceBuilder =
-                new FlinkSourceBuilder(table)
-                        .sourceName(tableIdentifier.asSummaryString())
-                        .sourceBounded(!streaming)
-                        .logSourceProvider(logSourceProvider)
-                        .projection(projectFields)
-                        .predicate(predicate)
-                        .limit(limit)
-                        .watermarkStrategy(watermarkStrategy)
-                        .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
-
-        return new PaimonDataStreamScanProvider(
-                !streaming,
-                env ->
-                        sourceBuilder
-                                .sourceParallelism(inferSourceParallelism(env))
-                                .env(env)
-                                .build());
-    }
-
     @Override
     public DataTableSource copy() {
         return new DataTableSource(
@@ -224,31 +104,6 @@ public class DataTableSource extends FlinkTableSource {
                 dynamicPartitionFilteringFields);
     }
 
-    @Override
-    public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        this.watermarkStrategy = watermarkStrategy;
-    }
-
-    @Override
-    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
-        if (limit != null) {
-            throw new RuntimeException(
-                    "Limit push down should not happen in Lookup source, but it is " + limit);
-        }
-        int[] projection =
-                projectFields == null
-                        ? IntStream.range(0, table.rowType().getFieldCount()).toArray()
-                        : Projection.of(projectFields).toTopLevelIndexes();
-        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
-        Options options = new Options(table.options());
-        boolean enableAsync = options.get(LOOKUP_ASYNC);
-        int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
-        return LookupRuntimeProviderFactory.create(
-                new FileStoreLookupFunction(table, projection, joinKey, predicate),
-                enableAsync,
-                asyncThreadNumber);
-    }
-
     @Override
     public TableStats reportStatistics() {
         if (streaming) {
@@ -259,11 +114,6 @@ public class DataTableSource extends FlinkTableSource {
         return new TableStats(splitStatistics.totalRowCount());
     }
 
-    @Override
-    public String asSummaryString() {
-        return "Paimon-DataSource";
-    }
-
     @Override
     public List<String> listAcceptedFilterFields() {
         // note that streaming query doesn't support dynamic filtering
@@ -286,7 +136,7 @@ public class DataTableSource extends FlinkTableSource {
     }
 
     @Override
-    public boolean isStreaming() {
-        return streaming;
+    protected List<String> dynamicPartitionFilteringFields() {
+        return dynamicPartitionFilteringFields;
     }
 }
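
The batch-only abilities stay on the common DataTableSource shown above: SupportsStatisticReport and SupportsDynamicFiltering. Below is a minimal, illustrative sketch of those two contracts, not code from this patch; BatchAbilitiesSketch, its constructor arguments and estimateRowCount() are placeholders. In the real class the row count comes from scanned split statistics and the accepted fields are the table's partition keys.

    import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
    import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
    import org.apache.flink.table.plan.stats.TableStats;

    import java.util.Collections;
    import java.util.List;

    /** Illustrative sketch of the batch-only abilities kept by DataTableSource. */
    public class BatchAbilitiesSketch implements SupportsStatisticReport, SupportsDynamicFiltering {

        private final boolean streaming;
        private final List<String> partitionKeys;
        private List<String> dynamicFilteringFields;

        public BatchAbilitiesSketch(boolean streaming, List<String> partitionKeys) {
            this.streaming = streaming;
            this.partitionKeys = partitionKeys;
        }

        @Override
        public TableStats reportStatistics() {
            // Statistics are only meaningful for bounded scans; streaming reads report UNKNOWN.
            return streaming ? TableStats.UNKNOWN : new TableStats(estimateRowCount());
        }

        @Override
        public List<String> listAcceptedFilterFields() {
            // Only partition columns of a batch scan can take part in dynamic filtering.
            return streaming ? Collections.emptyList() : partitionKeys;
        }

        @Override
        public void applyDynamicFiltering(List<String> candidateFilterFields) {
            // The planner hands back the subset of accepted fields it will provide at runtime.
            this.dynamicFilteringFields = candidateFilterFields;
        }

        private long estimateRowCount() {
            return 0L; // placeholder; the real source sums row counts over scanned splits
        }
    }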
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 7254eefaa..920a1ba14 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -30,18 +30,13 @@ import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
-import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
-import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +50,11 @@ import java.util.Optional;
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 
 /** A Flink {@link ScanTableSource} for paimon. */
-public abstract class FlinkTableSource {
+public abstract class FlinkTableSource
+        implements ScanTableSource,
+                SupportsFilterPushDown,
+                SupportsProjectionPushDown,
+                SupportsLimitPushDown {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
 
@@ -85,8 +84,8 @@ public abstract class FlinkTableSource {
         this.limit = limit;
     }
 
-    /** @return The unconsumed filters. */
-    public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
         List<String> partitionKeys = table.partitionKeys();
         RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
 
@@ -115,35 +114,24 @@ public abstract class FlinkTableSource {
         predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
         LOG.info("Consumed filters: {} of {}", consumedFilters, filters);
 
-        return unConsumedFilters;
+        return Result.of(filters, unConsumedFilters);
     }
 
-    public void pushProjection(int[][] projectedFields) {
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields) {
         this.projectFields = projectedFields;
     }
 
-    public void pushLimit(long limit) {
+    @Override
+    public void applyLimit(long limit) {
         this.limit = limit;
     }
 
-    public abstract ChangelogMode getChangelogMode();
-
-    public abstract ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
-
-    public abstract void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy);
-
-    public abstract LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
-
-    public abstract TableStats reportStatistics();
-
-    public abstract FlinkTableSource copy();
-
-    public abstract String asSummaryString();
-
-    public abstract List<String> listAcceptedFilterFields();
-
-    public abstract void applyDynamicFiltering(List<String> candidateFilterFields);
-
     public abstract boolean isStreaming();
 
     @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index 49ed0c0b8..e914c617f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -32,17 +32,12 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.plan.stats.TableStats;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-import java.util.List;
-
 /** A {@link FlinkTableSource} for system table. */
 public class SystemTableSource extends FlinkTableSource {
 
@@ -127,36 +122,6 @@ public class SystemTableSource extends FlinkTableSource {
         return "Paimon-SystemTable-Source";
     }
 
-    @Override
-    public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(
-            LookupTableSource.LookupContext context) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public TableStats reportStatistics() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<String> listAcceptedFilterFields() {
-        // system table doesn't support dynamic filtering
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void applyDynamicFiltering(List<String> candidateFilterFields) {
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Cannot apply dynamic filtering to Paimon system table '%s'.",
-                        table.name()));
-    }
-
     @Override
     public boolean isStreaming() {
         return isStreamingMode;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java
deleted file mode 100644
index 57c4b01a2..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source.table;
-
-import org.apache.paimon.flink.source.FlinkTableSource;
-
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.ScanTableSource;
-
-/** The {@link ScanTableSource} with push down interfaces. */
-public abstract class BaseTableSource implements ScanTableSource {
-
-    private final FlinkTableSource source;
-
-    public BaseTableSource(FlinkTableSource source) {
-        this.source = source;
-    }
-
-    @Override
-    public ChangelogMode getChangelogMode() {
-        return source.getChangelogMode();
-    }
-
-    @Override
-    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        return source.getScanRuntimeProvider(runtimeProviderContext);
-    }
-
-    @Override
-    public String asSummaryString() {
-        return source.asSummaryString();
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
deleted file mode 100644
index 7e55b83f6..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source.table;
-
-import org.apache.paimon.flink.source.FlinkTableSource;
-
-import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.expressions.ResolvedExpression;
-
-import java.util.List;
-
-/** A {@link RichTableSource} with push down. */
-public class PushedRichTableSource extends RichTableSource
-        implements SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
-
-    private final FlinkTableSource source;
-
-    public PushedRichTableSource(FlinkTableSource source) {
-        super(source);
-        this.source = source;
-    }
-
-    @Override
-    public PushedRichTableSource copy() {
-        return new PushedRichTableSource(source.copy());
-    }
-
-    @Override
-    public Result applyFilters(List<ResolvedExpression> filters) {
-        return Result.of(filters, source.pushFilters(filters));
-    }
-
-    @Override
-    public void applyLimit(long limit) {
-        source.pushLimit(limit);
-    }
-
-    @Override
-    public boolean supportsNestedProjection() {
-        return false;
-    }
-
-    @Override
-    public void applyProjection(int[][] projectedFields) {
-        source.pushProjection(projectedFields);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
deleted file mode 100644
index a1389b5bf..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source.table;
-
-import org.apache.paimon.flink.source.FlinkTableSource;
-
-import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import org.apache.flink.table.expressions.ResolvedExpression;
-
-import java.util.List;
-
-/** The {@link BaseTableSource} with push down. */
-public class PushedTableSource extends BaseTableSource
-        implements SupportsFilterPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
-
-    private final FlinkTableSource source;
-
-    public PushedTableSource(FlinkTableSource source) {
-        super(source);
-        this.source = source;
-    }
-
-    @Override
-    public PushedTableSource copy() {
-        return new PushedTableSource(source.copy());
-    }
-
-    @Override
-    public Result applyFilters(List<ResolvedExpression> filters) {
-        return Result.of(filters, source.pushFilters(filters));
-    }
-
-    @Override
-    public void applyLimit(long limit) {
-        source.pushLimit(limit);
-    }
-
-    @Override
-    public boolean supportsNestedProjection() {
-        return false;
-    }
-
-    @Override
-    public void applyProjection(int[][] projectedFields) {
-        source.pushProjection(projectedFields);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
deleted file mode 100644
index 4bf0c169b..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source.table;
-
-import org.apache.paimon.flink.source.FlinkTableSource;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
-import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
-import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.plan.stats.TableStats;
-
-import java.util.List;
-
-/** The {@link BaseTableSource} with lookup, watermark, statistic and dynamic filtering. */
-public class RichTableSource extends BaseTableSource
-        implements LookupTableSource,
-                SupportsWatermarkPushDown,
-                SupportsStatisticReport,
-                SupportsDynamicFiltering {
-
-    private final FlinkTableSource source;
-
-    public RichTableSource(FlinkTableSource source) {
-        super(source);
-        this.source = source;
-    }
-
-    @Override
-    public RichTableSource copy() {
-        return new RichTableSource(source.copy());
-    }
-
-    @Override
-    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
-        return source.getLookupRuntimeProvider(context);
-    }
-
-    @Override
-    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        source.pushWatermark(watermarkStrategy);
-    }
-
-    @Override
-    public TableStats reportStatistics() {
-        return source.reportStatistics();
-    }
-
-    @Override
-    public List<String> listAcceptedFilterFields() {
-        return source.listAcceptedFilterFields();
-    }
-
-    @Override
-    public void applyDynamicFiltering(List<String> candidateFilterFields) {
-        source.applyDynamicFiltering(candidateFilterFields);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index a0ce8be3a..cf97f7b67 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -57,33 +57,6 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
                         + " WITH ('changelog-producer'='input', 'bucket' = '1')");
     }
 
-    @Test
-    public void testSourceReuseWithoutScanPushDown() {
-        sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
-        sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");
-
-        StatementSet statementSet = sEnv.createStatementSet();
-        statementSet.addInsertSql(
-                "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
-        statementSet.addInsertSql(
-                "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
-        assertThat(statementSet.compilePlan().explain()).contains("Reused");
-
-        statementSet = sEnv.createStatementSet();
-        statementSet.addInsertSql(
-                "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'");
-        statementSet.addInsertSql(
-                "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'");
-        assertThat(statementSet.compilePlan().explain()).contains("Reused");
-
-        statementSet = sEnv.createStatementSet();
-        statementSet.addInsertSql(
-                "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5");
-        statementSet.addInsertSql(
-                "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10");
-        assertThat(statementSet.compilePlan().explain()).contains("Reused");
-    }
-
     @Test
     public void testSourceReuseWithScanPushDown() {
         // source can be reused with projection applied
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
index 95f5721c4..cff9ab6f4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkTableSourceTest.java
@@ -52,13 +52,14 @@ public class FlinkTableSourceTest extends TableTestBase {
         Schema schema = Schema.newBuilder().column("col1", DataTypes.INT()).build();
         TableSchema tableSchema = new SchemaManager(fileIO, tablePath).createTable(schema);
         Table table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
-        FlinkTableSource tableSource =
+        DataTableSource tableSource =
                 new DataTableSource(
                         ObjectIdentifier.of("catalog1", "db1", "T"), table, false, null, null);
 
         // col1 = 1
         List<ResolvedExpression> filters = ImmutableList.of(col1Equal1());
-        Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
+                .isEqualTo(filters);
     }
 
     @Test
@@ -81,54 +82,57 @@ public class FlinkTableSourceTest extends TableTestBase {
 
         // col1 = 1 && p1 = 1 => [p1 = 1]
         List<ResolvedExpression> filters = ImmutableList.of(col1Equal1(), p1Equal1());
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(0)));
 
         // col1 = 1 && p2 like '%a' => None
         filters = ImmutableList.of(col1Equal1(), p2Like("%a"));
-        Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
+                .isEqualTo(filters);
 
         // col1 = 1 && p2 like 'a%' => [p2 like 'a%']
         filters = ImmutableList.of(col1Equal1(), p2Like("a%"));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(0)));
 
         // rand(42) > 0.1 => None
         filters = ImmutableList.of(rand());
-        Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
+                .isEqualTo(filters);
 
         // upper(p1) = "A" => [upper(p1) = "A"]
         filters = ImmutableList.of(upperP2EqualA());
-        Assertions.assertThat(tableSource.pushFilters(filters)).isEqualTo(filters);
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
+                .isEqualTo(filters);
 
         // col1 = 1 && (p2 like 'a%' or p1 = 1) => [p2 like 'a%' or p1 = 1]
         filters = ImmutableList.of(col1Equal1(), or(p2Like("a%"), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(0)));
 
         // col1 = 1 && (p2 like '%a' or p1 = 1) => None
         filters = ImmutableList.of(col1Equal1(), or(p2Like("%a"), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .containsExactlyInAnyOrder(filters.toArray(new ResolvedExpression[0]));
 
         // col1 = 1 && (p2 like 'a%' && p1 = 1) => [p2 like 'a%' && p1 = 1]
         filters = ImmutableList.of(col1Equal1(), and(p2Like("a%"), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(0)));
 
         // col1 = 1 && (p2 like '%a' && p1 = 1) => None
         filters = ImmutableList.of(col1Equal1(), and(p2Like("%a"), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .containsExactlyInAnyOrder(filters.toArray(new ResolvedExpression[0]));
 
         // p2 like 'a%' && (col1 = 1 or p1 = 1) => [col1 = 1 or p1 = 1]
         filters = ImmutableList.of(p2Like("a%"), or(col1Equal1(), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(1)));
 
         // p2 like 'a%' && (col1 = 1 && p1 = 1) => [col1 = 1 && p1 = 1]
         filters = ImmutableList.of(p2Like("a%"), and(col1Equal1(), p1Equal1()));
-        Assertions.assertThat(tableSource.pushFilters(filters))
+        Assertions.assertThat(tableSource.applyFilters(filters).getRemainingFilters())
                 .isEqualTo(ImmutableList.of(filters.get(1)));
     }
 
