This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0013a12c8 [flink] Provide scan.push-down to disable source pushdown
(#1516)
0013a12c8 is described below
commit 0013a12c8434ef0a90e15fbd1f5b67d84ce9585b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jul 7 19:17:49 2023 +0800
[flink] Provide scan.push-down to disable source pushdown (#1516)
---
.../generated/flink_connector_configuration.html | 6 +
.../paimon/flink/source/DataTableSource.java | 64 -----
.../paimon/flink/source/table/RichTableSource.java | 53 ++++
.../paimon/flink/source/DataTableSource.java | 64 -----
.../paimon/flink/source/table/RichTableSource.java | 53 ++++
.../paimon/flink/AbstractFlinkTableFactory.java | 24 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 +
.../flink/source/AbstractDataTableSource.java | 302 ---------------------
.../paimon/flink/source/DataTableSource.java | 249 ++++++++++++++++-
.../paimon/flink/source/FlinkTableSource.java | 46 ++--
.../paimon/flink/source/SystemTableSource.java | 24 +-
.../paimon/flink/source/table/BaseTableSource.java | 49 ++++
.../flink/source/table/PushedRichTableSource.java | 66 +++++
.../flink/source/table/PushedTableSource.java | 66 +++++
.../paimon/flink/source/table/RichTableSource.java | 60 ++++
.../paimon/flink/ContinuousFileStoreITCase.java | 19 ++
16 files changed, 687 insertions(+), 466 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 18ae538fb..60a4629e2 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -50,6 +50,12 @@ under the License.
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration. If user
enable the scan.infer-parallelism, the planner will derive the parallelism by
inferred parallelism.</td>
</tr>
+ <tr>
+ <td><h5>scan.push-down</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true, flink will push down projection, filters, limit to
the source. The cost is that it is difficult to reuse the source in a job.</td>
+ </tr>
<tr>
<td><h5>scan.split-enumerator.batch-size</h5></td>
<td style="word-wrap: break-word;">10</td>
diff --git
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
deleted file mode 100644
index ce7ccd23d..000000000
---
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source;
-
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import javax.annotation.Nullable;
-
-/** Data table source. */
-public class DataTableSource extends AbstractDataTableSource {
- public DataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory) {
- super(tableIdentifier, table, streaming, context,
logStoreTableFactory);
- }
-
- protected DataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory,
- @Nullable Predicate predicate,
- @Nullable int[][] projectFields,
- @Nullable Long limit,
- @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
- super(
- tableIdentifier,
- table,
- streaming,
- context,
- logStoreTableFactory,
- predicate,
- projectFields,
- limit,
- watermarkStrategy);
- }
-}
diff --git
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
new file mode 100644
index 000000000..50c8dc5ff
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+
+/** The {@link BaseTableSource} with Lookup and watermark. */
+public class RichTableSource extends BaseTableSource
+ implements LookupTableSource, SupportsWatermarkPushDown {
+
+ private final FlinkTableSource source;
+
+ public RichTableSource(FlinkTableSource source) {
+ super(source);
+ this.source = source;
+ }
+
+ @Override
+ public RichTableSource copy() {
+ return new RichTableSource(source.copy());
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ return source.getLookupRuntimeProvider(context);
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ source.pushWatermark(watermarkStrategy);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
deleted file mode 100644
index ce7ccd23d..000000000
---
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source;
-
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import javax.annotation.Nullable;
-
-/** Data table source. */
-public class DataTableSource extends AbstractDataTableSource {
- public DataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory) {
- super(tableIdentifier, table, streaming, context,
logStoreTableFactory);
- }
-
- protected DataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory,
- @Nullable Predicate predicate,
- @Nullable int[][] projectFields,
- @Nullable Long limit,
- @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
- super(
- tableIdentifier,
- table,
- streaming,
- context,
- logStoreTableFactory,
- predicate,
- projectFields,
- limit,
- watermarkStrategy);
- }
-}
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
new file mode 100644
index 000000000..50c8dc5ff
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+
+/** The {@link BaseTableSource} with Lookup and watermark. */
+public class RichTableSource extends BaseTableSource
+ implements LookupTableSource, SupportsWatermarkPushDown {
+
+ private final FlinkTableSource source;
+
+ public RichTableSource(FlinkTableSource source) {
+ super(source);
+ this.source = source;
+ }
+
+ @Override
+ public RichTableSource copy() {
+ return new RichTableSource(source.copy());
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ return source.getLookupRuntimeProvider(context);
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ source.pushWatermark(watermarkStrategy);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 59666f4b0..4ac6d542a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -27,6 +27,9 @@ import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
+import org.apache.paimon.flink.source.table.PushedRichTableSource;
+import org.apache.paimon.flink.source.table.PushedTableSource;
+import org.apache.paimon.flink.source.table.RichTableSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTableFactory;
@@ -60,6 +63,7 @@ import static
org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT_FULL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.FlinkConnectorOptions.NONE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PUSH_DOWN;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static
org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;
@@ -74,14 +78,20 @@ public abstract class AbstractFlinkTableFactory
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
- return new SystemTableSource(((SystemCatalogTable)
origin).table(), isStreamingMode);
+ return new PushedTableSource(
+ new SystemTableSource(((SystemCatalogTable)
origin).table(), isStreamingMode));
} else {
- return new DataTableSource(
- context.getObjectIdentifier(),
- buildPaimonTable(context),
- isStreamingMode,
- context,
- createOptionalLogStoreFactory(context).orElse(null));
+ Table table = buildPaimonTable(context);
+ DataTableSource source =
+ new DataTableSource(
+ context.getObjectIdentifier(),
+ table,
+ isStreamingMode,
+ context,
+
createOptionalLogStoreFactory(context).orElse(null));
+ return new Options(table.options()).get(SCAN_PUSH_DOWN)
+ ? new PushedRichTableSource(source)
+ : new RichTableSource(source);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index f6917ee49..e4030760e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -202,6 +202,14 @@ public class FlinkConnectorOptions {
"Weight of writer buffer in managed memory, Flink
will compute the memory size "
+ "for writer according to the weight, the
actual memory used depends on the running environment.");
+ public static final ConfigOption<Boolean> SCAN_PUSH_DOWN =
+ ConfigOptions.key("scan.push-down")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true, flink will push down projection,
filters, limit to the source. "
+ + "The cost is that it is difficult to
reuse the source in a job.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractDataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractDataTableSource.java
deleted file mode 100644
index 5c430986d..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AbstractDataTableSource.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.source;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
-import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
-import org.apache.paimon.flink.PaimonDataStreamScanProvider;
-import org.apache.paimon.flink.log.LogSourceProvider;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
-import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.utils.Projection;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.stream.IntStream;
-
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
-import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-
-/**
- * Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
- * batch mode or change-tracking is disabled. For streaming mode with
change-tracking enabled and
- * FULL scan mode, it will create a {@link
- * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
- * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created
by {@link
- * LogSourceProvider}.
- */
-public abstract class AbstractDataTableSource extends FlinkTableSource
- implements LookupTableSource, SupportsWatermarkPushDown {
-
- private final ObjectIdentifier tableIdentifier;
- protected final boolean streaming;
- private final DynamicTableFactory.Context context;
- @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
- @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
-
- protected SplitStatistics splitStatistics;
-
- public AbstractDataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory) {
- this(
- tableIdentifier,
- table,
- streaming,
- context,
- logStoreTableFactory,
- null,
- null,
- null,
- null);
- }
-
- protected AbstractDataTableSource(
- ObjectIdentifier tableIdentifier,
- Table table,
- boolean streaming,
- DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory,
- @Nullable Predicate predicate,
- @Nullable int[][] projectFields,
- @Nullable Long limit,
- @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
- super(table, predicate, projectFields, limit);
- this.tableIdentifier = tableIdentifier;
- this.streaming = streaming;
- this.context = context;
- this.logStoreTableFactory = logStoreTableFactory;
- this.predicate = predicate;
- this.projectFields = projectFields;
- this.limit = limit;
- this.watermarkStrategy = watermarkStrategy;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- if (!streaming) {
- // batch merge all, return insert only
- return ChangelogMode.insertOnly();
- }
-
- if (table instanceof AppendOnlyFileStoreTable) {
- return ChangelogMode.insertOnly();
- } else if (table instanceof ChangelogValueCountFileStoreTable) {
- return ChangelogMode.all();
- } else if (table instanceof ChangelogWithKeyFileStoreTable) {
- Options options = Options.fromMap(table.options());
-
- if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
- return ChangelogMode.all();
- }
-
- if (logStoreTableFactory == null
- && options.get(CHANGELOG_PRODUCER) !=
ChangelogProducer.NONE) {
- return ChangelogMode.all();
- }
-
- // optimization: transaction consistency and all changelog mode
avoid the generation of
- // normalized nodes. See FlinkTableSink.getChangelogMode
validation.
- return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
- && options.get(LOG_CHANGELOG_MODE) ==
LogChangelogMode.ALL
- ? ChangelogMode.all()
- : ChangelogMode.upsert();
- } else {
- throw new UnsupportedOperationException(
- "Unsupported Table subclass "
- + table.getClass().getName()
- + " for streaming mode.");
- }
- }
-
- @Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
- LogSourceProvider logSourceProvider = null;
- if (logStoreTableFactory != null) {
- logSourceProvider =
- logStoreTableFactory.createSourceProvider(context,
scanContext, projectFields);
- }
-
- WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
- Options options = Options.fromMap(table.options());
- if (watermarkStrategy != null) {
- WatermarkEmitStrategy emitStrategy =
options.get(SCAN_WATERMARK_EMIT_STRATEGY);
- if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
- watermarkStrategy = new
OnEventWatermarkStrategy(watermarkStrategy);
- }
- Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
- if (idleTimeout != null) {
- watermarkStrategy =
watermarkStrategy.withIdleness(idleTimeout);
- }
- String watermarkAlignGroup =
options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
- if (watermarkAlignGroup != null) {
- try {
- watermarkStrategy =
- WatermarkAlignUtils.withWatermarkAlignment(
- watermarkStrategy,
- watermarkAlignGroup,
-
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
-
options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
- } catch (NoSuchMethodError error) {
- throw new RuntimeException(
- "Flink 1.14 does not support watermark alignment,
please check your Flink version.",
- error);
- }
- }
- }
-
- FlinkSourceBuilder sourceBuilder =
- new FlinkSourceBuilder(tableIdentifier, table)
- .withContinuousMode(streaming)
- .withLogSourceProvider(logSourceProvider)
- .withProjection(projectFields)
- .withPredicate(predicate)
- .withLimit(limit)
- .withWatermarkStrategy(watermarkStrategy);
-
- return new PaimonDataStreamScanProvider(
- !streaming, env -> configureSource(sourceBuilder, env));
- }
-
- private DataStream<RowData> configureSource(
- FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
- Options options = Options.fromMap(this.table.options());
- Integer parallelism =
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
- if (parallelism == null &&
options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
- if (streaming) {
- parallelism = options.get(CoreOptions.BUCKET);
- } else {
- scanSplitsForInference();
- parallelism = splitStatistics.splitNumber();
- if (null != limit && limit > 0) {
- int limitCount =
- limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE :
limit.intValue();
- parallelism = Math.min(parallelism, limitCount);
- }
-
- parallelism = Math.max(1, parallelism);
- }
- }
-
- return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
- }
-
- protected void scanSplitsForInference() {
- if (splitStatistics == null) {
- List<Split> splits =
-
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
- splitStatistics = new SplitStatistics(splits);
- }
- }
-
- @Override
- public DynamicTableSource copy() {
- return new DataTableSource(
- tableIdentifier,
- table,
- streaming,
- context,
- logStoreTableFactory,
- predicate,
- projectFields,
- limit,
- watermarkStrategy);
- }
-
- @Override
- public String asSummaryString() {
- return "Paimon-DataSource";
- }
-
- @Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- this.watermarkStrategy = watermarkStrategy;
- }
-
- @Override
- public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
- if (limit != null) {
- throw new RuntimeException(
- "Limit push down should not happen in Lookup source, but
it is " + limit);
- }
- int[] projection =
- projectFields == null
- ? IntStream.range(0,
table.rowType().getFieldCount()).toArray()
- : Projection.of(projectFields).toTopLevelIndexes();
- int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
- return LookupRuntimeProviderFactory.create(
- new FileStoreLookupFunction(table, projection, joinKey,
predicate));
- }
-
- /** Split statistics for inferring row count and parallelism size. */
- protected static class SplitStatistics {
-
- private final int splitNumber;
- private final long totalRowCount;
-
- private SplitStatistics(List<Split> splits) {
- this.splitNumber = splits.size();
- this.totalRowCount =
splits.stream().mapToLong(Split::rowCount).sum();
- }
-
- public int splitNumber() {
- return splitNumber;
- }
-
- public long totalRowCount() {
- return totalRowCount;
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index fe715e52e..accc39953 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -18,32 +18,92 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.LogChangelogMode;
+import org.apache.paimon.CoreOptions.LogConsistency;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
+import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
+import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
+import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import
org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
+import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.TableStats;
import javax.annotation.Nullable;
-/** Data table source with table statistics. */
-public class DataTableSource extends AbstractDataTableSource implements
SupportsStatisticReport {
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+
+/**
+ * Table source to create {@link StaticFileStoreSource} or {@link
ContinuousFileStoreSource} under
+ * batch mode or change-tracking is disabled. For streaming mode with
change-tracking enabled and
+ * FULL scan mode, it will create a {@link
+ * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@code
+ * LogHybridSourceFactory.FlinkHybridFirstSource} and kafka log source created
by {@link
+ * LogSourceProvider}.
+ */
+public class DataTableSource extends FlinkTableSource {
+
+ private final ObjectIdentifier tableIdentifier;
+ protected final boolean streaming;
+ private final DynamicTableFactory.Context context;
+ @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+ @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+
+ protected SplitStatistics splitStatistics;
+
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
boolean streaming,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
- super(tableIdentifier, table, streaming, context,
logStoreTableFactory);
+ this(
+ tableIdentifier,
+ table,
+ streaming,
+ context,
+ logStoreTableFactory,
+ null,
+ null,
+ null,
+ null);
}
- @VisibleForTesting
public DataTableSource(
ObjectIdentifier tableIdentifier,
Table table,
@@ -54,7 +114,137 @@ public class DataTableSource extends
AbstractDataTableSource implements Supports
@Nullable int[][] projectFields,
@Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
- super(
+ super(table, predicate, projectFields, limit);
+ this.tableIdentifier = tableIdentifier;
+ this.streaming = streaming;
+ this.context = context;
+ this.logStoreTableFactory = logStoreTableFactory;
+ this.predicate = predicate;
+ this.projectFields = projectFields;
+ this.limit = limit;
+ this.watermarkStrategy = watermarkStrategy;
+ }
+
    /**
     * Declares which row kinds this source can emit. Batch scans are insert-only (the merge has
     * already happened); streaming scans depend on the concrete table type and its log options.
     */
    @Override
    public ChangelogMode getChangelogMode() {
        if (!streaming) {
            // batch merge all, return insert only
            return ChangelogMode.insertOnly();
        }

        if (table instanceof AppendOnlyFileStoreTable) {
            return ChangelogMode.insertOnly();
        } else if (table instanceof ChangelogValueCountFileStoreTable) {
            return ChangelogMode.all();
        } else if (table instanceof ChangelogWithKeyFileStoreTable) {
            Options options = Options.fromMap(table.options());

            if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
                return ChangelogMode.all();
            }

            // Without an external log system, a non-NONE changelog producer means the files
            // already carry a complete changelog.
            if (logStoreTableFactory == null
                    && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
                return ChangelogMode.all();
            }

            // optimization: transaction consistency and all changelog mode avoid the generation of
            // normalized nodes. See FlinkTableSink.getChangelogMode validation.
            return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
                            && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
                    ? ChangelogMode.all()
                    : ChangelogMode.upsert();
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported Table subclass "
                            + table.getClass().getName()
                            + " for streaming mode.");
        }
    }
+
    /**
     * Builds the actual scan source: an optional log (e.g. Kafka) source provider plus the file
     * store source, with the pushed-down predicate, projection, limit and watermark strategy
     * applied.
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        LogSourceProvider logSourceProvider = null;
        if (logStoreTableFactory != null) {
            logSourceProvider =
                    logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
        }

        WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
        Options options = Options.fromMap(table.options());
        if (watermarkStrategy != null) {
            WatermarkEmitStrategy emitStrategy = options.get(SCAN_WATERMARK_EMIT_STRATEGY);
            if (emitStrategy == WatermarkEmitStrategy.ON_EVENT) {
                // Emit a watermark per record instead of the default periodic emission.
                watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
            }
            Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
            if (idleTimeout != null) {
                watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
            }
            String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
            if (watermarkAlignGroup != null) {
                try {
                    watermarkStrategy =
                            WatermarkAlignUtils.withWatermarkAlignment(
                                    watermarkStrategy,
                                    watermarkAlignGroup,
                                    options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                    options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
                } catch (NoSuchMethodError error) {
                    // Watermark alignment API is absent in older Flink runtimes.
                    throw new RuntimeException(
                            "Flink 1.14 does not support watermark alignment, please check your Flink version.",
                            error);
                }
            }
        }

        FlinkSourceBuilder sourceBuilder =
                new FlinkSourceBuilder(tableIdentifier, table)
                        .withContinuousMode(streaming)
                        .withLogSourceProvider(logSourceProvider)
                        .withProjection(projectFields)
                        .withPredicate(predicate)
                        .withLimit(limit)
                        .withWatermarkStrategy(watermarkStrategy);

        // First argument: whether the produced DataStream is bounded (batch mode only).
        return new PaimonDataStreamScanProvider(
                !streaming, env -> configureSource(sourceBuilder, env));
    }
+
    /**
     * Applies the scan parallelism and builds the stream. An explicit {@code scan.parallelism}
     * wins; otherwise, when inference is enabled, streaming reads use the bucket count and batch
     * reads use the planned split count (capped by the pushed limit, floor of 1).
     */
    private DataStream<RowData> configureSource(
            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
        Options options = Options.fromMap(this.table.options());
        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
            if (streaming) {
                parallelism = options.get(CoreOptions.BUCKET);
            } else {
                scanSplitsForInference();
                parallelism = splitStatistics.splitNumber();
                if (null != limit && limit > 0) {
                    // No point in running more parallel tasks than rows requested; clamp the
                    // long limit into int range before comparing.
                    int limitCount =
                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
                    parallelism = Math.min(parallelism, limitCount);
                }

                parallelism = Math.max(1, parallelism);
            }
        }

        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
    }
+
+ private void scanSplitsForInference() {
+ if (splitStatistics == null) {
+ List<Split> splits =
+
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+ splitStatistics = new SplitStatistics(splits);
+ }
+ }
+
+ @Override
+ public DataTableSource copy() {
+ return new DataTableSource(
tableIdentifier,
table,
streaming,
@@ -66,6 +256,26 @@ public class DataTableSource extends
AbstractDataTableSource implements Supports
watermarkStrategy);
}
    /** Stores the planner-provided watermark strategy; applied in getScanRuntimeProvider. */
    @Override
    public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }
+
    /**
     * Builds the lookup-join provider. Rejects a pushed limit (meaningless for key lookups);
     * without a pushed projection, all top-level fields are read.
     */
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        if (limit != null) {
            throw new RuntimeException(
                    "Limit push down should not happen in Lookup source, but it is " + limit);
        }
        // Default projection: every top-level field of the table row type.
        int[] projection =
                projectFields == null
                        ? IntStream.range(0, table.rowType().getFieldCount()).toArray()
                        : Projection.of(projectFields).toTopLevelIndexes();
        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
        return LookupRuntimeProviderFactory.create(
                new FileStoreLookupFunction(table, projection, joinKey, predicate));
    }
+
@Override
public TableStats reportStatistics() {
if (streaming) {
@@ -75,4 +285,29 @@ public class DataTableSource extends
AbstractDataTableSource implements Supports
scanSplitsForInference();
return new TableStats(splitStatistics.totalRowCount());
}
+
    /** Name shown for this source in Flink plan / explain output. */
    @Override
    public String asSummaryString() {
        return "Paimon-DataSource";
    }
+
    /** Split statistics for inferring row count and parallelism size. */
    protected static class SplitStatistics {

        // Number of splits produced by the scan plan.
        private final int splitNumber;
        // Sum of the per-split row counts reported by the plan.
        private final long totalRowCount;

        private SplitStatistics(List<Split> splits) {
            this.splitNumber = splits.size();
            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
        }

        public int splitNumber() {
            return splitNumber;
        }

        public long totalRowCount() {
            return totalRowCount;
        }
    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 143fe0039..382974f43 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -24,11 +24,16 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
-import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
+import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
@@ -37,11 +42,7 @@ import java.util.ArrayList;
import java.util.List;
/** A Flink {@link ScanTableSource} for paimon. */
-public abstract class FlinkTableSource
- implements ScanTableSource,
- SupportsFilterPushDown,
- SupportsProjectionPushDown,
- SupportsLimitPushDown {
+public abstract class FlinkTableSource {
protected final Table table;
@@ -64,29 +65,34 @@ public abstract class FlinkTableSource
this.limit = limit;
}
    /**
     * Converts the planner's filter expressions into a Paimon predicate. Expressions that
     * cannot be converted are skipped; the convertible ones are AND-ed together. Replaces any
     * previously pushed predicate (null when nothing converted).
     */
    public void pushFilters(List<ResolvedExpression> filters) {
        List<Predicate> converted = new ArrayList<>();
        RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
        for (ResolvedExpression filter : filters) {
            PredicateConverter.convert(rowType, filter).ifPresent(converted::add);
        }
        predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
    }
    /** Records the projected fields pushed down by the planner. */
    public void pushProjection(int[][] projectedFields) {
        this.projectFields = projectedFields;
    }
    /** Records the row limit pushed down by the planner. */
    public void pushLimit(long limit) {
        this.limit = limit;
    }
+
    /** Returns the changelog row kinds this source produces. */
    public abstract ChangelogMode getChangelogMode();

    /** Creates the runtime provider for scanning this source. */
    public abstract ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /** Accepts a watermark strategy pushed down by the planner. */
    public abstract void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy);

    /** Creates the runtime provider used for lookup joins. */
    public abstract LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

    /** Reports table statistics to the planner. */
    public abstract TableStats reportStatistics();

    /** Returns a copy of this source (required by the planner's copy contract). */
    public abstract FlinkTableSource copy();

    /** Returns the summary name shown in plans. */
    public abstract String asSummaryString();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index 89c55b03a..1ff70844d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -25,11 +25,15 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
+import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.plan.stats.TableStats;
import javax.annotation.Nullable;
@@ -82,7 +86,7 @@ public class SystemTableSource extends FlinkTableSource {
}
@Override
- public DynamicTableSource copy() {
+ public SystemTableSource copy() {
return new SystemTableSource(
table,
isStreamingMode,
@@ -97,4 +101,20 @@ public class SystemTableSource extends FlinkTableSource {
    /** Name shown for this source in Flink plan / explain output. */
    public String asSummaryString() {
        return "Paimon-SystemTable-Source";
    }
+
+ @Override
+ public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(
+ LookupTableSource.LookupContext context) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TableStats reportStatistics() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java
new file mode 100644
index 000000000..57c4b01a2
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/BaseTableSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/** The {@link ScanTableSource} with push down interfaces. */
+public abstract class BaseTableSource implements ScanTableSource {
+
+ private final FlinkTableSource source;
+
+ public BaseTableSource(FlinkTableSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return source.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ return source.getScanRuntimeProvider(runtimeProviderContext);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return source.asSummaryString();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
new file mode 100644
index 000000000..9d5083dda
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedRichTableSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+
+/** A {@link RichTableSource} with push down. */
+public class PushedRichTableSource extends RichTableSource
+ implements SupportsFilterPushDown, SupportsProjectionPushDown,
SupportsLimitPushDown {
+
+ private final FlinkTableSource source;
+
+ public PushedRichTableSource(FlinkTableSource source) {
+ super(source);
+ this.source = source;
+ }
+
+ @Override
+ public PushedRichTableSource copy() {
+ return new PushedRichTableSource(source.copy());
+ }
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ source.pushFilters(filters);
+ return Result.of(filters, filters);
+ }
+
+ @Override
+ public void applyLimit(long limit) {
+ source.pushLimit(limit);
+ }
+
+ @Override
+ public boolean supportsNestedProjection() {
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) {
+ source.pushProjection(projectedFields);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
new file mode 100644
index 000000000..c5bb17acf
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/PushedTableSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+
+/** The {@link BaseTableSource} with push down. */
+public class PushedTableSource extends BaseTableSource
+ implements SupportsFilterPushDown, SupportsProjectionPushDown,
SupportsLimitPushDown {
+
+ private final FlinkTableSource source;
+
+ public PushedTableSource(FlinkTableSource source) {
+ super(source);
+ this.source = source;
+ }
+
+ @Override
+ public PushedTableSource copy() {
+ return new PushedTableSource(source.copy());
+ }
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ source.pushFilters(filters);
+ return Result.of(filters, filters);
+ }
+
+ @Override
+ public void applyLimit(long limit) {
+ source.pushLimit(limit);
+ }
+
+ @Override
+ public boolean supportsNestedProjection() {
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) {
+ source.pushProjection(projectedFields);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
new file mode 100644
index 000000000..3fb4234c9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/table/RichTableSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.table;
+
+import org.apache.paimon.flink.source.FlinkTableSource;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.plan.stats.TableStats;
+
+/** The {@link BaseTableSource} with Lookup and watermark. */
+public class RichTableSource extends BaseTableSource
+ implements LookupTableSource, SupportsWatermarkPushDown,
SupportsStatisticReport {
+
+ private final FlinkTableSource source;
+
+ public RichTableSource(FlinkTableSource source) {
+ super(source);
+ this.source = source;
+ }
+
+ @Override
+ public RichTableSource copy() {
+ return new RichTableSource(source.copy());
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ return source.getLookupRuntimeProvider(context);
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ source.pushWatermark(watermarkStrategy);
+ }
+
+ @Override
+ public TableStats reportStatistics() {
+ return source.reportStatistics();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index f8063c941..6e834370a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
@@ -73,6 +74,24 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
+ options);
}
    /**
     * Two sinks reading the same table: with 'scan.push-down' = 'false' the planner reuses a
     * single source; with push down enabled (default) no reuse happens — presumably because the
     * differing pushed projections make the sources unequal (TODO confirm against planner).
     */
    @TestTemplate
    public void testSourceReuse() {
        sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
        sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

        // Push down disabled: the compiled plan must contain a "Reused" node.
        StatementSet statementSet = sEnv.createStatementSet();
        statementSet.addInsertSql(
                "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
        statementSet.addInsertSql(
                "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
        assertThat(statementSet.compilePlan().explain()).contains("Reused");

        // Push down enabled (default): no source reuse in the compiled plan.
        statementSet = sEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
        statementSet.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
        assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
    }
+
@TestTemplate
public void testWithoutPrimaryKey() throws Exception {
testSimple("T1");