DanielLeens commented on PR #10799:
URL: https://github.com/apache/seatunnel/pull/10799#issuecomment-4417099337
Sorry for missing this in the last review - that's on me. I re-reviewed the
latest head from scratch, and I want to flag one more blocker that I should
have caught earlier. This is a carryover from the previous version, not a newly
introduced issue.
Thanks for the update here. I pulled the latest head locally, re-read the
full diff against `upstream/dev`, and re-traced the current code path plus the
failing Build evidence. I did not run Maven locally in this pass; this is a
source-level review plus CI log validation.
# What This PR Fixes
- User pain: today the CDC connectors still share too much Debezium runtime
surface, so upgrading one connector can leak into others, and old
checkpoints/savepoints are fragile when Debezium internal types evolve.
- Fix approach: this PR introduces a connector-specific adapter layer and
adds serializer migration logic so old `TableId`-based state can be restored as
the new `TableIdentifier` abstraction.
- One-line summary: the direction is right, and the checkpoint compatibility
layer is much better now, but the current Oracle CDC source path still carries
a raw Debezium `TableId` into the Flink coordinator serialization boundary.
# What I Rechecked On The Latest Head
- The previous adapter-cache blocker is fixed: `DebeziumAdapterFactory` no
longer keeps a static connector-type cache.
- The previous production `io.debezium.relational.TableId` copy blocker is
also fixed in the narrow sense I raised before: that class is gone from
`src/main` and only remains as a test-only compatibility fixture.
- The serializer migration work is now much stronger:
  - `MigrationObjectInputStream` remaps old `io.debezium.relational.TableId`
    stream descriptors to `LegacyTableId`
  - `SourceSplitSerializer` and `PendingSplitsStateSerializer` detect old
    Java object-stream bytes and route them through migration
  - `CheckpointCompatibilityTest` now covers real legacy-byte cases instead
    of only shallow round trips
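
For readers unfamiliar with descriptor remapping, here is a minimal sketch of the general technique using plain `java.io`. It is not the PR's `MigrationObjectInputStream`; the nested `OldTableId`, `LegacyTableId`, and `RemappingInputStream` classes are hypothetical stand-ins, and the sketch assumes the replacement class declares the same field names and types as the old one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class DescriptorRemapSketch {

    /** Hypothetical stand-in for the old class whose bytes sit in saved state. */
    static final class OldTableId implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String catalog;
        private final String schema;
        private final String table;

        OldTableId(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = table;
        }
    }

    /** Hypothetical replacement; assumed to mirror the old field names and types. */
    static final class LegacyTableId implements Serializable {
        private static final long serialVersionUID = 1L;
        private String catalog;
        private String schema;
        private String table;

        String describe() {
            return catalog + "." + schema + "." + table;
        }
    }

    /** Swaps the old class descriptor for the replacement's while reading. */
    static final class RemappingInputStream extends ObjectInputStream {
        RemappingInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor()
                throws IOException, ClassNotFoundException {
            // Consume the descriptor from the stream first so the read position stays correct.
            ObjectStreamClass fromStream = super.readClassDescriptor();
            if (OldTableId.class.getName().equals(fromStream.getName())) {
                return ObjectStreamClass.lookup(LegacyTableId.class);
            }
            return fromStream;
        }
    }

    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static Object readMigrated(byte[] bytes) {
        try (ObjectInputStream in =
                new RemappingInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object restored = readMigrated(serialize(new OldTableId("ORCL", "HR", "EMPLOYEES")));
        System.out.println(restored.getClass().getSimpleName());
    }
}
```

The remap happens at the descriptor level, so the old class never has to be resolvable on the reader's classpath. That property is what makes this pattern suitable for restoring state written before a dependency was isolated.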
# Runtime Chain I Checked
```text
Oracle CDC source construction
  -> OracleIncrementalSourceFactory.restoreSource() returns new OracleIncrementalSource(...)
  -> IncrementalSource.<init>()
  -> this.dataSourceDialect = createDataSourceDialect(readonlyConfig)
  -> OracleIncrementalSource.createDataSourceDialect()
  -> new OracleDialect((OracleSourceConfigFactory) configFactory, catalogTables)
  -> OracleDialect.<init>()
  -> this.tableMap = CatalogTableUtils.convertTables(catalogTables)
  -> key type is still io.debezium.relational.TableId

Flink translation / submission path
  -> FlinkSource keeps the SeaTunnel source in a final field
  -> Flink job graph generation serializes the coordinator provider for the source node
  -> the source object graph still contains IncrementalSource.dataSourceDialect
  -> OracleDialect.tableMap is serialized
  -> HashMap.writeObject(...) hits raw io.debezium.relational.TableId
  -> Build fails before the job even starts running
```
The current Build failure lines up exactly with that path. The failing
Oracle CDC job graph generation reports that the coordinator provider for
`Source: Oracle-CDC-Source` is not serializable, and the nested cause is
`java.io.NotSerializableException: io.debezium.relational.TableId`.
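
The failure mode can be reproduced in isolation with plain Java serialization. The sketch below is illustrative only: `RawTableId`, `Dialect`, and `Source` are hypothetical stand-ins for the Debezium `TableId`, `OracleDialect`, and the source object, and the sketch assumes (as the Build log indicates) that the key type does not implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class NonSerializableKeyRepro {

    /** Stand-in for a third-party identifier that does not implement Serializable. */
    static final class RawTableId {
        final String name;

        RawTableId(String name) {
            this.name = name;
        }
    }

    /** Stand-in for a dialect: Serializable itself, but its map keys are not. */
    static final class Dialect implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<RawTableId, String> tableMap = new HashMap<>();
    }

    /** Stand-in for a source that keeps the dialect in a field. */
    static final class Source implements Serializable {
        private static final long serialVersionUID = 1L;
        final Dialect dialect = new Dialect();
    }

    /** Returns the name of the class that blocked serialization, or null if none did. */
    static String firstNonSerializableClass(Object root) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(root);
            return null;
        } catch (NotSerializableException e) {
            // The exception message carries the offending class name.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        // An empty map serializes fine; the failure only appears once a key is present.
        source.dialect.tableMap.put(new RawTableId("HR.EMPLOYEES"), "catalog table");
        System.out.println(firstNonSerializableClass(source));
    }
}
```

A helper of this shape, pointed at the real source object instead of the stand-ins, is also roughly what a focused serialization regression check could look like.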
# Findings
Issue 1: Oracle CDC still stores raw Debezium `TableId` inside the source
object graph, so the normal Flink submission path still fails serialization
- Location:
  - `connector-cdc-oracle/.../OracleIncrementalSource.java:120-121`
  - `connector-cdc-base/.../IncrementalSource.java:120-130`
  - `connector-cdc-oracle/.../OracleDialect.java:53-63`
  - `seatunnel-translation-flink-common/.../FlinkSource.java:66-72`
- Why this is a blocker:
  This is not only a restore/savepoint corner case anymore. On the normal
  Oracle CDC path, the source constructs `OracleDialect` up front, and
  `OracleDialect` keeps a `Map<TableId, CatalogTable>` in `tableMap`. That object
  stays inside `IncrementalSource.dataSourceDialect`, and the Flink translation
  layer keeps the source itself as part of the source node object graph. So when
  Flink serializes the coordinator provider during job graph generation, the raw
  Debezium `TableId` is still present and the submission path fails.
- Risk:
  1. Oracle CDC jobs can fail during submission on the normal path, before
     execution even begins.
  2. The PR goal is per-connector Debezium isolation, but this still leaks a
     Debezium runtime type back into the shared source/coordinator boundary.
  3. The current Build failure is already a real repro, not a theoretical
     concern.
- Better fix:
  - Option A (preferred): stop storing Debezium-native identifiers in shared
    source/common-layer state and move those cross-boundary keys to
    `TableIdentifier` (or another connector-owned serializable abstraction),
    converting back to Debezium types only inside the connector-private adapter
    boundary.
  - Option B: as a smaller patch, at least change `OracleDialect.tableMap`
    so it no longer uses Debezium `TableId` as the serialized key type.
- Severity: High
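
To make the suggested direction concrete, here is a rough sketch of keying the map by a project-owned serializable identifier and converting to the raw type only at the edge. All names (`RawTableId`, `TableKey`, `Dialect`, `roundTrip`) are hypothetical; this does not reflect the actual `TableIdentifier` API in the PR, which I have not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class SerializableKeySketch {

    /** Stand-in for a third-party identifier that is NOT Serializable. */
    static final class RawTableId {
        final String catalog;
        final String schema;
        final String table;

        RawTableId(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = table;
        }
    }

    /** Hypothetical project-owned key: Serializable, with value equality. */
    static final class TableKey implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String catalog;
        private final String schema;
        private final String table;

        private TableKey(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = table;
        }

        static TableKey fromRaw(RawTableId raw) {
            return new TableKey(raw.catalog, raw.schema, raw.table);
        }

        /** Converts back to the raw type; intended for connector-private code only. */
        RawTableId toRaw() {
            return new RawTableId(catalog, schema, table);
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof TableKey)) {
                return false;
            }
            TableKey that = (TableKey) other;
            return Objects.equals(catalog, that.catalog)
                    && Objects.equals(schema, that.schema)
                    && Objects.equals(table, that.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(catalog, schema, table);
        }
    }

    /** Stand-in dialect whose serialized state no longer references the raw type. */
    static final class Dialect implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<TableKey, String> tableMap = new HashMap<>();

        void register(RawTableId raw, String description) {
            tableMap.put(TableKey.fromRaw(raw), description);
        }

        String lookup(RawTableId raw) {
            return tableMap.get(TableKey.fromRaw(raw));
        }
    }

    /** Serializes and deserializes the dialect, as a submission path would. */
    static Dialect roundTrip(Dialect dialect) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(dialect);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (Dialect) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The key needs value-based `equals`/`hashCode` so that lookups still work after deserialization, when the key instances are no longer the ones originally inserted.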
# Compatibility / Side Effects
- API / config / defaults: no direct public contract break here.
- Checkpoint compatibility: clearly improved on this revision.
- Runtime compatibility: still not safe to call this fully compatible,
because the Oracle CDC normal submission path regresses at the coordinator
serialization boundary.
- Performance / resource side effects: removing the adapter cache is
actually an improvement from a classloader-lifetime perspective; the current
blocker is correctness, not performance.
# Tests / CI
- The serializer compatibility tests are much better on this revision.
- The remaining test gap is that there is still no regression guard for
Oracle source serialization at the Flink submission boundary, so this escaped
into Build instead of being caught earlier by a focused test.
### Conclusion: can merge after fixes
1. Blocking items
   - Issue 1: remove the raw Debezium `TableId` from the Oracle CDC source
     object graph that crosses the Flink coordinator serialization boundary.
2. Suggested non-blocking follow-up
   - After fixing Issue 1, add a focused regression check for the Oracle/Flink
     source serialization path so this boundary does not reopen later.
The overall direction is much closer now than the previous revision, and the
two blockers I called out last time are fixed. But the latest head still
breaks the normal Oracle CDC submission path, so I would not merge this
revision yet.