loserwang1024 commented on code in PR #2321:
URL: https://github.com/apache/fluss/pull/2321#discussion_r2668184567


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -78,24 +100,29 @@ public byte[] serialize(SourceEnumeratorState state) 
throws IOException {
             out.writeInt(tableBucket.getBucket());
         }
         // write assigned partitions
-        out.writeInt(state.getAssignedPartitions().size());
-        for (Map.Entry<Long, String> entry : 
state.getAssignedPartitions().entrySet()) {
+        out.writeInt(assignedPartitions.size());
+        for (Map.Entry<Long, String> entry : assignedPartitions.entrySet()) {
             out.writeLong(entry.getKey());
             out.writeUTF(entry.getValue());
         }
+    }
 
+    @VisibleForTesting
+    protected byte[] serializeV0(SourceEnumeratorState state) throws 
IOException {
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        serializeAssignBucketAndPartitions(
+                out, state.getAssignedBuckets(), 
state.getAssignedPartitions());
         if (lakeSource != null) {
             serializeRemainingHybridLakeFlussSplits(out, state);
         }
-
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
         return result;
     }
 
     @Override
     public SourceEnumeratorState deserialize(int version, byte[] serialized) 
throws IOException {
-        if (version != VERSION_0) {
+        if (version != VERSION_0 && version != CURRENT_VERSION) {

Review Comment:
   > version != VERSION_0 && version != CURRENT_VERSION 
   
   When we support version 3 later, state with version 3 cannot downgrade. We 
have met this problem in many connectors.  @leonardBang , CC, should we stop 
higher version's state?
   
   And I also found a problem : we can never downgrade later to version 1. 
Because in later code will version 1 state: version != VERSION_0 is always 
true. Maybe we should add this to upgrade document later.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -37,17 +38,21 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+

Review Comment:
   Would like to add what's the different with different version? such as:
   <img width="905" height="232" alt="image" 
src="https://github.com/user-attachments/assets/222891da-0190-4455-9cdb-be19aba45a58";
 />
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to