vsantonastaso commented on PR #10799:
URL: https://github.com/apache/seatunnel/pull/10799#issuecomment-4461544303
> Hi @vsantonastaso — first off, **thank you so much for tackling this**!
Per-connector Debezium version management is a real pain point in SeaTunnel
CDC, and the direction this PR is heading is exactly what the project needs
long-term. The amount of careful work here — the SPI design, the
`MigrationObjectInputStream`/`LegacyTableId` migration path, the dedicated
`CheckpointCompatibilityTest` suite, and the consistency across all four
connectors — is really appreciated.
>
> I'd love to help land this. After reading through the diff end-to-end, I
have a few observations I'd like to surface gently, along with a concrete
proposal for how to break the change up so we can land the value safely.
>
> ### Quick observations on the current diff
> A few items I think we'll want to address before this lands as-is:
>
> 1. **`IncrementalSplitSerializer` drops two fields on write.**
`historyTableChanges` and `checkpointTables` aren't written in `serialize()`
and aren't restored in `deserializeV2()` either (the 5-arg constructor
reinitializes them to empty collections). Since `historyTableChanges` carries
the DDL/schema history needed to decode binlog/WAL events, an incremental-phase
checkpoint→restart would lose the schema history. Happy to send a follow-up
commit adding the missing fields plus a round-trip test that asserts a
non-empty `historyTableChanges` survives.
> 2. **The compatibility tests don't fully exercise the real "old SeaTunnel
writes bytes → new SeaTunnel reads bytes" path.**
`serializeWithTableIdReplacement(...)` writes using the _current_ class shapes
and only swaps `TableIdentifierImpl` for a fake `TableId`. But e.g.
`CompletedSnapshotSplitInfo` now declares an explicit `serialVersionUID =
-8083313244534202065L`, while the version on `dev` doesn't declare one (the JVM
auto-computes it from the old `TableId tableId` field layout). Unless that
magic value was derived via `serialver` against the dev-branch class, a real
upgrade may hit `InvalidClassException` _before_ `readObject()` ever runs. The
most robust verification I can think of is to **commit a small base64 resource
fixture captured from an actual `dev` run** and have the test deserialize that.
I'd be glad to help capture those fixtures.
> 3. **The patched `io.debezium.relational.TableId` moved from
`src/main/java` to `src/test/java`.** That class made Debezium's `TableId`
`Serializable` so prior SeaTunnel versions could put it into checkpoints. With
it out of the runtime classpath, any legacy savepoint reference that doesn't
flow through `MigrationObjectInputStream` won't deserialize. Either keeping it
in `src/main/java` for one more release cycle, or documenting it in
`docs/en/introduction/concepts/incompatible-changes.md` with explicit migration
guidance, would make the upgrade much safer.
> 4. **`JdbcSourceFetchTaskContext`'s public abstract signatures changed**
(`getDispatcher`, `getOffsetContext`, `getPartition`, `getErrorHandler`,
`getDatabaseSchema`). External connectors extending this class will need a
recompile — worth a note in `incompatible-changes.md`, and worth ticking the
corresponding boxes in the PR description checklist so reviewers can match
expectations.
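
Items 2 and 3 both depend on how `ObjectInputStream` matches stream classes to local ones. One common technique is to override `readClassDescriptor()` and swap a legacy descriptor for a replacement class. This is not necessarily how the PR's `MigrationObjectInputStream` works. A minimal sketch, where `OldTableId`, `LegacyTableIdStub`, and `RemappingObjectInputStream` are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class MigrationStreamDemo {

    // Hypothetical stand-in for the legacy class whose bytes sit in old checkpoints.
    static class OldTableId implements Serializable {
        private static final long serialVersionUID = 1L;
        final String catalog;
        final String table;

        OldTableId(String catalog, String table) {
            this.catalog = catalog;
            this.table = table;
        }
    }

    // Hypothetical replacement: different name and serialVersionUID, but it MUST
    // declare the same serializable field names and types as the legacy class.
    static class LegacyTableIdStub implements Serializable {
        private static final long serialVersionUID = 42L;
        final String catalog;
        final String table;

        LegacyTableIdStub(String catalog, String table) {
            this.catalog = catalog;
            this.table = table;
        }
    }

    // Returns the replacement's local descriptor in place of the stream's one, so
    // field data is read with the replacement's layout and the remapped class's
    // serialVersionUID is never compared. Non-remapped classes are checked as usual.
    static final class RemappingObjectInputStream extends ObjectInputStream {
        RemappingObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor()
                throws IOException, ClassNotFoundException {
            ObjectStreamClass streamDesc = super.readClassDescriptor();
            if (streamDesc.getName().equals(OldTableId.class.getName())) {
                return ObjectStreamClass.lookup(LegacyTableIdStub.class);
            }
            return streamDesc;
        }
    }

    static byte[] write(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object readWithRemapping(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new RemappingObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] legacyBytes = write(new OldTableId("inventory", "orders"));
        LegacyTableIdStub restored = (LegacyTableIdStub) readWithRemapping(legacyBytes);
        System.out.println(restored.catalog + "." + restored.table);
    }
}
```

Only the remapped class escapes the `serialVersionUID` comparison. A class that keeps its name, such as `CompletedSnapshotSplitInfo`, is still checked against the UID recorded in the stream. For the `dev` layout, that recorded UID is the JVM-computed one, which is why item 2 matters.
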
>
> ### Stepping back: the bigger architectural question
> I want to surface the design question gently, because I think it's the
most important thing to align on before polishing details.
>
> **Imagine the scenario we're really aiming for** — say MySQL 5.x/8.x
staying on Debezium 1.9.x while MySQL 9.x needs Debezium 3.x. For that to
actually work in one JVM, two `io.debezium.relational.TableId` classes need to
coexist. The only ways to make that happen are:
>
> * **ClassLoader isolation per connector plugin**, with `io.debezium.*`
strictly child-first; or
> * **Shading/relocating** one of the Debezium versions to a different
package.
>
> The current PR adds an SPI layer, which is a great first step, but
`connector-cdc-base` still:
>
> * imports `io.debezium.relational.TableId` directly in
`TableIdentifierImpl`, `JdbcDataSourceDialect.discoverDataCollections(...)`,
`JdbcSourceFetchTaskContext.buildTableSchemaHistory(...)`, and the split
assigners;
> * ships patched `io.debezium.*` classes inside `src/main/java`
(`HistorizedRelationalDatabaseConnectorConfig`, `ChangeEventQueue`,
`HeartbeatFactory`, `DefaultHeartbeatConnectionProvider`).
>
> As long as `connector-cdc-base` is loaded by the parent classloader and
carries any `io.debezium.*` class, _every_ connector plugin will be forced to
use that parent-loaded version, which defeats the per-connector independence
we're going for.
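
The first option above can be sketched as a plugin class loader that refuses to take `io.debezium.*` from its parent. This is a minimal sketch that assumes each plugin's jars are passed in as URLs. `DebeziumChildFirstClassLoader` is a hypothetical name, and this is not Zeta's `SeaTunnelChildFirstClassLoader`:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class DebeziumChildFirstClassLoader extends URLClassLoader {

    // Packages that must come from the plugin's own jars, never from the parent.
    private static final String[] CHILD_ONLY_PREFIXES = {"io.debezium."};

    public DebeziumChildFirstClassLoader(URL[] pluginJars, ClassLoader parent) {
        super(pluginJars, parent);
    }

    static boolean isChildOnly(String className) {
        for (String prefix : CHILD_ONLY_PREFIXES) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!isChildOnly(name)) {
            // Everything else (JDK, SeaTunnel API, ...) keeps parent-first delegation.
            return super.loadClass(name, resolve);
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                // Strictly child-first: search only the plugin jars and fail if the
                // class is missing, instead of silently using the parent's copy.
                loaded = findClass(name);
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }
}
```

The loader deliberately does not fall back to the parent for `io.debezium.*`. A fallback would quietly reuse whatever Debezium version the parent loaded, which is the leak described above.
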
>
> ### A proposal for landing this incrementally
> Could we split the work along these lines? It would let us merge value
quickly while keeping risk small:
>
> **PR A — pure-API base (no behavior change).** Introduce
`DebeziumAdapter`, `DebeziumRuntime` (a black-box runtime exposing
`runSnapshot` / `runStream` / `currentOffset(): byte[]`), `TableIdentifier`,
`ChangeEvent`, `Row` — all as plain POJOs with **zero `io.debezium.*`
imports**. Don't change existing connectors yet.
>
> **PR B — internal refactor of `connector-cdc-mysql`.** Make MySQL CDC
implement `DebeziumAdapter`, but keep `SnapshotSplit` / `IncrementalSplit` /
`CompletedSnapshotSplitInfo` / `SnapshotPhaseState` field layouts unchanged so
checkpoint bytes stay binary-compatible. The `TableIdentifier`-everywhere
migration is still valuable internally; we just don't want to touch the
serialized shape yet.
>
> **PR C — strip `io.debezium.*` out of `connector-cdc-base`.** Move patched
Debezium classes into a `connector-cdc-debezium-patch-v1` sibling module, and
teach Zeta's `SeaTunnelChildFirstClassLoader` to treat `io.debezium.**` as
strictly child-first.
>
> **PR D — `connector-cdc-mysql-v3` greenfield.** Add a new plugin (separate
`plugin-mapping.properties` entry, separate fat-jar) using Debezium 3.x.
Existing `MySQL-CDC` keeps pointing at v1. Users on MySQL 9 opt into
`MySQL-CDC-V3`. No migration needed for existing users.
>
> **PR E — checkpoint-migration tool.** An offline
`bin/cdc-checkpoint-migrate.sh` to handle anyone coming from pre-PR-A
checkpoint formats, so we never do risky in-place deserialization on the hot
path.
>
> The nice property of this split is that **PR A and B are essentially
zero-risk** (no wire / checkpoint changes), and once they land, adding any
future Debezium major version (Postgres 2.x, Oracle LogMiner-X, etc.) becomes a
copy of the PR D recipe.
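
For PR A, "zero `io.debezium.*` imports" could look like the following, which covers only `TableIdentifier` and `DebeziumRuntime`. The method names come from the proposal. The parameter types, the `PureApiSketch` wrapper, and the field layout are assumptions for illustration:

```java
import java.io.Serializable;
import java.util.Objects;

public final class PureApiSketch {

    private PureApiSketch() {}

    /** Table identifier built from plain strings only; no io.debezium.* types anywhere. */
    public static final class TableIdentifier implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String catalog; // may be null
        private final String schema;  // may be null
        private final String table;

        public TableIdentifier(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = Objects.requireNonNull(table, "table");
        }

        /** Dotted name that skips null or empty parts, e.g. "inventory.orders". */
        public String identifier() {
            StringBuilder sb = new StringBuilder();
            for (String part : new String[] {catalog, schema, table}) {
                if (part != null && !part.isEmpty()) {
                    if (sb.length() > 0) {
                        sb.append('.');
                    }
                    sb.append(part);
                }
            }
            return sb.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TableIdentifier)) {
                return false;
            }
            TableIdentifier other = (TableIdentifier) o;
            return Objects.equals(catalog, other.catalog)
                    && Objects.equals(schema, other.schema)
                    && table.equals(other.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(catalog, schema, table);
        }

        @Override
        public String toString() {
            return identifier();
        }
    }

    /** Black-box runtime: offsets cross the API boundary only as opaque bytes. */
    public interface DebeziumRuntime extends AutoCloseable {
        void runSnapshot(TableIdentifier table) throws Exception;

        void runStream(byte[] startOffset) throws Exception;

        byte[] currentOffset();
    }
}
```

Nothing here references Debezium, so the module could sit in the parent class loader without pinning a Debezium version. Each connector would translate to and from its own `TableId` inside its plugin.
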
>
> ### Other small things (non-blocking)
> * The "V1 format - no version marker" branch in
`PendingSplitsStateSerializer.deserialize` looks unreachable in practice — the
only legacy format on `dev` is the raw `DefaultSerializer` stream starting with
`0xAC 0xED`. Probably safe to drop to avoid future confusion.
> * The four `*SchemaHistoryAdapter` + `*EmbeddedDatabaseHistory` classes
are near-identical copies; once we have classloader isolation, they could
likely collapse into one parameterized class per Debezium major version.
> * `IncrementalSourceStreamFetcher.shouldEmit(...)` calls
`SourceRecordUtils.getTableId(record)` per record, which now allocates a
`TableIdentifierImpl` per row — worth a small object cache on this hot path.
> * `TableIdentifierImpl.readObject` uses reflection to write to a non-final
field; a direct `this.delegate = new TableId(...)` would be simpler.
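
The object-cache suggestion for `shouldEmit(...)` could be a small bounded LRU map in front of the allocation. A minimal sketch with the following assumptions:

* `TableIdCache` is a hypothetical name.
* The key (for example, a string the record already carries) is an assumption.
* The class is not thread-safe, so it assumes one instance per fetcher thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public final class TableIdCache<K, V> {

    private final Map<K, V> entries;
    private final Function<K, V> factory;

    public TableIdCache(int maxEntries, Function<K, V> factory) {
        this.factory = factory;
        // accessOrder = true: iteration runs from least- to most-recently used,
        // so removeEldestEntry evicts the least-recently-used table id.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value for key, creating it only on first use. */
    public V get(K key) {
        return entries.computeIfAbsent(key, factory);
    }

    public int size() {
        return entries.size();
    }
}
```

Bounding the map keeps memory flat even when a source captures many tables, while the common case (a handful of hot tables) never allocates after warm-up.
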
>
> ### Wrapping up
> I'm absolutely not saying "this PR shouldn't land" — the _intent_ is
exactly right, and a lot of the SPI scaffolding is reusable. I just want to
make sure we don't accidentally break existing users' checkpoints or paint
ourselves into a corner before the classloader isolation story is solid. Happy
to pair on PR A/B if it helps, and would love to hear your thoughts on the
breakdown above.
>
> Thanks again for the energy you've put into this — looking forward to
iterating together!
@davidzollo, thanks so much for the review!
Let me go through your points one by one.
- IncrementalSplitSerializer dropping fields: You were absolutely right; this
was a bug. I've fixed it: serialize() now writes both checkpointTables and
historyTableChanges, and deserializeV2() reads them back and passes them to
the constructor.
- The serialVersionUID on CompletedSnapshotSplitInfo: also a valid concern.
I ran serialver against a stub matching the dev branch's class layout (with the
original TableId tableId field) and got the correct value:
-8494112929780456092L. That's what's in the code now, so the
MigrationObjectInputStream path can actually reach readObject() without hitting
InvalidClassException first.
- The patched TableId moving out of src/main: I went with documenting this
in incompatible-changes.md (both English and Chinese) rather than keeping the
patched class in production code. The migration serializers
(MigrationObjectInputStream + LegacyTableId) handle all the restore paths that
go through
SeaTunnel's own serializer framework, which should cover normal upgrade
scenarios. I've made it clear in the docs that a clean restart is the safest
upgrade path, and that any custom tooling reading checkpoint bytes directly
would need updating.
- JdbcSourceFetchTaskContext signature changes: also documented in
incompatible-changes.md now, listing the five methods that changed and pointing
to the built-in connector implementations as migration examples.
- The golden-byte fixture tests: I'd honestly like to take you up on that
offer. Capturing real bytes from an actual dev run would give us much stronger
confidence than the synthetic V1 fixtures I have now. If you're still up for
it, that would be great.
- On the bigger architectural question and your PR A-E proposal: I think
you're spot on about the end state. ClassLoader isolation (or shading) is the
only way to truly run two Debezium versions in one JVM, and I completely agree
that connector-cdc-base still has too many io.debezium.* imports for that to
work today.
That said, I see this PR as roughly your "PR A+B" combined: the SPI layer,
the adapter wiring on the real runtime path, the TableIdentifier migration, and
the checkpoint compatibility layer across all four connectors. It's not the
full isolation story, but it's the prerequisite infrastructure that makes
everything else possible.
I'd like to pair on your "PR C" (stripping io.debezium.* out of
connector-cdc-base and wiring up child-first classloading) and "PR D" (a
greenfield connector on Debezium 3.x) as follow-ups. Those feel like the
natural next steps once this foundation lands.
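
The IncrementalSplitSerializer fix from the first bullet is easiest to pin down with a round-trip test that fails if either field is dropped again. A minimal sketch using a simplified, hypothetical `Split` class and hand-rolled `DataOutputStream` encoding. It does not reproduce SeaTunnel's actual wire format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public final class SplitRoundTripSketch {

    static final int VERSION = 2;

    /** Hypothetical, simplified stand-in for an incremental split. */
    static final class Split {
        final String splitId;
        final List<String> checkpointTables;
        final List<String> historyTableChanges; // e.g. serialized schema-history entries

        Split(String splitId, List<String> checkpointTables, List<String> historyTableChanges) {
            this.splitId = splitId;
            this.checkpointTables = checkpointTables;
            this.historyTableChanges = historyTableChanges;
        }
    }

    static byte[] serialize(Split split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            writeString(out, split.splitId);
            writeStrings(out, split.checkpointTables);    // one of the two fields the bug dropped
            writeStrings(out, split.historyTableChanges); // the other one
        }
        return bytes.toByteArray();
    }

    static Split deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported split version: " + version);
            }
            // Read in exactly the order serialize() wrote, and pass every field on.
            String splitId = readString(in);
            List<String> checkpointTables = readStrings(in);
            List<String> historyTableChanges = readStrings(in);
            return new Split(splitId, checkpointTables, historyTableChanges);
        }
    }

    private static void writeStrings(DataOutputStream out, List<String> values) throws IOException {
        out.writeInt(values.size());
        for (String value : values) {
            writeString(out, value);
        }
    }

    private static List<String> readStrings(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<String> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            values.add(readString(in));
        }
        return values;
    }

    // Length-prefixed UTF-8 avoids writeUTF's 65535-byte limit for long DDL.
    private static void writeString(DataOutputStream out, String value) throws IOException {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

A test would serialize a split with a non-empty historyTableChanges, deserialize it, and assert that both lists come back equal to the originals. Checking only the split id would not catch this bug.
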
I look forward to your feedback.
Thanks
--
This is an automated message from the Apache Git Service.