DanielLeens commented on PR #10799:
URL: https://github.com/apache/seatunnel/pull/10799#issuecomment-4417099337

   Sorry for missing this in the last review - that's on me. I re-reviewed the 
latest head from scratch, and I want to flag one more blocker that I should 
have caught earlier. This is a carryover from the previous version, not a newly 
introduced issue.
   
   Thanks for the update here. I pulled the latest head locally, re-read the 
full diff against `upstream/dev`, and re-traced the current code path plus the 
failing Build evidence. I did not run Maven locally in this pass; this is a 
source-level review plus CI log validation.
   
   # What This PR Fixes
   - User pain: today the CDC connectors still share too much Debezium runtime 
surface, so upgrading one connector can leak into others, and old 
checkpoints/savepoints are fragile when Debezium internal types evolve.
   - Fix approach: this PR introduces a connector-specific adapter layer and 
adds serializer migration logic so old `TableId`-based state can be restored as 
the new `TableIdentifier` abstraction.
   - One-line summary: the direction is right, and the checkpoint compatibility 
layer is much better now, but the current Oracle CDC source path still carries 
a raw Debezium `TableId` into the Flink coordinator serialization boundary.
   
   # What I Rechecked On The Latest Head
   - The previous adapter-cache blocker is fixed: `DebeziumAdapterFactory` no 
longer keeps a static connector-type cache.
   - The previous production `io.debezium.relational.TableId` copy blocker is 
also fixed in the narrow sense I raised before: that class is gone from 
`src/main` and only remains as a test-only compatibility fixture.
   - The serializer migration work is now much stronger:
     - `MigrationObjectInputStream` remaps old `io.debezium.relational.TableId` 
stream descriptors to `LegacyTableId`
     - `SourceSplitSerializer` and `PendingSplitsStateSerializer` detect old 
Java object-stream bytes and route them through migration
     - `CheckpointCompatibilityTest` now covers real legacy-byte cases instead 
of only shallow round trips
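
For readers who have not seen the descriptor-remapping technique, here is a minimal sketch of the general idea, not the PR's actual `MigrationObjectInputStream`. It assumes the replacement class declares the same serializable fields as the old one. `OldTableId`, `LegacyTableIdSketch`, `RemappingObjectInputStream` and `MigrationSketch` are hypothetical stand-ins invented for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class MigrationSketch {
    /** Produces "legacy" bytes by serializing the old stand-in type. */
    static byte[] writeOld(String schema, String table) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new OldTableId(schema, table));
        }
        return bytes.toByteArray();
    }

    /** Reads the legacy bytes back as the replacement type. */
    static LegacyTableIdSketch readMigrated(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new RemappingObjectInputStream(
                new ByteArrayInputStream(bytes),
                OldTableId.class.getName(),
                LegacyTableIdSketch.class)) {
            return (LegacyTableIdSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LegacyTableIdSketch id = readMigrated(writeOld("HR", "EMPLOYEES"));
        System.out.println(id.schema + "." + id.table);
    }
}

/** Hypothetical stand-in for the type that old state was written with. */
class OldTableId implements Serializable {
    private static final long serialVersionUID = 1L;
    final String schema;
    final String table;

    OldTableId(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }
}

/** Hypothetical replacement type; assumed to have the same serializable fields. */
class LegacyTableIdSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final String schema;
    final String table;

    LegacyTableIdSketch(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }
}

/** Swaps the class descriptor of one old class name for a replacement class. */
class RemappingObjectInputStream extends ObjectInputStream {
    private final String oldClassName;
    private final Class<?> replacement;

    RemappingObjectInputStream(InputStream in, String oldClassName, Class<?> replacement)
            throws IOException {
        super(in);
        this.oldClassName = oldClassName;
        this.replacement = replacement;
    }

    @Override
    protected ObjectStreamClass readClassDescriptor()
            throws IOException, ClassNotFoundException {
        // Consume the descriptor from the stream first so the read position stays correct.
        ObjectStreamClass fromStream = super.readClassDescriptor();
        if (fromStream.getName().equals(oldClassName)) {
            // Field data is then read using the replacement's own field layout.
            return ObjectStreamClass.lookup(replacement);
        }
        return fromStream;
    }
}
```

The remap only works while the two field layouts stay identical, which is one reason tests over real legacy bytes matter more than shallow round trips.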
   
   # Runtime Chain I Checked
   ```text
   Oracle CDC source construction
     -> OracleIncrementalSourceFactory.restoreSource() returns new 
OracleIncrementalSource(...)
     -> IncrementalSource.<init>()
         -> this.dataSourceDialect = createDataSourceDialect(readonlyConfig)
     -> OracleIncrementalSource.createDataSourceDialect()
         -> new OracleDialect((OracleSourceConfigFactory) configFactory, 
catalogTables)
     -> OracleDialect.<init>()
         -> this.tableMap = CatalogTableUtils.convertTables(catalogTables)
         -> key type is still io.debezium.relational.TableId
   
   Flink translation / submission path
     -> FlinkSource keeps the SeaTunnel source in a final field
     -> Flink job graph generation serializes the coordinator provider for the 
source node
     -> the source object graph still contains 
IncrementalSource.dataSourceDialect
     -> OracleDialect.tableMap is serialized
     -> HashMap.writeObject(...) hits raw io.debezium.relational.TableId
     -> Build fails before the job even starts running
   ```
   
   The current Build failure lines up exactly with that path. The failing 
Oracle CDC job graph generation reports that the coordinator provider for 
`Source: Oracle-CDC-Source` is not serializable, and the nested cause is 
`java.io.NotSerializableException: io.debezium.relational.TableId`.
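
The failure mode can be reproduced in isolation with plain Java serialization. The sketch below is not SeaTunnel or Flink code: `RawTableId`, `DialectStandIn` and `SourceStandIn` are hypothetical stand-ins that only mirror the shape of the object graph described above (a serializable source holding a serializable dialect whose map is keyed by a non-serializable type).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class SerializationBoundaryRepro {
    /** Returns the offending class name if serialization fails, or null if it succeeds. */
    static String trySerialize(Object graph) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(graph);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the class that could not be written.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new SourceStandIn(new DialectStandIn())));
    }
}

/** Hypothetical stand-in for a third-party identifier that is not Serializable. */
class RawTableId {
    final String name;

    RawTableId(String name) {
        this.name = name;
    }
}

/** Hypothetical stand-in for a dialect that keeps a map keyed by the raw identifier. */
class DialectStandIn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Map<RawTableId, String> tableMap = new HashMap<>();

    DialectStandIn() {
        tableMap.put(new RawTableId("HR.EMPLOYEES"), "catalog table placeholder");
    }
}

/** Hypothetical stand-in for a source that holds the dialect in a field. */
class SourceStandIn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final DialectStandIn dialect;

    SourceStandIn(DialectStandIn dialect) {
        this.dialect = dialect;
    }
}
```

Both outer classes are `Serializable`, so nothing fails at compile time. The problem only surfaces when the map writes its keys, which matches a failure that appears at job graph generation rather than earlier.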
   
   # Findings
   
   Issue 1: Oracle CDC still stores raw Debezium `TableId` inside the source 
object graph, so the normal Flink submission path still fails serialization
   - Location:
     - `connector-cdc-oracle/.../OracleIncrementalSource.java:120-121`
     - `connector-cdc-base/.../IncrementalSource.java:120-130`
     - `connector-cdc-oracle/.../OracleDialect.java:53-63`
     - `seatunnel-translation-flink-common/.../FlinkSource.java:66-72`
   - Why this is a blocker:
     this is not only a restore/savepoint corner case anymore. On the normal 
Oracle CDC path, the source constructs `OracleDialect` up front, and 
`OracleDialect` keeps a `Map<TableId, CatalogTable>` in `tableMap`. That object 
stays inside `IncrementalSource.dataSourceDialect`, and the Flink translation 
layer keeps the source itself as part of the source node object graph. So when 
Flink serializes the coordinator provider during job graph generation, the raw 
Debezium `TableId` is still present and the submission path fails.
   - Risk:
     1. Oracle CDC jobs can fail during submission on the normal path, before 
execution even begins.
     2. The PR goal is per-connector Debezium isolation, but this still leaks a 
Debezium runtime type back into the shared source/coordinator boundary.
     3. The current Build failure is already a real repro, not a theoretical 
concern.
   - Better fix:
     - Option A (preferred): stop storing Debezium-native identifiers in shared 
source/common-layer state and move those cross-boundary keys to 
`TableIdentifier` (or another connector-owned serializable abstraction), 
converting back to Debezium types only inside the connector-private adapter 
boundary.
     - Option B: as a smaller patch, at least change `OracleDialect.tableMap` 
so it no longer uses Debezium `TableId` as the serialized key type.
   - Severity: High
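
To make the suggested fix concrete, here is a rough sketch of keying the map by a connector-owned serializable identifier and converting from the native type only at the boundary. It is an illustration under assumptions, not the PR's `TableIdentifier`: `NativeTableId`, `TableKeySketch`, `DialectSketch` and `SerializableKeySketch` are hypothetical names, and the two-field key is a simplification.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class SerializableKeySketch {
    /** Serializes and deserializes the dialect, as a submission boundary would. */
    static DialectSketch roundTrip(DialectSketch dialect)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(dialect);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DialectSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DialectSketch dialect = new DialectSketch();
        dialect.register(new NativeTableId("HR", "EMPLOYEES"), "hr_employees");
        DialectSketch restored = roundTrip(dialect);
        System.out.println(restored.lookup(new NativeTableId("HR", "EMPLOYEES")));
    }
}

/** Hypothetical stand-in for the native, non-serializable identifier. */
class NativeTableId {
    final String schema;
    final String table;

    NativeTableId(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }
}

/** Hypothetical connector-owned key that is safe to serialize. */
class TableKeySketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String schema;
    private final String table;

    private TableKeySketch(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    /** Conversion from the native type happens only here. */
    static TableKeySketch from(NativeTableId id) {
        return new TableKeySketch(id.schema, id.table);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof TableKeySketch)) {
            return false;
        }
        TableKeySketch that = (TableKeySketch) other;
        return Objects.equals(schema, that.schema) && Objects.equals(table, that.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, table);
    }
}

/** Hypothetical dialect whose serialized state no longer contains the native type. */
class DialectSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Map<TableKeySketch, String> tableMap = new HashMap<>();

    void register(NativeTableId id, String catalogTable) {
        tableMap.put(TableKeySketch.from(id), catalogTable);
    }

    String lookup(NativeTableId id) {
        return tableMap.get(TableKeySketch.from(id));
    }
}
```

The key needs value-based `equals` and `hashCode` so lookups still work after deserialization, when the map is rebuilt from fresh key instances.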
   
   # Compatibility / Side Effects
   - API / config / defaults: no direct public contract break here.
   - Checkpoint compatibility: clearly improved on this revision.
   - Runtime compatibility: still not safe to call this fully compatible, 
because the Oracle CDC normal submission path regresses at the coordinator 
serialization boundary.
   - Performance / resource side effects: removing the adapter cache is 
actually an improvement from a classloader-lifetime perspective; the current 
blocker is correctness, not performance.
   
   # Tests / CI
   - The serializer compatibility tests are much better on this revision.
   - The remaining test gap is that there is still no regression guard for 
Oracle source serialization at the Flink submission boundary, so this escaped 
into Build instead of being caught earlier by a focused test.
   
   # Conclusion: can merge after fixes
   
   1. Blocking items
   - Issue 1: remove the raw Debezium `TableId` from the Oracle CDC source 
object graph that crosses the Flink coordinator serialization boundary.
   
   2. Suggested non-blocking follow-up
   - After fixing Issue 1, add a focused regression check for the Oracle/Flink 
source serialization path so this boundary does not reopen later.
   
   This revision is much closer than the previous one, and the two blockers I 
called out last time are fixed. But the latest head still breaks the normal 
Oracle CDC submission path, so I would not merge this revision yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
