This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 0033a7372171c5eeb6d08cbb533f720fcb78d69e
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jun 12 15:48:33 2025 +0800

    [hotfix] Minor refactor for Flink Dedicated Split Gen Source
---
 docs/content/append-table/query-performance.md          | 11 -----------
 docs/content/flink/sql-query.md                         | 17 +++++++++++++++++
 docs/content/primary-key-table/query-performance.md     | 12 ------------
 .../apache/paimon/flink/source/FlinkSourceBuilder.java  |  7 ++++---
 4 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/docs/content/append-table/query-performance.md b/docs/content/append-table/query-performance.md
index d488e4e999..e2128bbd89 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -75,14 +75,3 @@ we use the procedure, you should config appropriate configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
-
-## Dedicated Split Generation
-When Paimon table snapshots contain large amount of source splits, Flink jobs reading from this table might endure long initialization time or even OOM in JobManagers. In this case, you can configure `'scan.dedicated-split-generation' = 'true'` to avoid such problem. This option would enable executing the source split generation process in a dedicated subtask that runs on TaskManager, instead of in the source coordinator on the JobManager.
-
-Note that this feature could have some side effects on your Flink jobs. For example:
-
-1. It will change the DAG of the flink job, thus breaking checkpoint compatibility if enabled on an existing job.
-2. It may lead to the Flink AdaptiveBatchScheduler inferring a small parallelism for the source reader operator. you can configure `scan.infer-parallelism` to avoid this possible drawback.
-3. The failover strategy of the Flink job would be forced into global failover instead of regional failover, given that the dedicated source split generation task would be connected to all downstream subtasks.
-
-So please make sure these side effects are acceptable to you before enabling it.
diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index 85940a36e8..c974527358 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -288,3 +288,20 @@
 SELECT * FROM orders WHERE order_id=29495;
 SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
 ```
+
+## Dedicated Split Generation
+
+When Paimon table snapshots contain a large number of source splits, Flink jobs reading from the table might suffer long
+initialization times or even OOM errors in the JobManager. In this case, you can configure `'scan.dedicated-split-generation' = 'true'`
+to avoid such problems. This option executes the source split generation process in a dedicated subtask that runs on a
+TaskManager, instead of in the source coordinator on the JobManager.
+
+Note that this feature can have side effects on your Flink jobs. For example:
+
+1. It changes the DAG of the Flink job, thus breaking checkpoint compatibility if enabled on an existing job.
+2. It may lead to the Flink AdaptiveBatchScheduler inferring a small parallelism for the source reader operator. You can
+   configure `scan.infer-parallelism` to avoid this drawback.
+3. The failover strategy of the Flink job is forced into global failover instead of regional failover, because the
+   dedicated split generation task is connected to all downstream subtasks.
+
+So please make sure these side effects are acceptable before enabling this option.
diff --git a/docs/content/primary-key-table/query-performance.md b/docs/content/primary-key-table/query-performance.md
index ed0e357fc6..d27c4868fe 100644
--- a/docs/content/primary-key-table/query-performance.md
+++ b/docs/content/primary-key-table/query-performance.md
@@ -73,15 +73,3 @@ we use the procedure, you should config appropriate configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
-
-## Dedicated Split Generation
-When Paimon table snapshots contain large amount of source splits, Flink jobs reading from this table might endure long initialization time or even OOM in JobManagers. In this case, you can configure `'scan.dedicated-split-generation' = 'true'` to avoid such problem. This option would enable executing the source split generation process in a dedicated subtask that runs on TaskManager, instead of in the source coordinator on the JobManager.
-
-Note that this feature could have some side effects on your Flink jobs. For example:
-
-1. It will change the DAG of the flink job, thus breaking checkpoint compatibility if enabled on an existing job.
-2. It may lead to the Flink AdaptiveBatchScheduler inferring a small parallelism for the source reader operator. you can configure `scan.infer-parallelism` to avoid this possible drawback.
-3. The failover strategy of the Flink job would be forced into global failover instead of regional failover, given that the dedicated source split generation task would be connected to all downstream subtasks.
-
-So please make sure these side effects are acceptable to you before enabling it.
-
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index ac776b7cd3..44b5790472 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -78,6 +78,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  * @since 0.8
  */
 public class FlinkSourceBuilder {
+
     private static final String SOURCE_NAME = "Source";
 
     private final Table table;
@@ -295,7 +296,7 @@ public class FlinkSourceBuilder {
 
         if (sourceBounded) {
             if (conf.get(FlinkConnectorOptions.SCAN_DEDICATED_SPLIT_GENERATION)) {
-                return buildContinuousStreamOperator(true);
+                return buildDedicatedSplitGenSource(true);
             }
             return buildStaticFileSource();
         }
@@ -328,14 +329,14 @@ public class FlinkSourceBuilder {
             } else if (conf.contains(CoreOptions.CONSUMER_ID)
                     && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
                             == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
-                return buildContinuousStreamOperator(false);
+                return buildDedicatedSplitGenSource(false);
             } else {
                 return buildContinuousFileSource();
            }
         }
     }
 
-    private DataStream<RowData> buildContinuousStreamOperator(boolean isBounded) {
+    private DataStream<RowData> buildDedicatedSplitGenSource(boolean isBounded) {
         DataStream<RowData> dataStream;
         if (limit != null && !isBounded) {
             throw new IllegalArgumentException(
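As a usage illustration of the option this commit documents, here is a minimal sketch in Flink SQL using dynamic table options; the `orders` table is borrowed from the surrounding examples in sql-query.md, and the parallelism value is an arbitrary placeholder:

```sql
-- Enable dedicated split generation for a single query via Flink SQL
-- dynamic table options (the table name `orders` is illustrative):
SELECT * FROM orders /*+ OPTIONS('scan.dedicated-split-generation' = 'true') */;

-- If the AdaptiveBatchScheduler then infers a too-small source parallelism
-- (side effect 2 in the docs), disable parallelism inference and pin the
-- source parallelism explicitly; the value 16 is an arbitrary example:
SELECT * FROM orders
/*+ OPTIONS('scan.dedicated-split-generation' = 'true',
            'scan.infer-parallelism' = 'false',
            'scan.parallelism' = '16') */;
```

A hint of this form applies only to the single query, so it is a convenient way to trial the option before persisting it in the table properties.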
