This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new ea84be62 [FLINK-33463] Use Source API implementation in `JdbcDynamicTableSource`
ea84be62 is described below

commit ea84be626268a5bc46c8d469d95b0d1e76790afb
Author: Yuepeng Pan <[email protected]>
AuthorDate: Sun Nov 23 00:13:34 2025 +0800

    [FLINK-33463] Use Source API implementation in `JdbcDynamicTableSource`
---
 .../reader/extractor/RowDataResultExtractor.java   | 41 +++++++++++
 .../jdbc/core/table/JdbcDynamicTableFactory.java   |  4 +-
 .../core/table/source/JdbcDynamicTableSource.java  | 81 +++++++++++++++++-----
 .../core/table/JdbcDynamicTableFactoryTest.java    | 15 ++--
 4 files changed, 118 insertions(+), 23 deletions(-)
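
For readers skimming the diff: the core of this patch is that `JdbcDynamicTableSource#getScanRuntimeProvider` now builds a `JdbcSource` (the unified Source API) and exposes it through a `DataStreamScanProvider`, instead of returning `InputFormatProvider.of(...)` around a `JdbcRowDataInputFormat`. The following is a minimal, self-contained Java sketch of that wiring; it uses only builder methods that appear in the diff below, and the class name `JdbcSourceScanSketch` and its parameters are illustrative placeholders, not code from this commit.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.RowDataResultExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

/** Illustrative only: mirrors the wiring introduced in this commit. */
public final class JdbcSourceScanSketch {

    /** Builds a JDBC scan as a DataStream, following the new getScanRuntimeProvider path. */
    public static DataStream<RowData> scan(
            StreamExecutionEnvironment env,
            String driverName,
            String jdbcUrl,
            String query,
            JdbcDialectConverter converter,
            TypeInformation<RowData> producedTypeInfo) {

        JdbcSource<RowData> source =
                JdbcSource.<RowData>builder()
                        .setDriverName(driverName)
                        .setDBUrl(jdbcUrl)
                        .setSql(query)
                        // RowDataResultExtractor (added by this commit) converts each ResultSet row
                        // via the dialect converter, replacing the input format's row converter.
                        .setResultExtractor(new RowDataResultExtractor(converter))
                        .setTypeInformation(producedTypeInfo)
                        .build();

        // The table source wraps an equivalent call in a DataStreamScanProvider; the operator name
        // and uid handling shown in the diff are omitted here for brevity.
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "JdbcSource-sketch");
    }
}

In the commit itself, the builder is additionally fed the fetch size, parameter values provider, connection properties, and credentials before being handed to the private JdbcDataStreamScanProvider defined near the end of the diff.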

diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java
new file mode 100644
index 00000000..e7c78418
--- /dev/null
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor;
+
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** The result extractor for {@link RowData}. */
+public class RowDataResultExtractor implements ResultExtractor<RowData> {
+
+    private final JdbcDialectConverter jdbcDialectConverter;
+
+    public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) {
+        this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter);
+    }
+
+    @Override
+    public RowData extract(ResultSet resultSet) throws SQLException {
+        return jdbcDialectConverter.toInternal(resultSet);
+    }
+}
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index e1c732ee..8c82f455 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -124,13 +124,15 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                 config.get(URL),
                 config.get(COMPATIBLE_MODE),
                 context.getClassLoader());
+        final String tableIdentifier = context.getObjectIdentifier().asSummaryString();
         return new JdbcDynamicTableSource(
                 getJdbcOptions(helper.getOptions(), context.getClassLoader()),
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
                 helper.getOptions().get(FILTER_HANDLING_POLICY),
-                context.getPhysicalRowDataType());
+                context.getPhysicalRowDataType(),
+                tableIdentifier);
     }
 
     private static void validateDataTypeWithJdbcDialect(
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index d26898a5..4de953f2 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -19,7 +19,12 @@
 package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
+import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.RowDataResultExtractor;
 import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
@@ -27,10 +32,14 @@ import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvide
 import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -39,6 +48,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushD
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
@@ -70,6 +80,8 @@ public class JdbcDynamicTableSource
                 SupportsFilterPushDown {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class);
 
+    private static final String JDBC_TRANSFORMATION = "jdbc";
+
     private final InternalJdbcConnectionOptions options;
     private final JdbcReadOptions readOptions;
     private final int lookupMaxRetryTimes;
@@ -80,6 +92,7 @@ public class JdbcDynamicTableSource
     private long limit = -1;
     private List<String> resolvedPredicates = new ArrayList<>();
     private Serializable[] pushdownParams = new Serializable[0];
+    private final String tableIdentifier;
 
     public JdbcDynamicTableSource(
             InternalJdbcConnectionOptions options,
@@ -87,7 +100,8 @@ public class JdbcDynamicTableSource
             int lookupMaxRetryTimes,
             @Nullable LookupCache cache,
             FilterHandlingPolicy filterHandlingPolicy,
-            DataType physicalRowDataType) {
+            DataType physicalRowDataType,
+            String tableIdentifier) {
         this.options = options;
         this.readOptions = readOptions;
         this.lookupMaxRetryTimes = lookupMaxRetryTimes;
@@ -95,6 +109,7 @@ public class JdbcDynamicTableSource
         this.filterHandlingPolicy = filterHandlingPolicy;
         this.physicalRowDataType = physicalRowDataType;
         this.dialectName = options.getDialect().dialectName();
+        this.tableIdentifier = tableIdentifier;
     }
 
     @Override
@@ -126,17 +141,17 @@ public class JdbcDynamicTableSource
     }
 
     @Override
-    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        final JdbcRowDataInputFormat.Builder builder =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(options.getDriverName())
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        final JdbcSourceBuilder<RowData> builder =
+                JdbcSource.<RowData>builder()
+                        .setDriverName(options.getDriverName())
                         .setDBUrl(options.getDbURL())
                         .setUsername(options.getUsername().orElse(null))
                         .setPassword(options.getPassword().orElse(null))
                         .setAutoCommit(readOptions.getAutoCommit());
 
         if (readOptions.getFetchSize() != 0) {
-            builder.setFetchSize(readOptions.getFetchSize());
+            builder.setResultSetFetchSize(readOptions.getFetchSize());
         }
         final JdbcDialect dialect = options.getDialect();
         String query =
@@ -158,19 +173,19 @@ public class JdbcDynamicTableSource
                                     .ofBatchNum(numPartitions),
                             new JdbcGenericParameterValuesProvider(allPushdownParams));
 
-            builder.setParametersProvider(allParams);
+            builder.setJdbcParameterValuesProvider(allParams);
 
             predicates.add(
                     dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
                             + " BETWEEN ? AND ?");
         } else {
-            builder.setParametersProvider(
+            builder.setJdbcParameterValuesProvider(
                     new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
         }
 
         predicates.addAll(this.resolvedPredicates);
 
-        if (predicates.size() > 0) {
+        if (!predicates.isEmpty()) {
             String joinedConditions =
                     predicates.stream()
                             .map(pred -> String.format("(%s)", pred))
@@ -184,13 +199,15 @@ public class JdbcDynamicTableSource
 
         LOG.debug("Query generated for JDBC scan: " + query);
 
-        builder.setQuery(query);
+        builder.setSql(query);
         final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
-        builder.setRowConverter(dialect.getRowConverter(rowType));
-        builder.setRowDataTypeInfo(
-                runtimeProviderContext.createTypeInformation(physicalRowDataType));
-
-        return InputFormatProvider.of(builder.build());
+        builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
+        builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
+        options.getProperties()
+                .forEach(
+                        (key, value) ->
+                                builder.setConnectionProperty(key.toString(), value.toString()));
+        return new JdbcDataStreamScanProvider(builder.build(), tableIdentifier);
     }
 
     @Override
@@ -218,7 +235,8 @@ public class JdbcDynamicTableSource
                         lookupMaxRetryTimes,
                         cache,
                         filterHandlingPolicy,
-                        physicalRowDataType);
+                        physicalRowDataType,
+                        tableIdentifier);
         newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
         newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
         return newSource;
@@ -314,4 +332,33 @@ public class JdbcDynamicTableSource
         }
         return allPushdownParams;
     }
+
+    private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {
+
+        private final JdbcSource<RowData> source;
+        private final String tableIdentifier;
+
+        public JdbcDataStreamScanProvider(JdbcSource<RowData> source, String tableIdentifier) {
+            this.source = Preconditions.checkNotNull(source);
+            this.tableIdentifier = Preconditions.checkNotNull(tableIdentifier);
+        }
+
+        @Override
+        public DataStream<RowData> produceDataStream(
+                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+            DataStreamSource<RowData> sourceStream =
+                    execEnv.fromSource(
+                            source,
+                            WatermarkStrategy.noWatermarks(),
+                            String.format(
+                                    "%s-%s", JdbcSource.class.getSimpleName(), 
tableIdentifier));
+            providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
+            return sourceStream;
+        }
+
+        @Override
+        public boolean isBounded() {
+            return source.getBoundedness() == Boundedness.BOUNDED;
+        }
+    }
 }
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index 4e799a62..a8f0dd64 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -91,7 +91,8 @@ class JdbcDynamicTableFactoryTest {
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
                         FilterHandlingPolicy.NEVER,
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
         assertThat(actualSource).isEqualTo(expectedSource);
 
         // validation for sink
@@ -149,7 +150,8 @@ class JdbcDynamicTableFactoryTest {
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
 
         assertThat(actual).isEqualTo(expected);
     }
@@ -178,7 +180,8 @@ class JdbcDynamicTableFactoryTest {
                         10,
                         DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
 
         assertThat(actual).isEqualTo(expected);
     }
@@ -207,7 +210,8 @@ class JdbcDynamicTableFactoryTest {
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                .build(),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
 
         assertThat(actual).isEqualTo(expected);
     }
@@ -393,7 +397,8 @@ class JdbcDynamicTableFactoryTest {
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                .build(),
                         JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
-                        SCHEMA.toPhysicalRowDataType());
+                        SCHEMA.toPhysicalRowDataType(),
+                        "anonymous");
 
         assertThat(actual).isEqualTo(expected);
     }
