This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 503580834e0 [FLINK-34316] Reduce instantiation of ScanRuntimeProvider 
in streaming mode
503580834e0 is described below

commit 503580834e06455bbcaf619a51f8d18812bbe83f
Author: Timo Walther <[email protected]>
AuthorDate: Wed Jan 31 09:34:48 2024 +0100

    [FLINK-34316] Reduce instantiation of ScanRuntimeProvider in streaming mode
---
 .../apache/flink/table/planner/connectors/DynamicSourceUtils.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 925d1a00b11..2ac8aeafe75 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -498,14 +498,12 @@ public final class DynamicSourceUtils {
             ScanTableSource scanSource,
             boolean isBatchMode,
             ReadableConfig config) {
-        final ScanRuntimeProvider provider =
-                
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         final ChangelogMode changelogMode = scanSource.getChangelogMode();
 
         validateWatermarks(tableDebugName, schema);
 
         if (isBatchMode) {
-            validateScanSourceForBatch(tableDebugName, changelogMode, 
provider);
+            validateScanSourceForBatch(tableDebugName, scanSource, 
changelogMode);
         } else {
             validateScanSourceForStreaming(
                     tableDebugName, schema, scanSource, changelogMode, config);
@@ -558,7 +556,9 @@ public final class DynamicSourceUtils {
     }
 
     private static void validateScanSourceForBatch(
-            String tableDebugName, ChangelogMode changelogMode, 
ScanRuntimeProvider provider) {
+            String tableDebugName, ScanTableSource scanSource, ChangelogMode 
changelogMode) {
+        final ScanRuntimeProvider provider =
+                
scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         // batch only supports bounded source
         if (!provider.isBounded()) {
             throw new ValidationException(

Reply via email to