This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new ea84be62 [FLINK-33463] Use Source API implementation in `JdbcDynamicTableSource`
ea84be62 is described below
commit ea84be626268a5bc46c8d469d95b0d1e76790afb
Author: Yuepeng Pan <[email protected]>
AuthorDate: Sun Nov 23 00:13:34 2025 +0800
[FLINK-33463] Use Source API implementation in `JdbcDynamicTableSource`
---
.../reader/extractor/RowDataResultExtractor.java | 41 +++++++++++
.../jdbc/core/table/JdbcDynamicTableFactory.java | 4 +-
.../core/table/source/JdbcDynamicTableSource.java | 81 +++++++++++++++++-----
.../core/table/JdbcDynamicTableFactoryTest.java | 15 ++--
4 files changed, 118 insertions(+), 23 deletions(-)
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java
new file mode 100644
index 00000000..e7c78418
--- /dev/null
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowDataResultExtractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor;
+
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** The result extractor for {@link RowData}. */
+public class RowDataResultExtractor implements ResultExtractor<RowData> {
+
+ private final JdbcDialectConverter jdbcDialectConverter;
+
+ public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) {
+ this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter);
+ }
+
+ @Override
+ public RowData extract(ResultSet resultSet) throws SQLException {
+ return jdbcDialectConverter.toInternal(resultSet);
+ }
+}
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index e1c732ee..8c82f455 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -124,13 +124,15 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
config.get(URL),
config.get(COMPATIBLE_MODE),
context.getClassLoader());
+ final String tableIdentifier =
context.getObjectIdentifier().asSummaryString();
return new JdbcDynamicTableSource(
getJdbcOptions(helper.getOptions(), context.getClassLoader()),
getJdbcReadOptions(helper.getOptions()),
helper.getOptions().get(LookupOptions.MAX_RETRIES),
getLookupCache(config),
helper.getOptions().get(FILTER_HANDLING_POLICY),
- context.getPhysicalRowDataType());
+ context.getPhysicalRowDataType(),
+ tableIdentifier);
}
private static void validateDataTypeWithJdbcDialect(
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index d26898a5..4de953f2 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -19,7 +19,12 @@
package org.apache.flink.connector.jdbc.core.table.source;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
+import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.RowDataResultExtractor;
import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
import
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
@@ -27,10 +32,14 @@ import
org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvide
import
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -39,6 +48,7 @@ import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushD
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
@@ -70,6 +80,8 @@ public class JdbcDynamicTableSource
SupportsFilterPushDown {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcDynamicTableSource.class);
+ private static final String JDBC_TRANSFORMATION = "jdbc";
+
private final InternalJdbcConnectionOptions options;
private final JdbcReadOptions readOptions;
private final int lookupMaxRetryTimes;
@@ -80,6 +92,7 @@ public class JdbcDynamicTableSource
private long limit = -1;
private List<String> resolvedPredicates = new ArrayList<>();
private Serializable[] pushdownParams = new Serializable[0];
+ private final String tableIdentifier;
public JdbcDynamicTableSource(
InternalJdbcConnectionOptions options,
@@ -87,7 +100,8 @@ public class JdbcDynamicTableSource
int lookupMaxRetryTimes,
@Nullable LookupCache cache,
FilterHandlingPolicy filterHandlingPolicy,
- DataType physicalRowDataType) {
+ DataType physicalRowDataType,
+ String tableIdentifier) {
this.options = options;
this.readOptions = readOptions;
this.lookupMaxRetryTimes = lookupMaxRetryTimes;
@@ -95,6 +109,7 @@ public class JdbcDynamicTableSource
this.filterHandlingPolicy = filterHandlingPolicy;
this.physicalRowDataType = physicalRowDataType;
this.dialectName = options.getDialect().dialectName();
+ this.tableIdentifier = tableIdentifier;
}
@Override
@@ -126,17 +141,17 @@ public class JdbcDynamicTableSource
}
@Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
- final JdbcRowDataInputFormat.Builder builder =
- JdbcRowDataInputFormat.builder()
- .setDrivername(options.getDriverName())
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
+ final JdbcSourceBuilder<RowData> builder =
+ JdbcSource.<RowData>builder()
+ .setDriverName(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername().orElse(null))
.setPassword(options.getPassword().orElse(null))
.setAutoCommit(readOptions.getAutoCommit());
if (readOptions.getFetchSize() != 0) {
- builder.setFetchSize(readOptions.getFetchSize());
+ builder.setResultSetFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
String query =
@@ -158,19 +173,19 @@ public class JdbcDynamicTableSource
.ofBatchNum(numPartitions),
new
JdbcGenericParameterValuesProvider(allPushdownParams));
- builder.setParametersProvider(allParams);
+ builder.setJdbcParameterValuesProvider(allParams);
predicates.add(
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+ " BETWEEN ? AND ?");
} else {
- builder.setParametersProvider(
+ builder.setJdbcParameterValuesProvider(
new
JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
}
predicates.addAll(this.resolvedPredicates);
- if (predicates.size() > 0) {
+ if (!predicates.isEmpty()) {
String joinedConditions =
predicates.stream()
.map(pred -> String.format("(%s)", pred))
@@ -184,13 +199,15 @@ public class JdbcDynamicTableSource
LOG.debug("Query generated for JDBC scan: " + query);
- builder.setQuery(query);
+ builder.setSql(query);
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
- builder.setRowConverter(dialect.getRowConverter(rowType));
- builder.setRowDataTypeInfo(
-
runtimeProviderContext.createTypeInformation(physicalRowDataType));
-
- return InputFormatProvider.of(builder.build());
+ builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType)));
+ builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
+ options.getProperties()
+ .forEach(
+ (key, value) ->
+ builder.setConnectionProperty(key.toString(),
value.toString()));
+ return new JdbcDataStreamScanProvider(builder.build(),
tableIdentifier);
}
@Override
@@ -218,7 +235,8 @@ public class JdbcDynamicTableSource
lookupMaxRetryTimes,
cache,
filterHandlingPolicy,
- physicalRowDataType);
+ physicalRowDataType,
+ tableIdentifier);
newSource.resolvedPredicates = new
ArrayList<>(this.resolvedPredicates);
newSource.pushdownParams = Arrays.copyOf(this.pushdownParams,
this.pushdownParams.length);
return newSource;
@@ -314,4 +332,33 @@ public class JdbcDynamicTableSource
}
return allPushdownParams;
}
+
+ private static class JdbcDataStreamScanProvider implements
DataStreamScanProvider {
+
+ private final JdbcSource<RowData> source;
+ private final String tableIdentifier;
+
+ public JdbcDataStreamScanProvider(JdbcSource<RowData> source, String
tableIdentifier) {
+ this.source = Preconditions.checkNotNull(source);
+ this.tableIdentifier = Preconditions.checkNotNull(tableIdentifier);
+ }
+
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment
execEnv) {
+ DataStreamSource<RowData> sourceStream =
+ execEnv.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ String.format(
+ "%s-%s", JdbcSource.class.getSimpleName(),
tableIdentifier));
+
providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid);
+ return sourceStream;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return source.getBoundedness() == Boundedness.BOUNDED;
+ }
+ }
}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index 4e799a62..a8f0dd64 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -91,7 +91,8 @@ class JdbcDynamicTableFactoryTest {
LookupOptions.MAX_RETRIES.defaultValue(),
null,
FilterHandlingPolicy.NEVER,
- SCHEMA.toPhysicalRowDataType());
+ SCHEMA.toPhysicalRowDataType(),
+ "anonymous");
assertThat(actualSource).isEqualTo(expectedSource);
// validation for sink
@@ -149,7 +150,8 @@ class JdbcDynamicTableFactoryTest {
LookupOptions.MAX_RETRIES.defaultValue(),
null,
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
- SCHEMA.toPhysicalRowDataType());
+ SCHEMA.toPhysicalRowDataType(),
+ "anonymous");
assertThat(actual).isEqualTo(expected);
}
@@ -178,7 +180,8 @@ class JdbcDynamicTableFactoryTest {
10,
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
- SCHEMA.toPhysicalRowDataType());
+ SCHEMA.toPhysicalRowDataType(),
+ "anonymous");
assertThat(actual).isEqualTo(expected);
}
@@ -207,7 +210,8 @@ class JdbcDynamicTableFactoryTest {
.expireAfterWrite(Duration.ofSeconds(10))
.build(),
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
- SCHEMA.toPhysicalRowDataType());
+ SCHEMA.toPhysicalRowDataType(),
+ "anonymous");
assertThat(actual).isEqualTo(expected);
}
@@ -393,7 +397,8 @@ class JdbcDynamicTableFactoryTest {
.expireAfterWrite(Duration.ofSeconds(10))
.build(),
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
- SCHEMA.toPhysicalRowDataType());
+ SCHEMA.toPhysicalRowDataType(),
+ "anonymous");
assertThat(actual).isEqualTo(expected);
}