Copilot commented on code in PR #4246: URL: https://github.com/apache/flink-cdc/pull/4246#discussion_r2791208865
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/binlog_test.sql:
##########

@@ -0,0 +1,78 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: binlog_test
+-- ----------------------------------------------------------------------------------------------------------------
+-- This database is used for testing binlog-only newly added table capture functionality.
+--
+-- The test validates that:
+-- 1. Tables created dynamically during binlog reading phase are automatically captured
+-- 2. Data changes in newly added tables are captured as binlog events (not snapshots)
+-- 3. Table pattern matching works correctly for newly added tables
+-- 4. Non-matching tables are not captured
+--
+-- IMPORTANT: This SQL file defines the initial schema for reference and documentation.
+-- The actual test creates tables dynamically during execution to validate binlog-only capture.
+-- The initial_table is crea1ted in @BeforeEach to ensure binlog is active before CDC source starts.

Review Comment:
There is a typo in the comment. "crea1ted" should be "created".

```suggestion
-- The initial_table is created in @BeforeEach to ensure binlog is active before CDC source starts.
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java:
##########

@@ -446,4 +481,46 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
                 useLegacyJsonFormat,
                 assignUnboundedChunkFirst);
     }
+
+    /**
+     * Convert Flink CDC style table pattern to Debezium style.
+     *
+     * <p>In CDC-style table matching, table names are separated by commas and use `\.` for regex
+     * matching. In Debezium style, table names are separated by pipes and use `.` for regex
+     * matching while `\.` is used as database.table separator.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>{@code "db1.table_\.*,db2.user_\.*"} -> {@code "db1\.table_.*|db2\.user_.*"}
+     *   <li>{@code "test_db.orders"} -> {@code "test_db\.orders"}
+     * </ul>
+     *
+     * @param tables Flink CDC style table pattern
+     * @return Debezium style table pattern
+     */
+    private static String convertToDebeziumStyle(String tables) {
+        LOG.debug("Converting table pattern to Debezium style: {}", tables);
+
+        // Step 1: Replace comma separator with pipe (OR semantics)
+        tables =
+                Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
+        LOG.debug("After replacing comma with pipe separator: {}", tables);
+
+        // Step 2: Replace escaped dot \. with placeholder
+        // In Flink CDC, \. means any character in regex, in Debezium it should be .

Review Comment:
The comment is incorrect. In standard regex notation (including Flink CDC), the backslash escapes the dot: `\.` means a literal dot character, while `.` (unescaped) means any single character. The comment should say "In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character)".

```suggestion
        // In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character).
```
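For a concrete check of the escaping rule this comment relies on, plain java.util.regex behaves exactly as described (a standalone demo, independent of the PR code; class and pattern strings are illustrative):

```java
import java.util.regex.Pattern;

// Unescaped '.' matches any single character; '\.' matches only a literal dot.
public class DotEscapeDemo {
    public static void main(String[] args) {
        System.out.println(Pattern.matches("db1.orders", "db1Xorders"));   // true: '.' is a wildcard
        System.out.println(Pattern.matches("db1\\.orders", "db1Xorders")); // false: '\.' is a literal dot
        System.out.println(Pattern.matches("db1\\.orders", "db1.orders")); // true
    }
}
```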
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java:
##########

@@ -397,8 +413,26 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
         if (databaseList != null) {
             props.setProperty("database.include.list", String.join(",", databaseList));
         }
+        // Validate: Two modes are mutually exclusive
+        if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
+            throw new IllegalArgumentException(
+                    "Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
+                            + "'scan.newly-added-table.enabled' as they may cause duplicate data");
+        }

Review Comment:
The PR description mentions that validation logic should ensure binlog-only mode works only with stream-only startup modes (latest, earliest, specific offset, timestamp). However, this validation is missing. When scanBinlogNewlyAddedTableEnabled is true but the startup mode is not stream-only (e.g., initial or snapshot), the behavior may be incorrect. Add validation that requires startupOptions.isStreamOnly() to be true when scanBinlogNewlyAddedTableEnabled is enabled.

```suggestion
        }
        // Validate: binlog-only newly-added-table scan requires a stream-only startup mode
        if (scanBinlogNewlyAddedTableEnabled && !startupOptions.isStreamOnly()) {
            throw new IllegalArgumentException(
                    "'scan.binlog.newly-added-table.enabled' can only be enabled when using a "
                            + "stream-only startup mode (latest-offset, earliest-offset, specific-offset, or timestamp).");
        }
```
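For context, here is a minimal sketch of a source configuration that the proposed validation would accept. `scanBinlogNewlyAddedTableEnabled(...)` is the new builder option this PR appears to add (name taken from the test and config classes in this diff, so treat it as an assumption); the remaining calls are the existing MySqlSource builder API:

```java
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class BinlogOnlyConfigSketch {
    public static void main(String[] args) {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("test_db")
                        .tableList("test_db.orders_\\.*") // escaped CDC-style pattern per the builder javadoc
                        .username("mysqluser")
                        .password("mysqlpw")
                        // New option under review (assumed builder method name); per the
                        // proposed validation it must be paired with a stream-only
                        // startup mode such as latest-offset.
                        .scanBinlogNewlyAddedTableEnabled(true)
                        .startupOptions(StartupOptions.latest())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
    }
}
```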
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/BinlogOnlyNewlyAddedTableITCase.java:
##########

@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowUtils;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * IT tests for binlog-only newly added table capture functionality using {@link
+ * MySqlSource.scanBinlogNewlyAddedTableEnabled}.
+ *
+ * <p>This test validates that tables matching the configured pattern are automatically captured
+ * when they are created during binlog reading phase, without triggering snapshot phase.
+ */
+@Timeout(value = 300, unit = TimeUnit.SECONDS)
+class BinlogOnlyNewlyAddedTableITCase extends MySqlSourceTestBase {
+
+    private final UniqueDatabase testDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "binlog_test", "mysqluser", "mysqlpw");
+
+    @BeforeEach
+    public void before() throws SQLException {
+        testDatabase.createAndInitialize();
+
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            // Create an initial table to ensure binlog is active
+            String tableId = testDatabase.getDatabaseName() + ".initial_table";
+            connection.execute(
+                    format(
+                            "CREATE TABLE %s (id BIGINT PRIMARY KEY, value VARCHAR(100));",
+                            tableId));
+            connection.execute(format("INSERT INTO %s VALUES (1, 'initial');", tableId));
+            connection.commit();
+        }
+    }
+
+    @AfterEach
+    public void after() {
+        testDatabase.dropDatabase();
+    }
+
+    @Test
+    void testBinlogOnlyCaptureSingleNewTable() throws Exception {
+        testBinlogOnlyCapture("products_2024");
+    }
+
+    @Test
+    void testBinlogOnlyCaptureMultipleNewTables() throws Exception {
+        testBinlogOnlyCapture("orders_2024", "orders_2025");
+    }
+
+    @Test
+    void testBinlogOnlyCaptureWithPatternMatching() throws Exception {
+        // Test with wildcard pattern: capture tables like user_*
+        testBinlogOnlyCaptureWithPattern(
+                testDatabase.getDatabaseName() + ".user_.*",
+                "user_profiles",
+                "user_settings",
+                "user_logs");
+    }
+
+    @Test
+    void testBinlogOnlyCaptureWithDatabasePattern() throws Exception {
+        // Test with database.* pattern
+        testBinlogOnlyCaptureWithPattern(
+                testDatabase.getDatabaseName() + ".*", "product_inventory", "product_catalog");

Review Comment:
There is an inconsistency between the test patterns and the documented pattern format. The javadoc in MySqlSourceBuilder (lines 231-235) shows patterns like "db\\.*" and "db\\.user_\\.*" (which become "db\.*" and "db\.user_\.*" as string values), but these tests use unescaped patterns like ".user_.*" and ".*". The pattern conversion function in MySqlSourceConfigFactory.convertToDebeziumStyle() expects escaped patterns (with backslash-dot), so these test patterns may not work correctly. Either the tests should use the documented format (e.g., "\\.user_\\.*"), or the conversion function needs to be updated to handle both escaped and unescaped patterns.
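To make the mismatch concrete, here is a standalone sketch of the dot-swapping that the truncated convertToDebeziumStyle() appears to perform; the placeholder step after "Step 2" is reconstructed from the javadoc examples since the diff cuts off, and `convert` and `##DOT##` are illustrative names, not the PR's code. Feeding it the documented escaped form and an unescaped test-style pattern shows why the two disagree:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class PatternConversionSketch {
    // Hypothetical reconstruction: swap '.' (CDC db/table separator) and '\.'
    // (CDC regex marker) via a placeholder, per the javadoc examples.
    static String convert(String tables) {
        String joined =
                Arrays.stream(tables.split(","))
                        .map(String::trim)
                        .collect(Collectors.joining("|"));
        String placeholder = "##DOT##";
        return joined.replace("\\.", placeholder) // CDC regex marker -> placeholder
                .replace(".", "\\.")              // CDC separator -> Debezium literal dot
                .replace(placeholder, ".");       // placeholder -> Debezium any-char dot
    }

    public static void main(String[] args) {
        // Documented (escaped) form converts as the javadoc promises:
        System.out.println(convert("db1.table_\\.*"));   // db1\.table_.*
        // Unescaped test-style pattern: every '.' is treated as a separator,
        // so the regex wildcard is lost:
        System.out.println(convert("mydb_123.user_.*")); // mydb_123\.user_\.*
    }
}
```

Under full-string matching, `user_\.*` only matches "user_" followed by literal dots, so a table like user_profiles would not be captured.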
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java:
##########

@@ -222,6 +222,16 @@ else if (!isRemainingTablesCheckpointed
     }
 
     private void captureNewlyAddedTables() {
+        // Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
+        if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
+            LOG.info(
+                    "Binlog-only newly added table capture is enabled. "
+                            + "New tables matching the pattern will be automatically captured "
+                            + "in binlog phase without snapshot.");
+            // No action needed here, BinlogSplitReader will handle the auto-capture
+            return;
+        }
+
         // Don't scan newly added table in snapshot mode.
         if (sourceConfig.isScanNewlyAddedTableEnabled()

Review Comment:
The PR description states "Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern", and the comment here claims "BinlogSplitReader will handle the auto-capture", but there are no changes to BinlogSplitReader.java in this PR. The existing logic in BinlogSplitReader.hasEnterPureBinlogPhase() (lines 318-323) only checks isScanNewlyAddedTableEnabled(), not isScanBinlogNewlyAddedTableEnabled(). This means the feature will not actually work as described. BinlogSplitReader needs to be enhanced to check isScanBinlogNewlyAddedTableEnabled() and handle newly added tables accordingly.

```suggestion
        // Enable newly-added table discovery when either snapshot-based or binlog-only
        // newly-added table capture is configured.
        final boolean scanNewlyAddedTablesEnabled =
                sourceConfig.isScanNewlyAddedTableEnabled()
                        || sourceConfig.isScanBinlogNewlyAddedTableEnabled();
        if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
            LOG.info(
                    "Binlog-only newly added table capture is enabled. "
                            + "New tables matching the pattern will be automatically captured.");
        }

        // Don't scan newly added table in snapshot-only startup mode.
        if (scanNewlyAddedTablesEnabled
```
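For illustration, a minimal standalone sketch of the predicate change this comment calls for. The helper name, the `tablesWithFinishedSnapshot` set, and the `capturedTableFilter` predicate are assumptions standing in for BinlogSplitReader's internal state; only the two config flags come from this PR:

```java
import java.util.Set;
import java.util.function.Predicate;

import io.debezium.relational.TableId;

// Sketch of the check the review says hasEnterPureBinlogPhase() needs; the
// surrounding high-watermark bookkeeping of the real class is elided.
final class NewlyAddedTableCheckSketch {

    static boolean hasEnterPureBinlogPhase(
            TableId tableId,
            Set<TableId> tablesWithFinishedSnapshot,      // assumed stand-in for reader state
            Predicate<TableId> capturedTableFilter,        // assumed stand-in for the table filter
            boolean scanNewlyAddedTableEnabled,
            boolean scanBinlogNewlyAddedTableEnabled) {
        // Tables that had snapshot splits follow the existing high-watermark logic
        // (not reproduced here); this sketch treats them as already in pure binlog phase.
        if (tablesWithFinishedSnapshot.contains(tableId)) {
            return true;
        }
        // The fix being requested: a table with no snapshot split should pass when
        // EITHER newly-added-table mode is on, not only the snapshot-based one.
        return (scanNewlyAddedTableEnabled || scanBinlogNewlyAddedTableEnabled)
                && capturedTableFilter.test(tableId);
    }
}
```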
