davidzollo commented on PR #10432:
URL: https://github.com/apache/seatunnel/pull/10432#issuecomment-3832533497

   Hello @AshharAhmadKhan,
   
   First of all, welcome to the Apache SeaTunnel community! 🎉  Thank you for 
taking the time to implement multi-table support for the Socket connector. It's 
a very useful feature for CDC and Batch scenarios.
   
   I've reviewed your changes, and while the direction is correct (implementing 
`SupportMultiTableSink`), there is a critical logical gap in the current 
implementation that needs to be addressed before merging.
   
   Since this seems to be your first contribution, I'll explain the issue in 
detail to help you fix it.
   
   ## 1. Critical Issue: Single Schema Binding in Writer
   
   ### The Problem
   You have added `implements SupportMultiTableSink` to `SocketSink`, which 
tells the engine: *"I can accept data from multiple different tables in a 
single instance."*
   
   However, looking at your `SocketSinkWriter`:
   
   ```java
   // In SocketSink.java
   public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
       // You are initializing the writer using the schema of 'catalogTable' (which is just one specific table)
       return new SocketSinkWriter(socketConfig, catalogTable.getSeaTunnelRowType());
   }

   // In SocketSinkWriter.java
   SocketSinkWriter(SocketConfig socketConfig, SeaTunnelRowType seaTunnelRowType) throws IOException {
       // You create a single JsonSerializationSchema based on that ONE table's schema
       this.socketClient = new SocketClient(socketConfig, new JsonSerializationSchema(seaTunnelRowType));
   }
   ```
   
   ### Why it breaks
   Imagine a scenario with two tables:
   *   **Table A**: `[id: int, name: string]` (2 columns)
   *   **Table B**: `[id: int, age: int, address: string]` (3 columns)
   
   If the engine merges these streams into your Sink:
   1.  The Sink is initialized with Table A's schema.
   2.  The `JsonSerializationSchema` expects 2 columns.
   3.  When a row from **Table B** arrives (3 columns), the serializer tries to map it using Table A's definition, or fails outright (see the repro below).
       *   It might throw an `IndexOutOfBoundsException`.
       *   Or worse, it might silently map "address" onto "name" if the types happen to match, causing data corruption.
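
   To make this concrete, here is a small, hypothetical repro. The class name and row values are invented for illustration, and the exact failure depends on how the JSON converter handles the extra field:

   ```java
   import org.apache.seatunnel.api.serialization.SerializationSchema;
   import org.apache.seatunnel.api.table.type.BasicType;
   import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
   import org.apache.seatunnel.format.json.JsonSerializationSchema;

   public class SchemaMismatchDemo {
       public static void main(String[] args) {
           // The writer builds ONE serializer, from Table A's 2-column schema.
           SeaTunnelRowType tableAType = new SeaTunnelRowType(
                   new String[] {"id", "name"},
                   new SeaTunnelDataType<?>[] {BasicType.INT_TYPE, BasicType.STRING_TYPE});
           SerializationSchema serializer = new JsonSerializationSchema(tableAType);

           // A 3-column row arrives from Table B: [id: int, age: int, address: string].
           SeaTunnelRow tableBRow = new SeaTunnelRow(new Object[] {1, 42, "Main St"});

           // Field index 1 (age) lines up with "name", and field index 2 (address)
           // has no slot in the schema at all: expect an exception or corrupt JSON.
           byte[] bytes = serializer.serialize(tableBRow);
           System.out.println(new String(bytes));
       }
   }
   ```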
   
   ### How to fix
   To truly support multi-table writes with different schemas, the Writer needs 
to be **schema-aware** or **schema-agnostic**:
   
   *   **Option 1 (Dynamic)**: The Writer maintains a `Map<String, SerializationSchema>` that caches one serializer per table ID and picks the matching one for each incoming row (see the sketch below). This adds some state to what is currently a very simple connector.
   *   **Option 2 (Generic)**: Since `SocketSink` is often used for debugging, check whether the serialization can be made generic (e.g. just `toString()`, or an object mapper that doesn't rely on the strict field ordering of a single `SeaTunnelRowType`).
   *   **Option 3 (Context)**: If you stick with `JsonSerializationSchema`, you may need to handle schema evolution, or consume the per-table schemas passed through the writer context (if the engine exposes them).
   
   *Recommendation:* For a robust implementation, check how `LocalFileSink` or 
`JdbcSink` handles multiple tables. They often separate the writing contexts 
based on the row's table identifier.
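
   To make Option 1 concrete, here is a rough sketch, not a drop-in implementation. It assumes rows expose their table id via `SeaTunnelRow#getTableId()`, that the sink can hand the writer a map of table id to `SeaTunnelRowType` for every table it may receive, and that `SocketClient` is refactored so the constructor no longer owns a single serializer and a `sendBytes` method accepts pre-serialized payloads (both are hypothetical):

   ```java
   import java.io.IOException;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.seatunnel.api.serialization.SerializationSchema;
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
   import org.apache.seatunnel.format.json.JsonSerializationSchema;

   // AbstractSinkWriter, SocketClient, SocketConfig come from the same module
   // as the existing writer; imports omitted here.
   public class MultiTableSocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

       private final SocketClient socketClient;
       // Schemas for every table this writer may receive, keyed by table id.
       private final Map<String, SeaTunnelRowType> rowTypesByTableId;
       // One serializer per table id, created lazily on first use.
       private final Map<String, SerializationSchema> serializerCache = new HashMap<>();

       MultiTableSocketSinkWriter(SocketConfig socketConfig,
                                  Map<String, SeaTunnelRowType> rowTypesByTableId)
               throws IOException {
           this.rowTypesByTableId = rowTypesByTableId;
           // Hypothetical constructor: the client is no longer bound to one schema.
           this.socketClient = new SocketClient(socketConfig);
       }

       @Override
       public void write(SeaTunnelRow row) throws IOException {
           SeaTunnelRowType rowType = rowTypesByTableId.get(row.getTableId());
           if (rowType == null) {
               throw new IOException("No schema registered for table: " + row.getTableId());
           }
           // Pick (or build) the serializer that matches this row's own table.
           SerializationSchema serializer = serializerCache.computeIfAbsent(
                   row.getTableId(), id -> new JsonSerializationSchema(rowType));
           // Hypothetical method: send the already-serialized payload.
           socketClient.sendBytes(serializer.serialize(row));
       }

       @Override
       public void close() throws IOException {
           socketClient.close();
       }
   }
   ```

   The key point is that the serializer is chosen per row rather than fixed at construction time, so Table A and Table B each serialize against their own schema.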
   
   ## 2. Missing Tests
   
   Your PR description mentions "Integration Test Scenarios" and checkmarks 
like:
   > [x] Integration test with multiple tables passes
   
   However, **there are no test files in the PR** (only `.java` source and 
`.md` docs).
   
   *   Please add an E2E test case in the `seatunnel-e2e` module (e.g., `SocketSinkIT`); a minimal skeleton is sketched after this list.
   *   The test should simulate a multi-table source (you can use `FakeSource` 
with multiple table configs) writing to the SocketSink.
   *   This integration test is required to prove that the code actually works 
for the scenario you described.
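
   For reference, e2e tests in this repo typically extend `TestSuiteBase` and submit a job config; a skeleton could look like this (the config file name is a placeholder, and you would need to write that config yourself):

   ```java
   import org.apache.seatunnel.e2e.common.TestSuiteBase;
   import org.apache.seatunnel.e2e.common.container.TestContainer;

   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.TestTemplate;
   import org.testcontainers.containers.Container;

   import java.io.IOException;

   public class SocketSinkIT extends TestSuiteBase {

       @TestTemplate
       public void testMultiTableSocketSink(TestContainer container)
               throws IOException, InterruptedException {
           // The job config should declare a FakeSource with two tables of
           // different schemas, both routed into a single SocketSink.
           Container.ExecResult execResult =
                   container.executeJob("/fake_to_socket_multi_table.conf");
           Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
       }
   }
   ```

   Note that a zero exit code alone doesn't prove correct serialization; the test should also run a small socket server that receives the payload and asserts the rows of both tables arrive with their own field names.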
   
   ## Summary
   
   This is a great start! However, we should hold off on merging until:
   1.  The `SocketSinkWriter` is updated to handle rows from different tables 
correctly (not just the first table's schema).
   2.  Actual test code is added to the PR to verify the fix.
   
   Looking forward to your updates!

