luoyuxia commented on code in PR #2321:
URL: https://github.com/apache/fluss/pull/2321#discussion_r2668370342
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java:
##########
@@ -78,24 +100,29 @@ public byte[] serialize(SourceEnumeratorState state)
throws IOException {
out.writeInt(tableBucket.getBucket());
}
// write assigned partitions
- out.writeInt(state.getAssignedPartitions().size());
- for (Map.Entry<Long, String> entry :
state.getAssignedPartitions().entrySet()) {
+ out.writeInt(assignedPartitions.size());
+ for (Map.Entry<Long, String> entry : assignedPartitions.entrySet()) {
out.writeLong(entry.getKey());
out.writeUTF(entry.getValue());
}
+ }
+ @VisibleForTesting
+ protected byte[] serializeV0(SourceEnumeratorState state) throws
IOException {
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ serializeAssignBucketAndPartitions(
+ out, state.getAssignedBuckets(),
state.getAssignedPartitions());
if (lakeSource != null) {
serializeRemainingHybridLakeFlussSplits(out, state);
}
-
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
}
@Override
public SourceEnumeratorState deserialize(int version, byte[] serialized)
throws IOException {
- if (version != VERSION_0) {
+ if (version != VERSION_0 && version != CURRENT_VERSION) {
Review Comment:
1: so, what's the best pratice in here? Always make the version check pass
and make sure version3 compitable with v2? From kafka connector code, seems the
kafka connector will still have the problem?
2: what do you mean by saying `we can never downgrade later to version 1`?
Seems it still the same problem to 1?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]