This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 503580834e0 [FLINK-34316] Reduce instantiation of ScanRuntimeProvider
in streaming mode
503580834e0 is described below
commit 503580834e06455bbcaf619a51f8d18812bbe83f
Author: Timo Walther <[email protected]>
AuthorDate: Wed Jan 31 09:34:48 2024 +0100
[FLINK-34316] Reduce instantiation of ScanRuntimeProvider in streaming mode
---
.../apache/flink/table/planner/connectors/DynamicSourceUtils.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 925d1a00b11..2ac8aeafe75 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -498,14 +498,12 @@ public final class DynamicSourceUtils {
ScanTableSource scanSource,
boolean isBatchMode,
ReadableConfig config) {
- final ScanRuntimeProvider provider =
-
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
final ChangelogMode changelogMode = scanSource.getChangelogMode();
validateWatermarks(tableDebugName, schema);
if (isBatchMode) {
- validateScanSourceForBatch(tableDebugName, changelogMode,
provider);
+ validateScanSourceForBatch(tableDebugName, scanSource,
changelogMode);
} else {
validateScanSourceForStreaming(
tableDebugName, schema, scanSource, changelogMode, config);
@@ -558,7 +556,9 @@ public final class DynamicSourceUtils {
}
private static void validateScanSourceForBatch(
- String tableDebugName, ChangelogMode changelogMode,
ScanRuntimeProvider provider) {
+ String tableDebugName, ScanTableSource scanSource, ChangelogMode
changelogMode) {
+ final ScanRuntimeProvider provider =
+
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
// batch only supports bounded source
if (!provider.isBounded()) {
throw new ValidationException(