leonardBang commented on code in PR #4409: URL: https://github.com/apache/flink-cdc/pull/4409#discussion_r3426717261
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.source.discover; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database. + * + * <p>This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and + * returns fully-qualified table names parsed as {@link TableId} objects. 
+ * + * <h3>Default mode — shared subscription table (recommended)</h3> + * + * <p>By default, the discoverer assumes that subscriptions for many CDC jobs live in a single + * shared database table, and that each subscription set is identified by a {@code subscribe-id}. + * The discoverer issues a parameterized + * + * <pre>{@code + * SELECT column-name FROM table-name WHERE subscribe-id-column = ? + * }</pre> + * + * <p>using a {@link PreparedStatement}, and only the rows whose subscribe-id + * matches the configured value are returned. Only the subscribe-id value is bound as a + * parameter; the table and column names are concatenated into the SQL verbatim, so they must + * come from trusted configuration. + * + * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code + * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code + * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}. + * + * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} (defaults to {@code + * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code + * "subscribe_id"}). + * + * <p><b>Recommended schema:</b> + * + * <pre>{@code + * CREATE TABLE cdc_subscriptions ( + * subscribe_id VARCHAR(64) NOT NULL, + * subscribe_table_name VARCHAR(255) NOT NULL, + * PRIMARY KEY (subscribe_id, subscribe_table_name) + * ); + * INSERT INTO cdc_subscriptions VALUES + * ('orders-subscription', 'source_db.orders'), + * ('orders-subscription', 'source_db.order_items'), + * ('analytics-subscription', 'analytics_db.user_events'); + * }</pre> + * + * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3> + * + * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code + * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is + * treated as a fully-qualified table name. 
<b>When this option is set it takes priority over the + * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and + * {@code subscribe-id} are ignored.</b> Use this only when the default schema cannot model your + * subscriptions. + * + * <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped. + */ +public class JdbcTableDiscoverer implements TableDiscoverer { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class); + + public static final ConfigOption<String> JDBC_URL = + ConfigOptions.key("table.discoverer.jdbc.url") + .stringType() + .noDefaultValue() + .withDescription("The JDBC connection URL for the table discovery database."); + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("table.discoverer.jdbc.username") + .stringType() + .noDefaultValue() + .withDescription("The JDBC username for the table discovery database."); + + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("table.discoverer.jdbc.password") + .stringType() + .noDefaultValue() + .withDescription("The JDBC password for the table discovery database."); + + public static final ConfigOption<String> SUBSCRIBE_QUERY = + ConfigOptions.key("table.discoverer.jdbc.subscribe-query") + .stringType() + .noDefaultValue() + .withDescription( + "Custom SELECT statement used to discover subscribed tables. When set, " + + "this takes priority over the shared-table options. Column #1 of " + + "every row must be a fully-qualified table name."); + + public static final ConfigOption<String> TABLE_NAME = + ConfigOptions.key("table.discoverer.jdbc.table-name") + .stringType() + .noDefaultValue() + .withDescription( + "The shared subscription table that stores subscription entries for " + + "one or more CDC jobs. 
Required in shared-table mode."); + + public static final ConfigOption<String> COLUMN_NAME = + ConfigOptions.key("table.discoverer.jdbc.column-name") + .stringType() + .defaultValue("subscribe_table_name") + .withDescription( + "The column name in the subscription table that contains the " + + "fully-qualified table names to subscribe to."); + + public static final ConfigOption<String> SUBSCRIBE_ID_COLUMN = + ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column") + .stringType() + .defaultValue("subscribe_id") + .withDescription( + "The column name in the subscription table that holds the " + + "subscription-set identifier used for filtering."); + + public static final ConfigOption<String> SUBSCRIBE_ID = + ConfigOptions.key("table.discoverer.jdbc.subscribe-id") + .stringType() + .noDefaultValue() + .withDescription( + "The current subscription-set identifier. Required in shared-table " + + "mode; rows whose subscribe-id column matches this value are " + + "discovered as subscribed tables."); + + /** Compiled SQL to execute on every {@link #discover()} call. */ + private transient String sql; + + /** When non-null, the discoverer runs in shared-table mode and binds this as parameter #1. */ + private transient String subscribeId; + + private transient Connection connection; + + @Override + public void open(Context context) throws Exception { + Configuration config = context.getConfiguration(); + + String jdbcUrl = requireNonEmpty(config, JDBC_URL); + String username = requireNonEmpty(config, USERNAME); + String password = requireNonEmpty(config, PASSWORD); + + String subscribeQuery = config.get(SUBSCRIBE_QUERY); + if (subscribeQuery != null && !subscribeQuery.isEmpty()) { + // Mode A — custom query takes priority. Filter options are intentionally ignored. + this.sql = subscribeQuery; + this.subscribeId = null; + LOG.info( + "JdbcTableDiscoverer running in custom-query mode. 
URL='{}', query='{}'.", + jdbcUrl, + subscribeQuery); + } else { + // Mode B — shared-table filter; subscribe-id is mandatory. + String tableName = requireNonEmpty(config, TABLE_NAME); + String columnName = config.get(COLUMN_NAME); + String subscribeIdColumn = config.get(SUBSCRIBE_ID_COLUMN); + this.subscribeId = requireNonEmpty(config, SUBSCRIBE_ID); + this.sql = + "SELECT " + + columnName + + " FROM " + + tableName + + " WHERE " + + subscribeIdColumn + + " = ?"; + LOG.info( + "JdbcTableDiscoverer running in shared-table mode. URL='{}', table='{}', " + + "column='{}', subscribeIdColumn='{}', subscribeId='{}'.", + jdbcUrl, + tableName, + columnName, + subscribeIdColumn, + subscribeId); + } + + connection = DriverManager.getConnection(jdbcUrl, username, password); Review Comment: Context now exposes a user code class loader, but it is not used here when opening the JDBC connection. In deployments where the driver only exists in the user or connector classpath, this can still fail with `No suitable driver`. Please explicitly load or register the driver with that class loader before calling `DriverManager.getConnection(...)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
