DanielLeens opened a new issue, #11031:
URL: https://github.com/apache/seatunnel/issues/11031
# [STIP-29] [Feature][Connector-V2] Support Newly Added Tables in MySQL CDC
## Background
SeaTunnel MySQL CDC already supports reading snapshot data and incremental binlog data for the tables discovered when the source enumerator starts. It also has partial restore-time reconciliation: when a job is restored and the current configuration captures tables that were not present in the checkpointed state, the source can add those tables to the remaining snapshot table set.
However, users commonly use MySQL CDC for full-database synchronization with `database-pattern` and `table-pattern`. In that mode, a table created after the CDC job has entered the binlog phase is currently not handled as a first-class table. The source may see Debezium records for the table, but the SeaTunnel CDC reader, schema cache, deserializer, and multi-table sink routing are all initialized from the startup table list. As a result, records of a newly added table can be ignored on the source side or rejected by the sink.
Flink CDC exposes two separate options for this problem:
- `scan.newly-added-table.enabled`: re-snapshot newly added tables when the job is restored from a savepoint/checkpoint.
- `scan.binlog.newly-added-table.enabled`: read DDL/DML for newly added tables during the binlog reading phase, without doing a historical snapshot in the already-running job.
SeaTunnel should provide explicit controls for both cases while keeping the
existing restore behavior fully backward compatible.
## Motivation
Full-database CDC synchronization is expected to follow the shape of the source database over time. Without newly added table support, users have to stop and reconfigure jobs or create additional pipelines whenever a business team creates a new table. This is operationally expensive and easy to get wrong.
At the same time, newly added table support is a correctness-sensitive feature. There are two different user expectations:
1. Add a table to an existing job, restore from a checkpoint/savepoint, and let the source snapshot the table history before continuing binlog consumption.
2. Create a new empty table while the CDC job is already in binlog mode and capture its create-table DDL plus subsequent data changes.
These two expectations should not be collapsed into one ambiguous option. The first one requires snapshot planning and checkpoint restoration. The second one requires dynamic schema registration and downstream table creation, but it does not reconstruct historical rows that existed before the table was captured.
## Goals
- Support restore-time newly added tables for MySQL CDC with an explicit option.
- Support binlog-phase newly created tables for MySQL CDC with an explicit option.
- Keep restore-time newly added table scanning enabled by default to preserve existing SeaTunnel restore behavior.
- Keep binlog-phase newly added table scanning disabled by default.
- Reuse existing checkpoint and schema evolution mechanisms where possible.
- Make newly added table schema registration explicit in the CDC reader and MySQL schema cache.
- Register newly added table metadata before rows of that table are deserialized.
- Document that downstream sinks must already be able to route or create the target table for binlog-phase dynamic tables.
- Document the exact semantics and limitations in English and Chinese docs.
## Non-Goals
- Do not enable binlog-phase newly added table capture by default.
- Do not automatically capture tables outside the configured `database-pattern` / `table-pattern` or the explicitly configured table list.
- Do not implement a running-job historical snapshot for an already existing table in the first implementation phase.
- Do not automatically handle drop-table or rename-table topology changes in this proposal.
- Do not guarantee dynamic table creation for every sink connector in the first implementation. Dynamic multi-table sink writer creation should be introduced by a follow-up API-level STIP.
## Design
### New Source Options
Add two MySQL CDC options:
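```hocon
# Restore time: snapshot tables that the current config newly captures (default: true)
scan.newly-added-table.enabled = true
# Binlog phase: capture DDL/DML of matching tables created while reading binlog (default: false)
scan.binlog.newly-added-table.enabled = false
```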
Semantics:
- `scan.newly-added-table.enabled`
  - Effective when a job is restored from a checkpoint/savepoint.
  - The source compares the currently captured tables with the checkpointed tables.
  - New tables are planned as snapshot tables first, then enter incremental reading.
- `scan.binlog.newly-added-table.enabled`
  - Effective during binlog reading.
  - The source accepts create-table/schema records and DML records for tables that match the configured capture patterns.
  - The source does not snapshot historical rows for those tables in the already-running job.
  - This mode is intended for newly created tables, or for tables where users accept binlog-only semantics from the point the table appears in the binlog.
Both options are independent because their consistency semantics are different. If a future implementation supports a running-job snapshot-and-binlog handoff, it should be introduced as a separate option or a stricter mode of `scan.newly-added-table.enabled`.
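A minimal plain-Java sketch of how the two options stay independent, each falling back to its own default when absent. `NewlyAddedTableOptions` and the string-map input are hypothetical stand-ins, not SeaTunnel's option API; a real implementation would declare these options next to the other MySQL CDC source options.

```java
import java.util.Map;

/** Hypothetical holder for the two proposed options; not SeaTunnel's actual option API. */
final class NewlyAddedTableOptions {
    static final String RESTORE_KEY = "scan.newly-added-table.enabled";
    static final String BINLOG_KEY = "scan.binlog.newly-added-table.enabled";

    final boolean restoreTimeEnabled; // default true: preserves current restore behavior
    final boolean binlogPhaseEnabled; // default false: binlog-phase capture is opt-in

    private NewlyAddedTableOptions(boolean restoreTimeEnabled, boolean binlogPhaseEnabled) {
        this.restoreTimeEnabled = restoreTimeEnabled;
        this.binlogPhaseEnabled = binlogPhaseEnabled;
    }

    /** Reads each flag independently; a missing key falls back to that flag's own default. */
    static NewlyAddedTableOptions parse(Map<String, String> config) {
        return new NewlyAddedTableOptions(
                readBoolean(config, RESTORE_KEY, true),
                readBoolean(config, BINLOG_KEY, false));
    }

    private static boolean readBoolean(Map<String, String> config, String key, boolean defaultValue) {
        String raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        String value = raw.trim();
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        if (value.equalsIgnoreCase("false")) {
            return false;
        }
        // Reject anything else instead of silently treating it as false.
        throw new IllegalArgumentException("Option " + key + " expects true or false, got: " + raw);
    }
}
```

Setting only one key never changes the other, which mirrors the proposal's point that the two modes have different consistency semantics.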
### Restore-Time Newly Added Tables
The existing `IncrementalSource.restore(...)` already detects tables that are captured by the current config but absent from the checkpoint state. This path should be made an explicitly documented feature controlled by `scan.newly-added-table.enabled`.
When the option is disabled, restore preserves the checkpointed table set and avoids implicitly adding new tables just because a broader pattern now matches them.
When the option is enabled:
1. Discover the currently captured tables from MySQL metadata.
2. Compare them with the checkpointed captured tables.
3. Add new tables to the snapshot assigner's `remainingTables`.
4. Remove deleted tables only from pending snapshot planning; do not emit drop-table semantics in this proposal.
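The reconciliation steps above can be sketched as a pure function over table identifiers. This is an illustration under simplifying assumptions: tables are plain `database.table` strings, and `RestoreReconciler` and its parameters are hypothetical names, not the actual `IncrementalSource.restore(...)` signature or its state types.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of restore-time reconciliation over "database.table" identifiers. */
final class RestoreReconciler {

    /**
     * Returns the snapshot tables still to be planned after restore.
     *
     * @param currentlyCaptured   tables matched by the current config (step 1)
     * @param checkpointCaptured  tables recorded as captured in the checkpoint state
     * @param checkpointRemaining snapshot tables still pending in the checkpoint state
     * @param newlyAddedEnabled   value of scan.newly-added-table.enabled
     */
    static List<String> remainingTablesAfterRestore(
            Set<String> currentlyCaptured,
            Set<String> checkpointCaptured,
            List<String> checkpointRemaining,
            boolean newlyAddedEnabled) {
        if (!newlyAddedEnabled) {
            // Disabled: keep the checkpointed pending set exactly as it was.
            return new ArrayList<>(checkpointRemaining);
        }
        Set<String> remaining = new LinkedHashSet<>();
        // Step 4: drop tables that are no longer captured, but only from pending planning.
        for (String table : checkpointRemaining) {
            if (currentlyCaptured.contains(table)) {
                remaining.add(table);
            }
        }
        // Steps 2-3: tables captured now but absent from the checkpoint become snapshot tables.
        for (String table : currentlyCaptured) {
            if (!checkpointCaptured.contains(table)) {
                remaining.add(table);
            }
        }
        return new ArrayList<>(remaining);
    }
}
```

Nothing in this function emits a drop-table event; a table that disappears simply stops being planned, which matches step 4.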
### Binlog-Phase Newly Added Tables
When `scan.binlog.newly-added-table.enabled=true`, the binlog reader must handle tables that were not part of the startup `CatalogTable` list but match the configured capture pattern.
Required changes:
1. MySQL schema cache
   - `MySqlDialect` should support lazy schema lookup on a table cache miss.
   - `getPrimaryKey` and `getConstraintKeys` must not assume the table exists in the startup `tableMap`.
   - `CatalogTableUtils.mergeCatalogTableConfig(...)` should tolerate missing user table config and use the actual database schema.
2. CDC deserialization
   - `SeaTunnelRowDebeziumDeserializeSchema` should expose a method to register a newly discovered `CatalogTable` and build its row converter.
   - Unknown table records should still be ignored when the binlog newly added table option is disabled.
   - When the option is enabled, an unknown table that matches the capture pattern should be resolved through the MySQL schema cache and registered before its records are deserialized.
3. Downstream behavior
   - The first implementation registers source-side table metadata before the first row of the newly added table is deserialized.
   - Dynamic sink writer creation is not part of the first patch, because SeaTunnel does not currently expose a create-table schema event type in the CDC schema evolution API.
   - If the selected sink cannot route or create the new target table, the pipeline can still fail downstream. A follow-up STIP can introduce explicit create-table schema events and dynamic multi-table sink writer registration.
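The sketch below shows the intended lookup order for binlog-phase tables: serve known tables from the cache, fall back to a metadata lookup on a miss, and register the table before converting its first row. `TableSchema`, `LazySchemaCache`, and `DynamicTableRegistry` are hypothetical simplifications of the MySQL schema cache and `SeaTunnelRowDebeziumDeserializeSchema`. The metadata lookup is injected as a function instead of a real MySQL query, and the capture pattern is assumed to be a single regex matched against `database.table`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical, simplified schema: column names plus primary-key columns. */
final class TableSchema {
    final List<String> columns;
    final List<String> primaryKey;

    TableSchema(List<String> columns, List<String> primaryKey) {
        this.columns = columns;
        this.primaryKey = primaryKey;
    }
}

/** Schema cache seeded with startup tables that falls back to a metadata lookup on a miss. */
final class LazySchemaCache {
    private final Map<String, TableSchema> tableMap;
    private final Function<String, Optional<TableSchema>> metadataLookup; // stands in for a MySQL query

    LazySchemaCache(Map<String, TableSchema> startupTables,
                    Function<String, Optional<TableSchema>> metadataLookup) {
        this.tableMap = new HashMap<>(startupTables);
        this.metadataLookup = metadataLookup;
    }

    /** Does not assume the table was in the startup list; loads and caches it on demand. */
    Optional<TableSchema> getSchema(String tableId) {
        TableSchema cached = tableMap.get(tableId);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<TableSchema> loaded = metadataLookup.apply(tableId);
        loaded.ifPresent(schema -> tableMap.put(tableId, schema));
        return loaded;
    }

    Optional<List<String>> getPrimaryKey(String tableId) {
        return getSchema(tableId).map(schema -> schema.primaryKey);
    }
}

/** Stand-in for the deserializer's per-table converter registry (hypothetical API). */
final class DynamicTableRegistry {
    private final Map<String, TableSchema> registered = new HashMap<>();
    private final LazySchemaCache schemaCache;
    private final Pattern capturePattern; // matched against "database.table"
    private final boolean binlogNewlyAddedEnabled;

    DynamicTableRegistry(LazySchemaCache schemaCache, Pattern capturePattern,
                         boolean binlogNewlyAddedEnabled, List<String> startupTables) {
        this.schemaCache = schemaCache;
        this.capturePattern = capturePattern;
        this.binlogNewlyAddedEnabled = binlogNewlyAddedEnabled;
        for (String table : startupTables) {
            schemaCache.getSchema(table).ifPresent(s -> registered.put(table, s));
        }
    }

    /** Converts a row to "table{col=value,...}", or returns empty when the record is ignored. */
    Optional<String> deserialize(String tableId, Object[] values) {
        TableSchema schema = registered.get(tableId);
        if (schema == null) {
            if (!binlogNewlyAddedEnabled || !capturePattern.matcher(tableId).matches()) {
                return Optional.empty(); // unknown table: ignored, as today
            }
            Optional<TableSchema> resolved = schemaCache.getSchema(tableId);
            if (!resolved.isPresent()) {
                return Optional.empty();
            }
            schema = resolved.get();
            registered.put(tableId, schema); // register before converting the first row
        }
        StringBuilder out = new StringBuilder(tableId).append('{');
        for (int i = 0; i < schema.columns.size(); i++) {
            if (i > 0) {
                out.append(',');
            }
            out.append(schema.columns.get(i)).append('=').append(values[i]);
        }
        return Optional.of(out.append('}').toString());
    }

    boolean isRegistered(String tableId) {
        return registered.containsKey(tableId);
    }
}
```

The enable check and pattern match run before any schema lookup, so with the option disabled an unknown table costs only a map miss and is ignored exactly as it is today.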
### Future Running Snapshot Mode
A running-job historical snapshot for a newly discovered table requires a stronger protocol:
1. The enumerator discovers a matching table.
2. The enumerator sends a prepare event to the owning binlog reader.
3. The reader installs pending table filters and buffers matching binlog records.
4. The enumerator snapshots the new table.
5. After snapshot completion and checkpoint completion, the enumerator sends an activate event.
6. The reader discards buffered records covered by the snapshot high watermarks and emits records after the high watermark.
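Steps 3 and 6 of this protocol can be illustrated with a reader-side buffer. This is a simplified sketch, not a proposed implementation: `PendingTableBuffer` is a hypothetical name, binlog positions are modeled as one increasing `long` (real MySQL offsets involve the binlog file, position, and GTID), and every buffered record at or before the snapshot high watermark is assumed to be already reflected in the snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical reader-side state for the future prepare/activate protocol. */
final class PendingTableBuffer {
    /** One binlog change event, reduced to its table, a position, and a payload. */
    static final class BinlogRecord {
        final String table;
        final long position;
        final String payload;

        BinlogRecord(String table, long position, String payload) {
            this.table = table;
            this.position = position;
            this.payload = payload;
        }
    }

    private final Map<String, List<BinlogRecord>> pending = new HashMap<>();
    private final Set<String> active = new HashSet<>();
    private final List<BinlogRecord> emitted = new ArrayList<>();

    /** Step 3: on a prepare event, start buffering records for the table. */
    void prepare(String table) {
        pending.putIfAbsent(table, new ArrayList<>());
    }

    /** Emits records of active tables, buffers records of pending tables, drops the rest. */
    void onRecord(BinlogRecord record) {
        if (active.contains(record.table)) {
            emitted.add(record);
        } else if (pending.containsKey(record.table)) {
            pending.get(record.table).add(record);
        }
    }

    /**
     * Step 6: on an activate event carrying the snapshot high watermark, discard buffered
     * records at or before the watermark and emit the rest in arrival order.
     */
    void activate(String table, long snapshotHighWatermark) {
        List<BinlogRecord> buffered = pending.remove(table);
        if (buffered == null) {
            return; // ignore an activate event without a prior prepare
        }
        for (BinlogRecord record : buffered) {
            if (record.position > snapshotHighWatermark) {
                emitted.add(record);
            }
        }
        active.add(table);
    }

    List<BinlogRecord> emitted() {
        return emitted;
    }
}
```

Note that the buffer itself is reader state that would have to survive a checkpoint between steps 3 and 6, which is part of why the proposal defers this mode.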
This protocol should be implemented in a follow-up phase because it changes source-reader coordination and the checkpoint state shape more deeply than binlog-phase capture does.
## Compatibility, Deprecation, and Migration Plan
This proposal is backward compatible.
- `scan.newly-added-table.enabled` defaults to `true` to preserve the current SeaTunnel behavior on restore.
- `scan.binlog.newly-added-table.enabled` defaults to `false`.
- Existing jobs keep their current static table behavior during normal binlog reading unless they explicitly enable the binlog option.
- Existing jobs with an exact `table-names` list do not start capturing unrelated tables.
- Jobs that want to disable restore-time table addition can explicitly set `scan.newly-added-table.enabled=false`.
- Jobs that want binlog-phase capture for newly created tables must explicitly enable `scan.binlog.newly-added-table.enabled` and use a downstream path that can route or create the new target table.
There is no deprecation in this proposal.
## Test Plan
Unit tests:
- Verify MySQL option parsing defaults and explicit values.
- Verify restore-time table reconciliation is gated by `scan.newly-added-table.enabled`.
- Verify the MySQL schema cache can resolve a table missing from the startup `tableMap`.
- Verify the CDC deserializer can dynamically register a newly discovered table.
- Verify unknown table records are ignored when the binlog option is disabled.
- Verify the source registers newly added table metadata before deserializing data rows for that table.
- Verify that records of a newly added table are still ignored, or fail downstream, when the selected downstream path cannot route them.
Integration/E2E tests:
- Restore-time test:
  - Start a MySQL CDC job with table A.
  - Stop with a checkpoint/savepoint.
  - Update the config to include table B and enable `scan.newly-added-table.enabled`.
  - Restore and verify that table B's snapshot and binlog records are synchronized.
- Binlog-phase test:
  - Start a MySQL CDC job with a table pattern and `scan.binlog.newly-added-table.enabled=true`.
  - Wait until the job enters the binlog phase.
  - Create a new table matching the pattern and insert rows.
  - Verify that the downstream receives rows inserted after the create-table binlog position when the selected downstream path can route the table.
## Alternatives Considered
### One Option for All Newly Added Table Cases
Rejected. Restore-time snapshot and binlog-phase new-table capture have different correctness semantics. A single option would make it unclear whether historical rows are guaranteed.
### Running-Job Snapshot in the First Patch
Rejected for the first implementation phase. It is the most complete behavior, but it requires pending table buffering, source event coordination, and new checkpoint state. Implementing this before the simpler Flink-compatible semantics would increase review risk and make correctness harder to validate.
### Rely Only on Debezium Dynamic Table Filtering
Rejected. Even if Debezium can read matching binlog records, SeaTunnel still
needs dynamic schema registration, create-table event propagation, and
multi-table sink routing.
## References
- SeaTunnel STIP guide: https://seatunnel.apache.org/community/contribution_guide/STIP
- Flink CDC MySQL pipeline connector newly added table options: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/connectors/pipeline-connectors/mysql/
- Flink CDC MySQL source scan newly added tables: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.6/docs/connectors/flink-sources/mysql-cdc/
- Flink CDC 3.6.0 release note for FLINK-38218: https://flink.apache.org/2026/03/30/apache-flink-cdc-3.6.0-release-announcement/
- SeaTunnel CDC connector design: https://github.com/apache/seatunnel/issues/3175
- SeaTunnel CDC DDL sync design: https://github.com/apache/seatunnel/issues/7930