Copilot commented on code in PR #4409:
URL: https://github.com/apache/flink-cdc/pull/4409#discussion_r3310537264


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.util.HashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+/**
+ * SPI factory that creates a {@link TableDiscoverer}. Implementations are 
discovered at runtime via
+ * Java's {@link ServiceLoader} mechanism. To register a new discoverer type, 
add a line with the
+ * fully-qualified factory class name to:
+ *
+ * <pre>{@code
+ * 
META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
+ * }</pre>
+ *
+ * <p>The discoverer type is selected by the user via the {@code 
table.discoverer.type} option,
+ * which is matched (case-insensitively) against {@link #identifier()}.
+ */
+@PublicEvolving
+public interface TableDiscovererFactory {
+
+    /**
+     * The unique identifier of this discoverer factory, used to match against 
the {@code
+     * table.discoverer.type} option. Must be lowercase, e.g. {@code "jdbc"} 
or {@code
+     * "fluss-default"}.
+     */
+    String identifier();
+
+    /**
+     * Creates a new {@link TableDiscoverer} instance. The returned discoverer 
is not yet
+     * initialized; the caller must invoke {@link 
TableDiscoverer#open(TableDiscoverer.Context)}
+     * before calling {@link TableDiscoverer#discover()}.
+     */
+    TableDiscoverer createDiscoverer();
+
+    /**
+     * Creates a {@link TableDiscoverer.Context} with the given configuration 
and class loader. This
+     * is a convenience factory method so that callers do not need to 
implement the {@link
+     * TableDiscoverer.Context} interface themselves.
+     *
+     * @param configuration The full connector configuration.
+     * @param classLoader The user code class loader.
+     * @return A new {@link TableDiscoverer.Context} instance.
+     */
+    static TableDiscoverer.Context createContext(
+            Configuration configuration, ClassLoader classLoader) {
+        return new DefaultDiscovererContext(configuration, classLoader);
+    }

Review Comment:
   `createContext(...)` passes through a potentially null `classLoader`, which 
can lead to `Context#getUserCodeClassLoader()` returning null and forcing 
discoverer implementations to defensively handle nulls. Consider applying the 
same fallback-to-TCCL behavior used in `createDiscoverer(...)` so the context 
always exposes a usable classloader.
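
   A minimal sketch of that fallback, assuming a hypothetical helper 
`ClassLoaderFallback` (not part of the PR). The final fallback to the system 
class loader is an extra assumption that the PR may or may not want:

```java
/** Hypothetical helper (not in the PR): resolves a class loader that is never null. */
final class ClassLoaderFallback {

    private ClassLoaderFallback() {}

    static ClassLoader resolve(ClassLoader candidate) {
        // Prefer the caller-supplied (user code) class loader when present.
        if (candidate != null) {
            return candidate;
        }
        // Otherwise fall back to the thread context class loader (TCCL).
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        if (tccl != null) {
            return tccl;
        }
        // Assumption: the system class loader is an acceptable last resort.
        return ClassLoader.getSystemClassLoader();
    }
}
```

   With such a helper, `createContext(...)` could pass 
`ClassLoaderFallback.resolve(classLoader)` to `DefaultDiscovererContext` 
instead of the raw argument.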
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.util.HashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+/**
+ * SPI factory that creates a {@link TableDiscoverer}. Implementations are 
discovered at runtime via
+ * Java's {@link ServiceLoader} mechanism. To register a new discoverer type, 
add a line with the
+ * fully-qualified factory class name to:
+ *
+ * <pre>{@code
+ * 
META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory
+ * }</pre>
+ *
+ * <p>The discoverer type is selected by the user via the {@code 
table.discoverer.type} option,
+ * which is matched (case-insensitively) against {@link #identifier()}.
+ */
+@PublicEvolving
+public interface TableDiscovererFactory {
+
+    /**
+     * The unique identifier of this discoverer factory, used to match against 
the {@code
+     * table.discoverer.type} option. Must be lowercase, e.g. {@code "jdbc"} 
or {@code
+     * "fluss-default"}.
+     */
+    String identifier();
+
+    /**
+     * Creates a new {@link TableDiscoverer} instance. The returned discoverer 
is not yet
+     * initialized; the caller must invoke {@link 
TableDiscoverer#open(TableDiscoverer.Context)}
+     * before calling {@link TableDiscoverer#discover()}.
+     */
+    TableDiscoverer createDiscoverer();
+
+    /**
+     * Creates a {@link TableDiscoverer.Context} with the given configuration 
and class loader. This
+     * is a convenience factory method so that callers do not need to 
implement the {@link
+     * TableDiscoverer.Context} interface themselves.
+     *
+     * @param configuration The full connector configuration.
+     * @param classLoader The user code class loader.
+     * @return A new {@link TableDiscoverer.Context} instance.
+     */
+    static TableDiscoverer.Context createContext(
+            Configuration configuration, ClassLoader classLoader) {
+        return new DefaultDiscovererContext(configuration, classLoader);
+    }
+
+    /**
+     * Utility method that discovers a {@link TableDiscovererFactory} via SPI 
whose {@link
+     * #identifier()} matches the given {@code type}, and delegates discoverer 
creation to it.
+     *
+     * @param type The discoverer type identifier (e.g. "jdbc", 
"fluss-default").
+     * @param classLoader The class loader used for SPI discovery.
+     * @return A new, uninitialized {@link TableDiscoverer}.
+     * @throws IllegalArgumentException if no factory matches the given type.
+     * @throws IllegalStateException if multiple factories share the same 
identifier.
+     */
+    static TableDiscoverer createDiscoverer(String type, ClassLoader classLoader) {
+        ClassLoader loader =
+                classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader();
+        ServiceLoader<TableDiscovererFactory> serviceLoader =
+                ServiceLoader.load(TableDiscovererFactory.class, loader);
+

Review Comment:
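   `createDiscoverer(...)` calls `factory.identifier().equalsIgnoreCase(type)` 
without validating `type`. Note that `String#equalsIgnoreCase(null)` returns 
`false` rather than throwing, so a null `type` does not fail at this call with 
a `NullPointerException`; it matches no factory and surfaces as a generic 
"no factory matches" error (or as an NPE only if `type` is dereferenced 
elsewhere). Validating and trimming `type` up-front, and throwing a clear 
`IllegalArgumentException` when it is null or blank, will make configuration 
errors easier to diagnose.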
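
   One way to do the up-front validation, sketched with a hypothetical 
`DiscovererTypes.normalize` helper (the class name and error message are 
assumptions, not code from the PR):

```java
import java.util.Locale;

/** Hypothetical helper (not in the PR): validates and normalizes the discoverer type. */
final class DiscovererTypes {

    private DiscovererTypes() {}

    static String normalize(String type) {
        // Reject null and blank values with a message that names the option.
        if (type == null || type.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Option 'table.discoverer.type' must not be null or blank.");
        }
        // Identifiers are documented as lowercase, so normalize before matching.
        return type.trim().toLowerCase(Locale.ROOT);
    }
}
```

   The normalized value can then be compared against `identifier()` with a 
plain `equals`, which keeps the matching logic free of null handling.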
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a 
JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., 
MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs 
live in a single
+ * shared database table, and that each subscription set is identified by a 
{@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id
+ * matches the configured value are returned.

Review Comment:
   The Javadoc claims the shared-table query is "injection-safe" due to 
`PreparedStatement`, but only the `subscribe-id` value is bound as a parameter; 
`table-name` / `column-name` / `subscribe-id-column` are still 
string-concatenated into the SQL text. This wording is misleading: either 
validate/quote the identifiers or adjust the Javadoc to clarify what is 
actually parameterized.
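
   If the identifiers are validated rather than the Javadoc reworded, a 
conservative allow-list check is one option. The sketch below assumes a 
hypothetical `SqlIdentifiers` helper and an assumed policy (plain or 
dot-qualified names made of ASCII letters, digits and underscores); setups 
with quoted or non-ASCII identifiers would need a looser rule or 
dialect-specific quoting:

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not in the PR): allow-list validation for SQL identifiers. */
final class SqlIdentifiers {

    // Assumed policy: name or schema.name, built from ASCII letters, digits, underscores.
    private static final Pattern SAFE =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*");

    private SqlIdentifiers() {}

    static String requireSafe(String optionKey, String identifier) {
        if (identifier == null || !SAFE.matcher(identifier).matches()) {
            throw new IllegalArgumentException(
                    "Option '" + optionKey + "' is not a valid SQL identifier: " + identifier);
        }
        return identifier;
    }

    /** Builds the shared-table query; only the subscribe-id value stays a bind parameter. */
    static String buildSharedTableQuery(String column, String table, String idColumn) {
        return "SELECT "
                + requireSafe("table.discoverer.jdbc.column-name", column)
                + " FROM "
                + requireSafe("table.discoverer.jdbc.table-name", table)
                + " WHERE "
                + requireSafe("table.discoverer.jdbc.subscribe-id-column", idColumn)
                + " = ?";
    }
}
```

   With this in place, identifiers are checked before concatenation and the 
`subscribe-id` value is still bound through the `PreparedStatement`, so the 
"injection-safe" wording would describe both halves of the statement.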
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a 
JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., 
MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs 
live in a single
+ * shared database table, and that each subscription set is identified by a 
{@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows 
whose subscribe-id
+ * matches the configured value are returned.
+ *
+ * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, 
{@code
+ * table.discoverer.jdbc.table-name}, {@code 
table.discoverer.jdbc.subscribe-id}.
+ *
+ * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} 
(defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} 
(defaults to {@code
+ * "subscribe_id"}).
+ *
+ * <p><b>Recommended schema:</b>
+ *
+ * <pre>{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
+ * }</pre>
+ *
+ * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3>
+ *
+ * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may 
set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; 
column #1 of each row is
+ * treated as a fully-qualified table name. <b>When this option is set it 
takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code 
subscribe-id-column} and
+ * {@code subscribe-id} are ignored.</b> Use this only when the default schema 
cannot model your
+ * subscriptions.
+ *
+ * <p>Null values and rows that cannot be parsed into a valid {@link TableId} 
are silently skipped.
+ */
+public class JdbcTableDiscoverer implements TableDiscoverer {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcTableDiscoverer.class);
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("table.discoverer.jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC connection URL for the table 
discovery database.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("table.discoverer.jdbc.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC username for the table 
discovery database.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("table.discoverer.jdbc.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password for the table 
discovery database.");
+
+    public static final ConfigOption<String> SUBSCRIBE_QUERY =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom SELECT statement used to discover 
subscribed tables. When set, "
+                                    + "this takes priority over the 
shared-table options. Column #1 of "
+                                    + "every row must be a fully-qualified 
table name.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The shared subscription table that stores 
subscription entries for "
+                                    + "one or more CDC jobs. Required in 
shared-table mode.");
+
+    public static final ConfigOption<String> COLUMN_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.column-name")
+                    .stringType()
+                    .defaultValue("subscribe_table_name")
+                    .withDescription(
+                            "The column name in the subscription table that 
contains the "
+                                    + "fully-qualified table names to 
subscribe to.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID_COLUMN =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column")
+                    .stringType()
+                    .defaultValue("subscribe_id")
+                    .withDescription(
+                            "The column name in the subscription table that 
holds the "
+                                    + "subscription-set identifier used for 
filtering.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The current subscription-set identifier. Required 
in shared-table "
+                                    + "mode; rows whose subscribe-id column 
matches this value are "
+                                    + "discovered as subscribed tables.");
+
+    /** Compiled SQL to execute on every {@link #discover()} call. */
+    private transient String sql;
+
+    /** When non-null, the discoverer runs in shared-table mode and binds this 
as parameter #1. */
+    private transient String subscribeId;
+
+    private transient Connection connection;
+
+    @Override
+    public void open(Context context) throws Exception {
+        Configuration config = context.getConfiguration();
+
+        String jdbcUrl = requireNonEmpty(config, JDBC_URL);
+        String username = requireNonEmpty(config, USERNAME);
+        String password = requireNonEmpty(config, PASSWORD);
+
+        String subscribeQuery = config.get(SUBSCRIBE_QUERY);
+        if (subscribeQuery != null && !subscribeQuery.isEmpty()) {
+            // Mode A — custom query takes priority. Filter options are 
intentionally ignored.
+            this.sql = subscribeQuery;

Review Comment:
   The mode labels in the inline comments are swapped relative to the 
class-level docs/PR description (shared-table is the default mode; custom-query 
is the escape hatch). This is minor, but it’s easy to misread when maintaining 
the code.
   



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java:
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.source.discover.JdbcTableDiscoverer;
+import org.apache.flink.cdc.common.source.discover.TableDiscoverer;
+import org.apache.flink.cdc.common.source.discover.TableDiscovererFactory;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for {@link JdbcTableDiscoverer} and {@link 
TableDiscovererFactory} SPI loading.
+ * Uses a real MySQL container via Testcontainers to verify both the default 
shared-table mode and
+ * the advanced custom-query escape hatch.
+ */
+@Testcontainers
+class JdbcTableDiscovererITCase {
+
+    private static final String SUBSCRIBE_ID_ORDERS = "orders-subscription";
+    private static final String SUBSCRIBE_ID_ANALYTICS = "analytics-subscription";
+
+    @Container
+    private static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>("mysql:8.0")
+                    .withDatabaseName("meta_db")

Review Comment:
   Using the floating Docker tag `mysql:8.0` can make the IT case 
non-reproducible (the tag can move to newer patch versions and occasionally 
break tests). Pinning to a specific patch version keeps CI stable.
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a 
JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., 
MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs 
live in a single
+ * shared database table, and that each subscription set is identified by a 
{@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows 
whose subscribe-id
+ * matches the configured value are returned.
+ *
+ * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, 
{@code
+ * table.discoverer.jdbc.table-name}, {@code 
table.discoverer.jdbc.subscribe-id}.
+ *
+ * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} 
(defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} 
(defaults to {@code
+ * "subscribe_id"}).
+ *
+ * <p><b>Recommended schema:</b>
+ *
+ * <pre>{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
+ * }</pre>
+ *
+ * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3>
+ *
+ * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may 
set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; 
column #1 of each row is
+ * treated as a fully-qualified table name. <b>When this option is set it 
takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code 
subscribe-id-column} and
+ * {@code subscribe-id} are ignored.</b> Use this only when the default schema 
cannot model your
+ * subscriptions.
+ *
+ * <p>Null values and rows that cannot be parsed into a valid {@link TableId} 
are silently skipped.
+ */
+public class JdbcTableDiscoverer implements TableDiscoverer {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcTableDiscoverer.class);
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("table.discoverer.jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC connection URL for the table 
discovery database.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("table.discoverer.jdbc.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC username for the table 
discovery database.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("table.discoverer.jdbc.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password for the table 
discovery database.");
+
+    public static final ConfigOption<String> SUBSCRIBE_QUERY =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom SELECT statement used to discover 
subscribed tables. When set, "
+                                    + "this takes priority over the 
shared-table options. Column #1 of "
+                                    + "every row must be a fully-qualified 
table name.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The shared subscription table that stores 
subscription entries for "
+                                    + "one or more CDC jobs. Required in 
shared-table mode.");
+
+    public static final ConfigOption<String> COLUMN_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.column-name")
+                    .stringType()
+                    .defaultValue("subscribe_table_name")
+                    .withDescription(
+                            "The column name in the subscription table that 
contains the "
+                                    + "fully-qualified table names to 
subscribe to.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID_COLUMN =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column")
+                    .stringType()
+                    .defaultValue("subscribe_id")
+                    .withDescription(
+                            "The column name in the subscription table that 
holds the "
+                                    + "subscription-set identifier used for 
filtering.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The current subscription-set identifier. Required 
in shared-table "
+                                    + "mode; rows whose subscribe-id column 
matches this value are "
+                                    + "discovered as subscribed tables.");
+
+    /** Compiled SQL to execute on every {@link #discover()} call. */
+    private transient String sql;
+
+    /** When non-null, the discoverer runs in shared-table mode and binds this 
as parameter #1. */
+    private transient String subscribeId;
+
+    private transient Connection connection;
+
+    @Override
+    public void open(Context context) throws Exception {
+        Configuration config = context.getConfiguration();
+
+        String jdbcUrl = requireNonEmpty(config, JDBC_URL);
+        String username = requireNonEmpty(config, USERNAME);
+        String password = requireNonEmpty(config, PASSWORD);
+
+        String subscribeQuery = config.get(SUBSCRIBE_QUERY);
+        if (subscribeQuery != null && !subscribeQuery.isEmpty()) {
+            // Mode A — custom query takes priority. Filter options are 
intentionally ignored.
+            this.sql = subscribeQuery;
+            this.subscribeId = null;
+            LOG.info(
+                    "JdbcTableDiscoverer running in custom-query mode. 
URL='{}', query='{}'.",
+                    jdbcUrl,
+                    subscribeQuery);
+        } else {
+            // Mode B — shared-table filter; subscribe-id is mandatory.
+            String tableName = requireNonEmpty(config, TABLE_NAME);

Review Comment:
   Same as above: this comment calls the shared-table path "Mode B", but it is 
the default mode in the surrounding Javadoc/PR description. Aligning 
terminology reduces confusion.
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs live in a single
+ * shared database table, and that each subscription set is identified by a {@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id
+ * matches the configured value are returned.
+ *
+ * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code
+ * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}.
+ *
+ * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} (defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code
+ * "subscribe_id"}).
+ *
+ * <p><b>Recommended schema:</b>
+ *
+ * <pre>{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
+ * }</pre>
+ *
+ * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3>
+ *
+ * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is
+ * treated as a fully-qualified table name. <b>When this option is set it takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and
+ * {@code subscribe-id} are ignored.</b> Use this only when the default schema cannot model your
+ * subscriptions.
+ *
+ * <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped.
+ */

Review Comment:
   The Javadoc says NULL values and unparseable rows are "silently skipped", but
the implementation only skips NULL and empty values silently; it logs a WARN for
each table name that fails to parse. Keeping the Javadoc consistent with the
actual behavior will avoid surprising users/operators.
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs live in a single
+ * shared database table, and that each subscription set is identified by a {@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id
+ * matches the configured value are returned.
+ *
+ * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code
+ * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}.
+ *
+ * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} (defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code
+ * "subscribe_id"}).
+ *
+ * <p><b>Recommended schema:</b>
+ *
+ * <pre>{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
+ * }</pre>
+ *
+ * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3>
+ *
+ * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is
+ * treated as a fully-qualified table name. <b>When this option is set it takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and
+ * {@code subscribe-id} are ignored.</b> Use this only when the default schema cannot model your
+ * subscriptions.
+ *
+ * <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped.
+ */
+public class JdbcTableDiscoverer implements TableDiscoverer {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class);
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("table.discoverer.jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC connection URL for the table discovery database.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("table.discoverer.jdbc.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC username for the table discovery database.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("table.discoverer.jdbc.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password for the table discovery database.");
+
+    public static final ConfigOption<String> SUBSCRIBE_QUERY =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom SELECT statement used to discover subscribed tables. When set, "
+                                    + "this takes priority over the shared-table options. Column #1 of "
+                                    + "every row must be a fully-qualified table name.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The shared subscription table that stores subscription entries for "
+                                    + "one or more CDC jobs. Required in shared-table mode.");
+
+    public static final ConfigOption<String> COLUMN_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.column-name")
+                    .stringType()
+                    .defaultValue("subscribe_table_name")
+                    .withDescription(
+                            "The column name in the subscription table that contains the "
+                                    + "fully-qualified table names to subscribe to.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID_COLUMN =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column")
+                    .stringType()
+                    .defaultValue("subscribe_id")
+                    .withDescription(
+                            "The column name in the subscription table that holds the "
+                                    + "subscription-set identifier used for filtering.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The current subscription-set identifier. Required in shared-table "
+                                    + "mode; rows whose subscribe-id column matches this value are "
+                                    + "discovered as subscribed tables.");
+
+    /** Compiled SQL to execute on every {@link #discover()} call. */
+    private transient String sql;
+
+    /** When non-null, the discoverer runs in shared-table mode and binds this as parameter #1. */
+    private transient String subscribeId;
+
+    private transient Connection connection;
+
+    @Override
+    public void open(Context context) throws Exception {
+        Configuration config = context.getConfiguration();
+
+        String jdbcUrl = requireNonEmpty(config, JDBC_URL);
+        String username = requireNonEmpty(config, USERNAME);
+        String password = requireNonEmpty(config, PASSWORD);
+
+        String subscribeQuery = config.get(SUBSCRIBE_QUERY);
+        if (subscribeQuery != null && !subscribeQuery.isEmpty()) {
+            // Mode A — custom query takes priority. Filter options are intentionally ignored.
+            this.sql = subscribeQuery;
+            this.subscribeId = null;
+            LOG.info(
+                    "JdbcTableDiscoverer running in custom-query mode. URL='{}', query='{}'.",
+                    jdbcUrl,
+                    subscribeQuery);
+        } else {
+            // Mode B — shared-table filter; subscribe-id is mandatory.
+            String tableName = requireNonEmpty(config, TABLE_NAME);
+            String columnName = config.get(COLUMN_NAME);
+            String subscribeIdColumn = config.get(SUBSCRIBE_ID_COLUMN);
+            this.subscribeId = requireNonEmpty(config, SUBSCRIBE_ID);
+            this.sql =
+                    "SELECT "
+                            + columnName
+                            + " FROM "
+                            + tableName
+                            + " WHERE "
+                            + subscribeIdColumn
+                            + " = ?";
+            LOG.info(
+                    "JdbcTableDiscoverer running in shared-table mode. URL='{}', table='{}', "
+                            + "column='{}', subscribeIdColumn='{}', subscribeId='{}'.",
+                    jdbcUrl,
+                    tableName,
+                    columnName,
+                    subscribeIdColumn,
+                    subscribeId);
+        }
+
+        connection = DriverManager.getConnection(jdbcUrl, username, password);
+    }
+
+    @Override
+    public Set<TableId> discover() throws Exception {
+        Set<TableId> result = new LinkedHashSet<>();
+        if (subscribeId != null) {
+            try (PreparedStatement ps = connection.prepareStatement(sql)) {
+                ps.setString(1, subscribeId);
+                try (ResultSet rs = ps.executeQuery()) {
+                    collect(rs, result);
+                }
+            }
+        } else {
+            try (Statement stmt = connection.createStatement();
+                    ResultSet rs = stmt.executeQuery(sql)) {
+                collect(rs, result);
+            }
+        }
+        LOG.info("JdbcTableDiscoverer discovered {} tables.", result.size());
+        return result;
+    }
+
+    private void collect(ResultSet rs, Set<TableId> result) throws Exception {
+        while (rs.next()) {
+            String value = rs.getString(1);
+            if (value == null || value.isEmpty()) {
+                continue;
+            }
+            try {
+                result.add(TableId.parse(value));
+            } catch (IllegalArgumentException e) {
+                LOG.warn(
+                        "Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value);
+            }
+        }
+    }
+
+    private static String requireNonEmpty(Configuration config, ConfigOption<String> option) {
+        String value = config.get(option);
+        if (value == null || value.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "'" + option.key() + "' is required for JdbcTableDiscoverer.");
+        }
+        return value;
+    }

Review Comment:
   `requireNonEmpty(...)` treats whitespace-only values (e.g., `"   "`) as valid,
which can later produce confusing SQL/JDBC errors. Checking with
`String#isBlank()` (Java 11+) rejects such values while still returning the
original, untrimmed value (important for passwords).
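
   A minimal sketch of the stricter check, assuming Java 11+ for
`String#isBlank()`. `BlankCheck`, `requireNonBlank`, and the `Map`-based config
are hypothetical stand-ins for illustration, not the PR's `Configuration` API:

```java
import java.util.Map;

/** Hypothetical stand-in for the PR's validation helper. */
final class BlankCheck {

    private BlankCheck() {}

    /**
     * Returns the configured value unchanged, or throws if it is missing,
     * empty, or whitespace-only.
     */
    static String requireNonBlank(Map<String, String> config, String key) {
        String value = config.get(key);
        // isBlank() is true for "" and for strings made up only of whitespace.
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(
                    "'" + key + "' is required for JdbcTableDiscoverer.");
        }
        // Return the original value, not a trimmed copy, so that passwords
        // with leading or trailing spaces are passed through untouched.
        return value;
    }
}
```

   On a Java 8 target, `value.trim().isEmpty()` is a close substitute; it treats
every character up to U+0020 as whitespace, which differs slightly from
`isBlank()`.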
   



##########
flink-cdc-common/pom.xml:
##########
@@ -27,6 +27,28 @@ limitations under the License.
 
     <artifactId>flink-cdc-common</artifactId>
 
+    <dependencies>
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.27</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   This module adds `mysql:mysql-connector-java` as a test dependency without
excluding `com.google.protobuf:protobuf-java`. Elsewhere in this repo the MySQL
driver is consistently declared with a protobuf exclusion (e.g.,
`flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml:44-54`
and `flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml:47-58`). Aligning
this dependency helps avoid pulling in extra protobuf jars and reduces the
chance of version conflicts.
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source.discover;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database.
+ *
+ * <p>This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and
+ * returns fully-qualified table names parsed as {@link TableId} objects.
+ *
+ * <h3>Default mode — shared subscription table (recommended)</h3>
+ *
+ * <p>By default, the discoverer assumes that subscriptions for many CDC jobs live in a single
+ * shared database table, and that each subscription set is identified by a {@code subscribe-id}.
+ * The discoverer issues a parameterized
+ *
+ * <pre>{@code
+ * SELECT column-name FROM table-name WHERE subscribe-id-column = ?
+ * }</pre>
+ *
+ * <p>using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id
+ * matches the configured value are returned.
+ *
+ * <p><b>Required keys:</b> {@code table.discoverer.jdbc.url}, {@code
+ * table.discoverer.jdbc.username}, {@code table.discoverer.jdbc.password}, {@code
+ * table.discoverer.jdbc.table-name}, {@code table.discoverer.jdbc.subscribe-id}.
+ *
+ * <p><b>Optional keys:</b> {@code table.discoverer.jdbc.column-name} (defaults to {@code
+ * "subscribe_table_name"}), {@code table.discoverer.jdbc.subscribe-id-column} (defaults to {@code
+ * "subscribe_id"}).
+ *
+ * <p><b>Recommended schema:</b>
+ *
+ * <pre>{@code
+ * CREATE TABLE cdc_subscriptions (
+ *     subscribe_id         VARCHAR(64)  NOT NULL,
+ *     subscribe_table_name VARCHAR(255) NOT NULL,
+ *     PRIMARY KEY (subscribe_id, subscribe_table_name)
+ * );
+ * INSERT INTO cdc_subscriptions VALUES
+ *   ('orders-subscription',    'source_db.orders'),
+ *   ('orders-subscription',    'source_db.order_items'),
+ *   ('analytics-subscription', 'analytics_db.user_events');
+ * }</pre>
+ *
+ * <h3>Advanced escape hatch — custom query (overrides the default mode)</h3>
+ *
+ * <p>For uncommon layouts (e.g., needing JOINs or extra filters), users may set {@code
+ * table.discoverer.jdbc.subscribe-query} to any {@code SELECT} statement; column #1 of each row is
+ * treated as a fully-qualified table name. <b>When this option is set it takes priority over the
+ * default mode and all of {@code table-name}, {@code column-name}, {@code subscribe-id-column} and
+ * {@code subscribe-id} are ignored.</b> Use this only when the default schema cannot model your
+ * subscriptions.
+ *
+ * <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped.
+ */
+public class JdbcTableDiscoverer implements TableDiscoverer {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class);
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("table.discoverer.jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC connection URL for the table discovery database.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("table.discoverer.jdbc.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC username for the table discovery database.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("table.discoverer.jdbc.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password for the table discovery database.");
+
+    public static final ConfigOption<String> SUBSCRIBE_QUERY =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom SELECT statement used to discover subscribed tables. When set, "
+                                    + "this takes priority over the shared-table options. Column #1 of "
+                                    + "every row must be a fully-qualified table name.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The shared subscription table that stores subscription entries for "
+                                    + "one or more CDC jobs. Required in shared-table mode.");
+
+    public static final ConfigOption<String> COLUMN_NAME =
+            ConfigOptions.key("table.discoverer.jdbc.column-name")
+                    .stringType()
+                    .defaultValue("subscribe_table_name")
+                    .withDescription(
+                            "The column name in the subscription table that contains the "
+                                    + "fully-qualified table names to subscribe to.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID_COLUMN =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id-column")
+                    .stringType()
+                    .defaultValue("subscribe_id")
+                    .withDescription(
+                            "The column name in the subscription table that holds the "
+                                    + "subscription-set identifier used for filtering.");
+
+    public static final ConfigOption<String> SUBSCRIBE_ID =
+            ConfigOptions.key("table.discoverer.jdbc.subscribe-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The current subscription-set identifier. Required in shared-table "
+                                    + "mode; rows whose subscribe-id column matches this value are "
+                                    + "discovered as subscribed tables.");
+
+    /** Compiled SQL to execute on every {@link #discover()} call. */
+    private transient String sql;
+
+    /** When non-null, the discoverer runs in shared-table mode and binds this as parameter #1. */
+    private transient String subscribeId;
+
+    private transient Connection connection;
+
+    @Override
+    public void open(Context context) throws Exception {
+        Configuration config = context.getConfiguration();
+
+        String jdbcUrl = requireNonEmpty(config, JDBC_URL);
+        String username = requireNonEmpty(config, USERNAME);
+        String password = requireNonEmpty(config, PASSWORD);
+
+        String subscribeQuery = config.get(SUBSCRIBE_QUERY);
+        if (subscribeQuery != null && !subscribeQuery.isEmpty()) {
+            // Mode A — custom query takes priority. Filter options are intentionally ignored.
+            this.sql = subscribeQuery;
+            this.subscribeId = null;
+            LOG.info(
+                    "JdbcTableDiscoverer running in custom-query mode. URL='{}', query='{}'.",
+                    jdbcUrl,
+                    subscribeQuery);
+        } else {
+            // Mode B — shared-table filter; subscribe-id is mandatory.
+            String tableName = requireNonEmpty(config, TABLE_NAME);
+            String columnName = config.get(COLUMN_NAME);
+            String subscribeIdColumn = config.get(SUBSCRIBE_ID_COLUMN);
+            this.subscribeId = requireNonEmpty(config, SUBSCRIBE_ID);
+            this.sql =
+                    "SELECT "
+                            + columnName
+                            + " FROM "
+                            + tableName
+                            + " WHERE "
+                            + subscribeIdColumn
+                            + " = ?";
+            LOG.info(
+                    "JdbcTableDiscoverer running in shared-table mode. URL='{}', table='{}', "
+                            + "column='{}', subscribeIdColumn='{}', subscribeId='{}'.",
+                    jdbcUrl,
+                    tableName,
+                    columnName,
+                    subscribeIdColumn,
+                    subscribeId);
+        }
+
+        connection = DriverManager.getConnection(jdbcUrl, username, password);
+    }
+
+    @Override
+    public Set<TableId> discover() throws Exception {
+        Set<TableId> result = new LinkedHashSet<>();
+        if (subscribeId != null) {
+            try (PreparedStatement ps = connection.prepareStatement(sql)) {
+                ps.setString(1, subscribeId);
+                try (ResultSet rs = ps.executeQuery()) {
+                    collect(rs, result);
+                }
+            }
+        } else {
+            try (Statement stmt = connection.createStatement();
+                    ResultSet rs = stmt.executeQuery(sql)) {
+                collect(rs, result);
+            }
+        }
+        LOG.info("JdbcTableDiscoverer discovered {} tables.", result.size());
+        return result;
+    }
+
+    private void collect(ResultSet rs, Set<TableId> result) throws Exception {
+        while (rs.next()) {
+            String value = rs.getString(1);
+            if (value == null || value.isEmpty()) {
+                continue;
+            }
+            try {
+                result.add(TableId.parse(value));
+            } catch (IllegalArgumentException e) {
+                LOG.warn(
+                        "Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value);
+            }

Review Comment:
   When skipping an invalid table id, the warning log drops the exception, 
which makes it hard to debug why parsing failed (e.g., unexpected format vs. 
null byte). Logging the caught exception as the last argument preserves the 
stack trace at WARN level.
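
   A minimal sketch of the idea, written against `java.util.logging` so that it
runs without extra dependencies (the PR itself uses SLF4J). `WarnWithCause` and
its `parse` helper are hypothetical stand-ins for the discoverer and
`TableId.parse`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the discoverer's row-collection logic. */
final class WarnWithCause {

    private static final Logger LOG = Logger.getLogger(WarnWithCause.class.getName());

    private WarnWithCause() {}

    /** Stand-in for TableId.parse: accepts only names of the form "db.table". */
    static String parse(String value) {
        String[] parts = value.split("\\.");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected 'db.table' but got: " + value);
        }
        return value;
    }

    /** Collects the valid names and logs each invalid one together with its cause. */
    static List<String> collect(List<String> rows) {
        List<String> result = new ArrayList<>();
        for (String value : rows) {
            try {
                result.add(parse(value));
            } catch (IllegalArgumentException e) {
                // Passing the exception keeps its message and stack trace in the log record.
                LOG.log(Level.WARNING, "Skipping invalid table name '" + value + "'.", e);
            }
        }
        return result;
    }
}
```

   With SLF4J, the equivalent is to pass the exception as the final argument
after the placeholder values, for example
`LOG.warn("Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value, e);`.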
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
