davidzollo commented on PR #10432:
URL: https://github.com/apache/seatunnel/pull/10432#issuecomment-3832533497
Hello @AshharAhmadKhan,
First of all, welcome to the Apache SeaTunnel community! 🎉 Thank you for
taking the time to implement multi-table support for the Socket connector. It's
a very useful feature for CDC and Batch scenarios.
I've reviewed your changes, and while the direction is correct (implementing
`SupportMultiTableSink`), there is a critical logical gap in the current
implementation that needs to be addressed before merging.
Since this seems to be your first contribution, I'll explain the issue in
detail to help you fix it.
## 1. Critical Issue: Single Schema Binding in Writer
### The Problem
You have added `implements SupportMultiTableSink` to `SocketSink`, which
tells the engine: *"I can accept data from multiple different tables in a
single instance."*
However, looking at your `SocketSinkWriter`:
```java
// In SocketSink.java
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
        throws IOException {
    // You are initializing the writer using the schema of 'catalogTable'
    // (which is just one specific table).
    return new SocketSinkWriter(socketConfig, catalogTable.getSeaTunnelRowType());
}

// In SocketSinkWriter.java
SocketSinkWriter(SocketConfig socketConfig, SeaTunnelRowType seaTunnelRowType)
        throws IOException {
    // You create a single JsonSerializationSchema based on that ONE table's schema.
    this.socketClient =
            new SocketClient(socketConfig, new JsonSerializationSchema(seaTunnelRowType));
}
```
### Why it breaks
Imagine a scenario with two tables:
* **Table A**: `[id: int, name: string]` (2 columns)
* **Table B**: `[id: int, age: int, address: string]` (3 columns)
If the engine merges these streams into your Sink:
1. The Sink is initialized with Table A's schema.
2. The `JsonSerializationSchema` expects 2 columns.
3. When a row from **Table B** arrives (3 columns), the serializer maps it positionally against Table A's definition, or fails outright.
    * It might throw an `IndexOutOfBoundsException` or a cast error.
    * Or worse, if the positional types happen to line up, it could silently write Table B's values under Table A's field names (e.g. `age` serialized as `name`) and drop `address` entirely, causing data corruption.
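To make this concrete, here is a small repro sketch. The class name is hypothetical and the outcome comments are assumptions; the exact failure depends on `JsonSerializationSchema` internals:
```java
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

public class SchemaMismatchRepro {
    public static void main(String[] args) {
        // Table A's schema: [id: int, name: string]
        SeaTunnelRowType tableAType =
                new SeaTunnelRowType(
                        new String[] {"id", "name"},
                        new SeaTunnelDataType<?>[] {BasicType.INT_TYPE, BasicType.STRING_TYPE});
        JsonSerializationSchema serializer = new JsonSerializationSchema(tableAType);

        // A row shaped like Table B: [id: int, age: int, address: string]
        SeaTunnelRow tableBRow = new SeaTunnelRow(new Object[] {1, 42, "Main St"});
        tableBRow.setTableId("db.table_b");

        // Serialized against Table A's 2-column definition: 'address' is not in
        // the schema at all, and 'age' sits where 'name' is expected, so this
        // either throws or produces misleading JSON (assumed behavior).
        byte[] json = serializer.serialize(tableBRow);
        System.out.println(new String(json));
    }
}
```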
### How to fix
To truly support multi-table writes with different schemas, the Writer needs
to be **schema-aware** or **schema-agnostic**:
* **Option 1 (Dynamic)**: The Writer maintains a `Map<String, SerializationSchema>` that lazily caches one serializer per table ID (see the sketch after this list). This keeps `SocketSink` simple while making it schema-aware.
* **Option 2 (Generic)**: Since `SocketSink` is often used for debugging, consider whether serialization can be made generic (e.g. just using `toString()` or a generic object mapper that doesn't rely on strict field ordering from a single `SeaTunnelRowType`).
* **Option 3 (Context)**: If you stick with `JsonSerializationSchema`, you may need to handle schema evolution or multiple table schemas passed in via the context (if available).

*Recommendation:* For a robust implementation, check how `LocalFileSink` or `JdbcSink` handle multiple tables; they typically separate their writing contexts based on the row's table identifier.
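For illustration, here is a minimal sketch of Option 1. The class and field names are hypothetical (not part of the current PR), and it assumes each incoming `SeaTunnelRow` carries its table identifier and that the sink can build a table-ID-to-row-type map from the catalog tables it receives:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

// Hypothetical helper showing the per-table serializer cache.
public class PerTableSerializers {
    // One serializer per table ID, created lazily on first use.
    private final Map<String, SerializationSchema> serializers = new ConcurrentHashMap<>();
    // Assumed to be built from the catalog tables handed to the sink.
    private final Map<String, SeaTunnelRowType> rowTypes;

    public PerTableSerializers(Map<String, SeaTunnelRowType> rowTypes) {
        this.rowTypes = rowTypes;
    }

    public byte[] serialize(SeaTunnelRow row) {
        SerializationSchema serializer =
                serializers.computeIfAbsent(
                        row.getTableId(),
                        id -> new JsonSerializationSchema(rowTypes.get(id)));
        return serializer.serialize(row);
    }
}
```
The writer's `write(SeaTunnelRow)` would then delegate to `serialize(row)` instead of holding a single schema-bound serializer.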
## 2. Missing Tests
Your PR description mentions "Integration Test Scenarios" and checkmarks
like:
> [x] Integration test with multiple tables passes
However, **there are no test files in the PR** (only `.java` source and
`.md` docs).
* Please add an E2E test case in the `seatunnel-e2e` module (e.g., `SocketSinkIT`).
* The test should simulate a multi-table source (you can use `FakeSource` with multiple table configs) writing to the Socket sink.
* This integration test is required to prove that the code actually works for the scenario you described; a skeleton is sketched below.
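For reference, a hypothetical skeleton of such a test, modeled on existing cases in `seatunnel-e2e` (the class name and the `.conf` path are illustrative):
```java
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

public class SocketSinkIT extends TestSuiteBase {

    @TestTemplate
    public void testMultiTableSocketSink(TestContainer container) throws Exception {
        // The job config is assumed to declare a FakeSource with two tables of
        // different schemas, both routed into the Socket sink.
        Container.ExecResult execResult =
                container.executeJob("/socket/fake_multi_table_to_socket.conf");
        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
    }
}
```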
## Summary
This is a great start! However, it shouldn't be merged until:
1. The `SocketSinkWriter` is updated to handle rows from different tables
correctly (not just the first table's schema).
2. Actual test code is added to the PR to verify the fix.
Looking forward to your updates!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]