DanielLeens opened a new issue, #11031:
URL: https://github.com/apache/seatunnel/issues/11031

   # [STIP-29] [Feature][Connector-V2] Support Newly Added Tables in MySQL CDC
   
   ## Background
   
   SeaTunnel MySQL CDC already supports reading snapshot data and incremental
   binlog data for the tables discovered when the source enumerator starts. It
   also has partial restore-time reconciliation: when a job is restored and the
   current configuration captures tables that were not present in the
   checkpointed state, the source can add those tables to the remaining snapshot
   table set.
   
   However, users commonly use MySQL CDC for full-database synchronization with
   `database-pattern` and `table-pattern`. In that mode, a table created after
   the CDC job has entered the binlog phase is currently not handled as a
   first-class table. The source may see Debezium records for the table, but the
   SeaTunnel CDC reader, schema cache, deserializer, and multi-table sink routing
   are initialized from the startup table list. As a result, records of a newly
   added table can be ignored on the source side or rejected by the sink.
   
   Flink CDC exposes two separate options for this problem:
   
   - `scan.newly-added-table.enabled`: re-snapshot newly added tables when the
     job is restored from a savepoint/checkpoint.
   - `scan.binlog.newly-added-table.enabled`: read DDL/DML for newly added
     tables during the binlog reading phase, without doing a historical snapshot
     in the already-running job.
   
   SeaTunnel should provide explicit controls for both cases while keeping the
   existing restore behavior fully backward compatible.
   
   ## Motivation
   
   Full-database CDC synchronization is expected to follow the shape of the
   source database over time. Without newly added table support, users have to
   stop and reconfigure jobs or create additional pipelines whenever a business
   team creates a new table. This is operationally expensive and easy to get
   wrong.
   
   At the same time, newly added table support is a correctness-sensitive
   feature. There are two different user expectations:
   
   1. Add a table to an existing job, restore from a checkpoint/savepoint, and
      let the source snapshot the table history before continuing binlog
      consumption.
   2. Create a new empty table while the CDC job is already in binlog mode and
      capture its create-table DDL plus subsequent data changes.
   
   These two expectations should not be collapsed into one ambiguous option. The
   first one requires snapshot planning and checkpoint restoration. The second
   one requires dynamic schema registration and downstream table creation, but
   it does not reconstruct historical rows that existed before the table was
   captured.
   
   ## Goals
   
   - Support restore-time newly added tables for MySQL CDC with an explicit
     option.
   - Support binlog-phase newly created tables for MySQL CDC with an explicit
     option.
   - Keep restore-time newly added table scanning enabled by default to preserve
     existing SeaTunnel restore behavior.
   - Keep binlog-phase newly added table scanning disabled by default.
   - Reuse existing checkpoint and schema evolution mechanisms where possible.
   - Make newly added table schema registration explicit in the CDC reader and
     MySQL schema cache.
   - Register newly added table metadata before rows of that table are
     deserialized.
   - Document that downstream sinks must already be able to route or create the
     target table for binlog-phase dynamic tables.
   - Document the exact semantics and limitations in English and Chinese docs.
   
   ## Non-Goals
   
   - Do not enable binlog-phase newly added table capture by default.
   - Do not automatically capture tables outside the configured
     `database-pattern` / `table-pattern` or explicitly configured table list.
   - Do not implement running-job historical snapshot for an already existing
     table in the first implementation phase.
   - Do not automatically handle drop-table or rename-table topology changes in
     this proposal.
   - Do not guarantee dynamic table creation for every sink connector in the
     first implementation. Dynamic multi-table sink writer creation should be
     introduced by a follow-up API-level STIP.
   
   ## Design
   
   ### New Source Options
   
   Add two MySQL CDC options:
   
   ```hocon
   scan.newly-added-table.enabled = true
   scan.binlog.newly-added-table.enabled = false
   ```
   
   Semantics:
   
   - `scan.newly-added-table.enabled`
     - Effective when a job is restored from checkpoint/savepoint.
     - The source compares the currently captured tables with the checkpointed
       tables.
     - New tables are planned as snapshot tables first, then enter incremental
       reading.
   
   - `scan.binlog.newly-added-table.enabled`
     - Effective during binlog reading.
     - The source accepts create-table/schema records and DML records for tables
       that match the configured capture patterns.
     - The source does not snapshot historical rows for those tables in the
       already-running job.
     - This mode is intended for newly created tables, or for tables where users
       accept binlog-only semantics from the point the table appears in binlog.
   
   Both options are independent because their consistency semantics are
   different. If a future implementation supports running-job snapshot + binlog
   handoff, it should be introduced as a separate option or as a stricter mode of
   `scan.newly-added-table.enabled`.
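
A minimal sketch of reading the two flags with the proposed defaults follows. It assumes that options arrive as a flat string map. `NewlyAddedTableOptions`, `fromConfig`, and `parseFlag` are hypothetical names. A real connector would declare these options through SeaTunnel's option API instead of parsing strings by hand.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical holder for the two independent newly-added-table flags.
 * Defaults follow the proposal: restore-time on, binlog-phase off.
 */
final class NewlyAddedTableOptions {
    static final String RESTORE_KEY = "scan.newly-added-table.enabled";
    static final String BINLOG_KEY = "scan.binlog.newly-added-table.enabled";

    final boolean restoreTimeEnabled;
    final boolean binlogPhaseEnabled;

    private NewlyAddedTableOptions(boolean restoreTimeEnabled, boolean binlogPhaseEnabled) {
        this.restoreTimeEnabled = restoreTimeEnabled;
        this.binlogPhaseEnabled = binlogPhaseEnabled;
    }

    /** Reads both flags from a flat key/value config, applying the proposed defaults. */
    static NewlyAddedTableOptions fromConfig(Map<String, String> config) {
        boolean restore = parseFlag(config, RESTORE_KEY, true); // preserves current restore behavior
        boolean binlog = parseFlag(config, BINLOG_KEY, false);  // opt-in only
        return new NewlyAddedTableOptions(restore, binlog);
    }

    /** Hypothetical helper: accepts only "true"/"false" (case-insensitive). */
    private static boolean parseFlag(Map<String, String> config, String key, boolean defaultValue) {
        String raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IllegalArgumentException(key + " must be true or false, got: " + raw);
        }
    }
}
```

The sketch models the flags as two booleans rather than one enum. That choice reflects the point above: the two modes have different consistency semantics, so each can be switched on or off independently.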
   
   ### Restore-Time Newly Added Tables
   
   The existing `IncrementalSource.restore(...)` already detects tables that are
   captured by the current config but absent from the checkpoint state. This
   path should be made an explicitly documented feature controlled by
   `scan.newly-added-table.enabled`.
   
   When the option is disabled, restore preserves the checkpointed table set and
   avoids implicitly adding new tables just because a broader pattern now
   matches them.
   
   When the option is enabled:
   
   1. Discover current captured tables from MySQL metadata.
   2. Compare them with checkpointed captured tables.
   3. Add new tables to the snapshot assigner `remainingTables`.
   4. Remove deleted tables only from pending snapshot planning; do not emit
      drop-table semantics in this proposal.
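
The four restore steps above reduce to a set difference gated by the option. This is a minimal sketch, assuming table identifiers are plain `database.table` strings and `remainingTables` is the snapshot assigner's pending list. `RestoreReconciler` and `reconcile` are hypothetical and do not reproduce the existing `IncrementalSource.restore(...)` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical restore-time reconciliation of captured tables vs. checkpoint state. */
final class RestoreReconciler {

    /**
     * Returns the snapshot tables still pending after restore.
     *
     * @param newlyAddedEnabled value of scan.newly-added-table.enabled
     * @param currentTables     tables captured by the current config (step 1)
     * @param checkpointTables  captured tables recorded in the checkpoint
     * @param remainingTables   pending snapshot tables restored from the checkpoint
     */
    static List<String> reconcile(boolean newlyAddedEnabled,
                                  Set<String> currentTables,
                                  Set<String> checkpointTables,
                                  List<String> remainingTables) {
        if (!newlyAddedEnabled) {
            // Option disabled: keep the checkpointed table set untouched.
            return new ArrayList<>(remainingTables);
        }
        // Keep pending order and avoid duplicates.
        Set<String> result = new LinkedHashSet<>(remainingTables);
        // Steps 2 and 3: tables captured now but absent from the checkpoint become snapshot tables.
        for (String table : currentTables) {
            if (!checkpointTables.contains(table)) {
                result.add(table);
            }
        }
        // Step 4: tables no longer captured leave pending snapshot planning only;
        // no drop-table event is emitted.
        result.retainAll(currentTables);
        return new ArrayList<>(result);
    }
}
```

With the option disabled, a broader pattern that now matches an extra table leaves the pending list unchanged. This is the backward-compatible escape hatch described above.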
   
   ### Binlog-Phase Newly Added Tables
   
   When `scan.binlog.newly-added-table.enabled=true`, the binlog reader must
   handle tables that were not part of the startup `CatalogTable` list but match
   the configured capture pattern.
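
The binlog reader first has to decide whether an unknown table matches the configured capture pattern, which is a pure filter. This is a minimal sketch that assumes two rules: `database-pattern` is a regex over the database name, and `table-pattern` is a regex over the fully qualified `database.table` name. Both rules are assumptions; confirm the exact matching semantics against the MySQL CDC connector docs. `CapturePatternFilter` is hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical filter deciding whether a table first seen in binlog may be captured. */
final class CapturePatternFilter {
    private final Pattern databasePattern;
    private final Pattern tablePattern;

    CapturePatternFilter(String databaseRegex, String tableRegex) {
        this.databasePattern = Pattern.compile(databaseRegex);
        this.tablePattern = Pattern.compile(tableRegex);
    }

    /** Whole-string matches only, so "shop" does not accidentally match "shop_archive". */
    boolean matches(String database, String table) {
        return databasePattern.matcher(database).matches()
                && tablePattern.matcher(database + "." + table).matches();
    }
}
```

The filter uses `Matcher.matches()` rather than `find()`, so a partial regex hit cannot pull in a table outside the configured patterns. That behavior matches the non-goal of never capturing tables beyond `database-pattern` / `table-pattern`.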
   
   Required changes:
   
   1. MySQL schema cache
      - `MySqlDialect` should support lazy schema lookup on a table cache miss.
      - `getPrimaryKey` and `getConstraintKeys` must not assume the table exists
        in the startup `tableMap`.
      - `CatalogTableUtils.mergeCatalogTableConfig(...)` should tolerate missing
        user table config and use the actual database schema.
   
   2. CDC deserialization
      - `SeaTunnelRowDebeziumDeserializeSchema` should expose a method to
        register a newly discovered `CatalogTable` and build its row converter.
      - Unknown table records should still be ignored when the binlog newly
        added table option is disabled.
      - When enabled, unknown matching tables should be resolved through the
        MySQL schema cache, registered, and then deserialized.
   
   3. Downstream behavior
      - The first implementation registers source-side table metadata before the
        first row of the newly added table is deserialized.
      - Dynamic sink writer creation is not part of the first patch because
        SeaTunnel does not currently expose a create-table schema event type in
        the CDC schema evolution API.
      - If the selected sink cannot route or create the new target table, the
        pipeline can still fail downstream. A follow-up STIP can introduce
        explicit create-table schema events and dynamic multi-table sink writer
        registration.
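
The resolve, register, then deserialize order described above can be sketched as follows. This is a simplified model and does not represent the real `MySqlDialect` or `SeaTunnelRowDebeziumDeserializeSchema`:

- a table schema is reduced to a list of column names;
- the row converter is reduced to a function from a raw field map to an `Object[]`;
- the capture-pattern check and the MySQL lookup are injected as a `Predicate` and a `Function`, so the sketch runs without a database.

`TableSchema`, `DynamicTableDeserializer`, and all of its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for a CatalogTable: ordered column names. */
final class TableSchema {
    final List<String> columns;
    TableSchema(List<String> columns) { this.columns = List.copyOf(columns); }
}

/** Hypothetical deserializer that registers unknown matching tables before their first row. */
final class DynamicTableDeserializer {
    private final boolean binlogNewlyAddedEnabled;
    private final Predicate<String> capturePattern;           // tests "db.table" ids
    private final Function<String, TableSchema> schemaLoader; // stands in for a MySQL lookup
    private final Map<String, TableSchema> schemaCache = new HashMap<>();
    private final Map<String, Function<Map<String, Object>, Object[]>> converters = new HashMap<>();

    DynamicTableDeserializer(Map<String, TableSchema> startupTables,
                             boolean binlogNewlyAddedEnabled,
                             Predicate<String> capturePattern,
                             Function<String, TableSchema> schemaLoader) {
        this.binlogNewlyAddedEnabled = binlogNewlyAddedEnabled;
        this.capturePattern = capturePattern;
        this.schemaLoader = schemaLoader;
        startupTables.forEach(this::register);
    }

    /** Caches the schema and builds the row converter; must run before the table's first row. */
    void register(String tableId, TableSchema schema) {
        schemaCache.put(tableId, schema);
        converters.put(tableId, fields -> schema.columns.stream().map(fields::get).toArray());
    }

    boolean isRegistered(String tableId) { return converters.containsKey(tableId); }

    /** Returns the converted row, or empty when the record is ignored. */
    Optional<Object[]> deserialize(String tableId, Map<String, Object> fields) {
        Function<Map<String, Object>, Object[]> converter = converters.get(tableId);
        if (converter == null) {
            if (!binlogNewlyAddedEnabled || !capturePattern.test(tableId)) {
                return Optional.empty(); // unknown or non-matching table: ignored as today
            }
            // Lazy schema lookup on a cache miss.
            TableSchema schema = schemaCache.computeIfAbsent(tableId, schemaLoader);
            if (schema == null) {
                return Optional.empty(); // lookup found nothing; nothing to register
            }
            register(tableId, schema); // metadata registered before the first row is converted
            converter = converters.get(tableId);
        }
        return Optional.of(converter.apply(fields));
    }
}
```

Turning a newly registered table into a create-table event for the sink is deliberately left out, in line with the downstream limitation above.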
   
   ### Future Running Snapshot Mode
   
   Running-job historical snapshot for a newly discovered table requires a
   stronger protocol:
   
   1. Enumerator discovers a matching table.
   2. Enumerator sends a prepare event to the owner binlog reader.
   3. Reader installs pending table filters and buffers matching binlog records.
   4. Enumerator snapshots the new table.
   5. After snapshot completion and checkpoint completion, enumerator sends an
      activate event.
   6. Reader discards buffered records covered by snapshot high watermarks and
      emits records after the high watermark.
   
   This protocol should be implemented in a follow-up phase because it changes
   source-reader coordination and the checkpoint state shape more deeply than
   binlog-phase capture does.
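
The reader side of the prepare/activate handshake can be modeled as a small per-table buffer keyed on binlog position. This is a minimal sketch that assumes binlog positions are comparable `long` offsets and that any record at or below the snapshot high watermark is already covered by the snapshot. `BinlogRecord`, `PendingTableBuffer`, `onPrepare`, `onRecord`, and `onActivate` are hypothetical names. The sketch does not checkpoint the buffer itself, and that missing piece is the new checkpoint state the follow-up phase would have to design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical binlog record: table id plus binlog offset. */
final class BinlogRecord {
    final String tableId;
    final long offset;
    BinlogRecord(String tableId, long offset) {
        this.tableId = tableId;
        this.offset = offset;
    }
}

/** Hypothetical reader-side buffer for tables whose snapshot is still in progress. */
final class PendingTableBuffer {
    private final Map<String, List<BinlogRecord>> buffered = new HashMap<>();
    private final Consumer<BinlogRecord> downstream;

    PendingTableBuffer(Consumer<BinlogRecord> downstream) { this.downstream = downstream; }

    /** Step 3: install a pending filter for the table named in the prepare event. */
    void onPrepare(String tableId) {
        buffered.putIfAbsent(tableId, new ArrayList<>());
    }

    /** Regular binlog path: buffer records of pending tables, emit everything else. */
    void onRecord(BinlogRecord record) {
        List<BinlogRecord> pending = buffered.get(record.tableId);
        if (pending != null) {
            pending.add(record);
        } else {
            downstream.accept(record);
        }
    }

    /** Step 6: drop records covered by the snapshot, emit the rest, stop buffering. */
    void onActivate(String tableId, long highWatermark) {
        List<BinlogRecord> records = buffered.remove(tableId);
        if (records == null) {
            return; // no prepare event was seen for this table
        }
        for (BinlogRecord r : records) {
            if (r.offset > highWatermark) {
                downstream.accept(r);
            }
        }
    }
}
```

Buffered records are released after later records of other tables. The sketch therefore preserves order within each table but not across tables.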
   
   ## Compatibility, Deprecation, and Migration Plan
   
   This proposal is backward compatible.
   
   - `scan.newly-added-table.enabled` defaults to `true` to preserve the current
     SeaTunnel behavior on restore.
   - `scan.binlog.newly-added-table.enabled` defaults to `false`.
   - Existing jobs keep their current static table behavior during normal binlog
     reading unless they explicitly enable the binlog option.
   - Existing jobs configured with exact `table-names` do not start capturing
     unrelated tables.
   - Jobs that want to disable restore-time table addition can explicitly set
     `scan.newly-added-table.enabled=false`.
   - Jobs that want binlog-phase capture for newly created tables must
     explicitly enable `scan.binlog.newly-added-table.enabled` and use a
     downstream path that can route or create the new target table.
   
   There is no deprecation in this proposal.
   
   ## Test Plan
   
   Unit tests:
   
   - Verify MySQL option parsing defaults and explicit values.
   - Verify restore-time table reconciliation is gated by
     `scan.newly-added-table.enabled`.
   - Verify MySQL schema cache can resolve a table missing from startup
     `tableMap`.
   - Verify CDC deserializer can dynamically register a newly discovered table.
   - Verify unknown table records are ignored when the binlog option is
     disabled.
   - Verify the source registers newly added table metadata before deserializing
     data rows for that table.
   - Verify unknown table records are still ignored or fail downstream when the
     selected downstream path cannot route them.
   
   Integration/E2E tests:
   
   - Restore-time test:
     - Start a MySQL CDC job with table A.
     - Stop with checkpoint/savepoint.
     - Update config to include table B and enable
       `scan.newly-added-table.enabled`.
     - Restore and verify table B snapshot and binlog records are synchronized.
   
   - Binlog-phase test:
     - Start a MySQL CDC job with a table pattern.
     - Wait until the job enters binlog phase.
     - Create a new table matching the pattern and insert rows.
     - Verify downstream receives rows inserted after the create-table binlog
       position when the selected downstream path can route the table.
   
   ## Alternatives Considered
   
   ### One Option for All Newly Added Table Cases
   
   Rejected. Restore-time snapshot and binlog-phase new-table capture have
   different correctness semantics. A single option would make it unclear
   whether historical rows are guaranteed.
   
   ### Running-Job Snapshot in the First Patch
   
   Rejected for the first implementation phase. It is the most complete
   behavior, but it requires pending table buffering, source event coordination,
   and new checkpoint state. Implementing this before the simpler
   Flink-compatible semantics would increase review risk and make correctness
   harder to validate.
   
   ### Rely Only on Debezium Dynamic Table Filtering
   
   Rejected. Even if Debezium can read matching binlog records, SeaTunnel still
   needs dynamic schema registration, create-table event propagation, and
   multi-table sink routing.
   
   ## References
   
   - SeaTunnel STIP guide:
     https://seatunnel.apache.org/community/contribution_guide/STIP
   - Flink CDC MySQL pipeline connector newly added table options:
     https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/connectors/pipeline-connectors/mysql/
   - Flink CDC MySQL source scan newly added tables:
     https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/connectors/flink-sources/mysql-cdc/
   - Flink CDC 3.6.0 release note for FLINK-38218:
     https://flink.apache.org/2026/03/30/apache-flink-cdc-3.6.0-release-announcement/
   - SeaTunnel CDC connector design:
     https://github.com/apache/seatunnel/issues/3175
   - SeaTunnel CDC DDL sync design:
     https://github.com/apache/seatunnel/issues/7930
   

