luoyuxia commented on code in PR #1703:
URL: https://github.com/apache/fluss/pull/1703#discussion_r2352300269


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java:
##########
@@ -51,6 +51,7 @@ public 
LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSeria
     }
 
     public void serialize(DataOutputSerializer out, SourceSplitBase split) 
throws IOException {
+        out.writeInt(getVersion());

Review Comment:
   we don't need to write `LakeSplitSerializer`. It's a part of 
`SourceSplitSerializer`, whose version is already tracked by flink framework.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java:
##########
@@ -51,6 +51,7 @@ public 
LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSeria
     }
 
     public void serialize(DataOutputSerializer out, SourceSplitBase split) 
throws IOException {
+        out.writeInt(getVersion());

Review Comment:
   ```suggestion
           out.writeInt(sourceSplitSerializer.getVersion());
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java:
##########
@@ -90,25 +91,22 @@ public void serialize(DataOutputSerializer out, 
SourceSplitBase split) throws IO
     }
 
     public SourceSplitBase deserialize(
-            int version,
             byte splitKind,
             TableBucket tableBucket,
             @Nullable String partition,
             DataInputDeserializer input)
             throws IOException {
+        int version = input.readInt();
+        if (version != CURRENT_VERSION) {

Review Comment:
   don't need to check this.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java:
##########
@@ -111,7 +111,7 @@ private void serializeSourceSplitBase(DataOutputSerializer 
out, SourceSplitBase
 
     @Override
     public SourceSplitBase deserialize(int version, byte[] serialized) throws 
IOException {
-        if (version != VERSION_0) {
+        if (version != CURRENT_VERSION) {

Review Comment:
   no need to change this



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -142,12 +142,12 @@ private void serializeRemainingHybridLakeFlussSplits(
             out.writeBoolean(true);
             out.writeInt(remainingHybridLakeFlussSplits.size());
             SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            out.writeInt(sourceSplitSerializer.getVersion());

Review Comment:
   don't need to write the version



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java:
##########
@@ -61,8 +61,9 @@ class LakeSplitSerializerTest {
     @Test
     void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {

Review Comment:
   Add a test for the back compatibility. 
   Create  test Serializer implementing `SimpleVersionedSerializer`.
   We should have two serializer, V1Serializer and V2Serializer.
   V2Serializer can  deserialize the data serialized via V1Serializer
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java:
##########
@@ -90,25 +91,22 @@ public void serialize(DataOutputSerializer out, 
SourceSplitBase split) throws IO
     }
 
     public SourceSplitBase deserialize(
-            int version,
             byte splitKind,
             TableBucket tableBucket,
             @Nullable String partition,
             DataInputDeserializer input)
             throws IOException {
+        int version = input.readInt();
+        if (version != CURRENT_VERSION) {

Review Comment:
   Pass the gotten version  
   
https://github.com/apache/fluss/pull/1703/files#diff-734b03ffe66dac2aacc2b028e37a5f0a7f96719ad07cf2e5c3cb1c8b60e03012R109



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -161,13 +161,12 @@ private List<SourceSplitBase> 
deserializeRemainingHybridLakeFlussSplits(
             int numSplits = in.readInt();
             List<SourceSplitBase> splits = new ArrayList<>(numSplits);
             SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            int version = in.readInt();

Review Comment:
   dito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to