davidzollo commented on PR #10799: URL: https://github.com/apache/seatunnel/pull/10799#issuecomment-4459024184
Hi @vsantonastaso, first off, **thank you so much for tackling this**! Per-connector Debezium version management is a real pain point in SeaTunnel CDC, and the direction this PR is heading is exactly what the project needs long-term. The amount of careful work here (the SPI design, the `MigrationObjectInputStream`/`LegacyTableId` migration path, the dedicated `CheckpointCompatibilityTest` suite, and the consistency across all four connectors) is really appreciated.

I'd love to help land this. After reading through the diff end-to-end, I have a few observations I'd like to surface gently, along with a concrete proposal for how to break the change up so we can land the value safely.

### Quick observations on the current diff

A few items I think we'll want to address before this lands as-is:

1. **`IncrementalSplitSerializer` drops two fields on write.** `historyTableChanges` and `checkpointTables` aren't written in `serialize()` and aren't restored in `deserializeV2()` either (the 5-arg constructor reinitializes them to empty collections). Since `historyTableChanges` carries the DDL/schema history needed to decode binlog/WAL events, an incremental-phase checkpoint→restart would lose the schema history. Happy to send a follow-up commit adding the missing fields plus a round-trip test that asserts a non-empty `historyTableChanges` survives.

2. **The compatibility tests don't fully exercise the real "old SeaTunnel writes bytes → new SeaTunnel reads bytes" path.** `serializeWithTableIdReplacement(...)` writes using the *current* class shapes and only swaps `TableIdentifierImpl` for a fake `TableId`. But e.g. `CompletedSnapshotSplitInfo` now declares an explicit `serialVersionUID = -8083313244534202065L`, while the version on `dev` doesn't declare one (the JVM auto-computes it from the old `TableId tableId` field layout).
Unless that magic value was derived via `serialver` against the dev-branch class, a real upgrade may hit `InvalidClassException` *before* `readObject()` ever runs. The most robust verification I can think of is to **commit a small base64 resource fixture captured from an actual `dev` run** and have the test deserialize that. I'd be glad to help capture those fixtures.

3. **The patched `io.debezium.relational.TableId` moved from `src/main/java` to `src/test/java`.** That class made Debezium's `TableId` `Serializable` so prior SeaTunnel versions could put it into checkpoints. With it out of the runtime classpath, any legacy savepoint reference that doesn't flow through `MigrationObjectInputStream` won't deserialize. Either keeping it in `src/main/java` for one more release cycle, or documenting it in `docs/en/introduction/concepts/incompatible-changes.md` with explicit migration guidance, would make the upgrade much safer.

4. **`JdbcSourceFetchTaskContext`'s public abstract signatures changed** (`getDispatcher`, `getOffsetContext`, `getPartition`, `getErrorHandler`, `getDatabaseSchema`). External connectors extending this class will need a recompile. That is worth a note in `incompatible-changes.md`, and worth ticking the corresponding boxes in the PR description checklist so reviewers can match expectations.

### Stepping back: the bigger architectural question

I want to surface the design question gently, because I think it's the most important thing to align on before polishing details.

**Imagine the scenario we're really aiming for**: say MySQL 5.x/8.x staying on Debezium 1.9.x while MySQL 9.x needs Debezium 3.x. For that to actually work in one JVM, two `io.debezium.relational.TableId` classes need to coexist. The only ways to make that happen are:

- **ClassLoader isolation per connector plugin**, with `io.debezium.*` strictly child-first; or
- **Shading/relocating** one of the Debezium versions to a different package.
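
To make the first option concrete, here is a minimal sketch of a prefix-based isolating class loader. It is not the existing `SeaTunnelChildFirstClassLoader`; the class name `PrefixChildFirstClassLoader` and its methods are hypothetical, and the sketch assumes "strictly child-first" means that classes under the listed prefixes are never resolved through the parent.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Hypothetical sketch: classes under the configured prefixes (for example
 * "io.debezium.") are loaded only from the plugin jars, never from the parent.
 * Everything else uses the normal parent-first delegation.
 */
class PrefixChildFirstClassLoader extends URLClassLoader {
    private final String[] childOnlyPrefixes;

    PrefixChildFirstClassLoader(URL[] pluginJars, ClassLoader parent, String... childOnlyPrefixes) {
        super(pluginJars, parent);
        this.childOnlyPrefixes = childOnlyPrefixes.clone();
    }

    /** True if the class must come from this loader's own jars. */
    boolean isChildOnly(String className) {
        for (String prefix : childOnlyPrefixes) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!isChildOnly(name)) {
            // Standard parent-first delegation for non-isolated packages.
            return super.loadClass(name, resolve);
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                // Assumption: no parent fallback. Throws ClassNotFoundException
                // if the plugin jars do not contain the class.
                loaded = findClass(name);
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    /** Convenience for demos and tests: true if this loader can resolve the class. */
    boolean canLoad(String className) {
        try {
            loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // No plugin jars are supplied here, so isolated classes cannot be found.
        PrefixChildFirstClassLoader loader = new PrefixChildFirstClassLoader(
                new URL[0], ClassLoader.getSystemClassLoader(), "io.debezium.");
        System.out.println("java.lang.String loadable: " + loader.canLoad("java.lang.String"));
        System.out.println("io.debezium.relational.TableId loadable: "
                + loader.canLoad("io.debezium.relational.TableId"));
    }
}
```

With one such loader per connector plugin, each plugin would resolve its own copy of `io.debezium.relational.TableId`, but only if no parent-loaded module exposes `io.debezium.*` types in signatures shared across plugins.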
The current PR adds an SPI layer, which is a great first step, but `connector-cdc-base` still:

- imports `io.debezium.relational.TableId` directly in `TableIdentifierImpl`, `JdbcDataSourceDialect.discoverDataCollections(...)`, `JdbcSourceFetchTaskContext.buildTableSchemaHistory(...)`, and the split assigners;
- ships patched `io.debezium.*` classes inside `src/main/java` (`HistorizedRelationalDatabaseConnectorConfig`, `ChangeEventQueue`, `HeartbeatFactory`, `DefaultHeartbeatConnectionProvider`).

As long as `connector-cdc-base` is loaded by the parent classloader and carries any `io.debezium.*` class, *every* connector plugin will be forced to use that parent-loaded version, which defeats the per-connector independence we're going for.

### A proposal for landing this incrementally

Could we split the work along these lines? It would let us merge value quickly while keeping risk small:

**PR A: pure-API base (no behavior change).** Introduce `DebeziumAdapter`, `DebeziumRuntime` (a black-box runtime exposing `runSnapshot` / `runStream` / `currentOffset(): byte[]`), `TableIdentifier`, `ChangeEvent`, `Row`, all as plain POJOs with **zero `io.debezium.*` imports**. Don't change existing connectors yet.

**PR B: internal refactor of `connector-cdc-mysql`.** Make MySQL CDC implement `DebeziumAdapter`, but keep `SnapshotSplit` / `IncrementalSplit` / `CompletedSnapshotSplitInfo` / `SnapshotPhaseState` field layouts unchanged so checkpoint bytes stay binary-compatible. The `TableIdentifier`-everywhere migration is still valuable internally; we just don't want to touch the serialized shape yet.

**PR C: strip `io.debezium.*` out of `connector-cdc-base`.** Move patched Debezium classes into a `connector-cdc-debezium-patch-v1` sibling module, and teach Zeta's `SeaTunnelChildFirstClassLoader` to treat `io.debezium.**` as strictly child-first.

**PR D: `connector-cdc-mysql-v3` greenfield.** Add a new plugin (separate `plugin-mapping.properties` entry, separate fat-jar) using Debezium 3.x. Existing `MySQL-CDC` keeps pointing at v1. Users on MySQL 9 opt into `MySQL-CDC-V3`. No migration needed for existing users.

**PR E: checkpoint-migration tool.** An offline `bin/cdc-checkpoint-migrate.sh` to handle anyone coming from pre-PR-A checkpoint formats, so we never do risky in-place deserialization on the hot path.

The nice property of this split is that **PR A and B are essentially zero-risk** (no wire / checkpoint changes), and once they land, adding any future Debezium major version (Postgres 2.x, Oracle LogMiner-X, etc.) becomes a copy of the PR D recipe.

### Other small things (non-blocking)

- The "V1 format - no version marker" branch in `PendingSplitsStateSerializer.deserialize` looks unreachable in practice: the only legacy format on `dev` is the raw `DefaultSerializer` stream starting with `0xAC 0xED`. Probably safe to drop to avoid future confusion.
- The four `*SchemaHistoryAdapter` + `*EmbeddedDatabaseHistory` classes are near-identical copies; once we have classloader isolation, they could likely collapse into one parameterized class per Debezium major version.
- `IncrementalSourceStreamFetcher.shouldEmit(...)` calls `SourceRecordUtils.getTableId(record)` per record, which now allocates a `TableIdentifierImpl` per row. A small object cache would be worthwhile on this hot path.
- `TableIdentifierImpl.readObject` uses reflection to write to a non-final field; a direct `this.delegate = new TableId(...)` would be simpler.

### Wrapping up

I'm absolutely not saying "this PR shouldn't land": the *intent* is exactly right, and a lot of the SPI scaffolding is reusable. I just want to make sure we don't accidentally break existing users' checkpoints or paint ourselves into a corner before the classloader isolation story is solid.
Happy to pair on PR A/B if it helps, and would love to hear your thoughts on the breakdown above. Thanks again for the energy you've put into this. Looking forward to iterating together!
