StephanEwen edited a comment on pull request #15397:
URL: https://github.com/apache/flink/pull/15397#issuecomment-809632266


   ## SplitAssignmentTracker Removal
   
   I would suggest going a step further to leave the code clean for the future.
   
   The code `SourceCoordinatorSerdeUtils.writeAssignmentsByCheckpointId(...)` 
is dead code, and to some extent so is 
`SourceCoordinatorSerdeUtils.readAssignmentsByCheckpointId(...)`. It would be 
good to remove both.
   
   I would suggest doing the following:
     - Bump the format version in `SourceCoordinatorSerdeUtils`.
     - In `SourceCoordinator.deserializeCheckpointAndRestoreContext(...)`, 
remove the line `context.restoreState(splitSerializer, in);`. This is the last 
step of the deserialization, so it doesn't matter if some trailing bytes are 
left uninterpreted.
     - To be safe, let's change the input stream from `DataInputStream(new 
ByteArrayInputStream())` to a `new DataInputDeserializer(bytes)`, which has 
good performance and allows us to check whether more bytes are available.
   
   The code for `SourceCoordinator.deserializeCheckpointAndRestoreContext(...)` 
should look like the following:
   
   ```java
       private EnumChkT deserializeCheckpointAndRestoreContext(byte[] bytes)
               throws Exception {
           final DataInputDeserializer in = new DataInputDeserializer(bytes);
   
           final int version = readAndVerifyCoordinatorSerdeVersion(in);
   
           final int enumSerializerVersion = in.readInt();
           final int serializedEnumChkptSize = in.readInt();
           final byte[] serializedEnumChkpt =
                   readBytes(in, serializedEnumChkptSize);
   
           if (version != SourceCoordinatorSerdeUtils.VERSION_0
                   && in.available() > 0) {
               // the earlier version had data for the registered readers and
               // split-to-reader tracking here, which we ignore in the newer
               // versions.
               throw new IOException(
                       "Unexpected trailing bytes in Enumerator Checkpoint data");
           }
   
           return enumCheckpointSerializer.deserialize(
                   enumSerializerVersion, serializedEnumChkpt);
       }
   ```
   (Some utility methods need to change the input parameter they accept from 
`DataInputStream` to `DataInput`.)
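
To illustrate the utility-method change and the version-gated trailing-bytes check without depending on Flink, here is a minimal JDK-only sketch. The class name `CoordinatorSerdeSketch`, the constant `VERSION_1`, and the `write(...)` helper are assumptions made up for this example; a plain `DataInputStream` stands in for `DataInputDeserializer`, and the real helpers in `SourceCoordinatorSerdeUtils` may look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** JDK-only sketch; names are illustrative and not taken from Flink. */
final class CoordinatorSerdeSketch {
    static final int VERSION_0 = 0;
    static final int VERSION_1 = 1; // assumed name for the bumped format version

    /** Accepts any DataInput, so callers are not tied to DataInputStream. */
    static byte[] readBytes(DataInput in, int size) throws IOException {
        byte[] bytes = new byte[size];
        in.readFully(bytes); // throws EOFException if fewer than 'size' bytes remain
        return bytes;
    }

    static int readAndVerifyVersion(DataInput in) throws IOException {
        int version = in.readInt();
        if (version < VERSION_0 || version > VERSION_1) {
            throw new IOException("Unsupported coordinator serde version: " + version);
        }
        return version;
    }

    /** Reads [version][enumSerializerVersion][size][payload] and returns the payload. */
    static byte[] readEnumeratorCheckpoint(byte[] data) throws IOException {
        // DataInputStream stands in for Flink's DataInputDeserializer here.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = readAndVerifyVersion(in);
        in.readInt(); // enumerator serializer version, unused in this sketch
        int size = in.readInt();
        byte[] payload = readBytes(in, size);
        // Version 0 may carry legacy trailing data, which is skipped;
        // any newer version must end right after the payload.
        if (version != VERSION_0 && in.available() > 0) {
            throw new IOException("Unexpected trailing bytes in Enumerator Checkpoint data");
        }
        return payload;
    }

    /** Hypothetical helper that writes the layout above plus optional trailing bytes. */
    static byte[] write(int version, byte[] payload, byte[] trailing) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(version);
        out.writeInt(1); // dummy enumerator serializer version
        out.writeInt(payload.length);
        out.write(payload);
        out.write(trailing);
        out.flush();
        return bytes.toByteArray();
    }
}
```

With this shape, a version-0 blob with leftover reader/assignment bytes still deserializes, while a newer-version blob with trailing bytes is rejected.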
   
   Then we can also
     - Remove `SplitAssignmentTracker.restoreState()`
     - Rename `SplitAssignmentTracker.snapshotState(...)` to 
`.onCheckpoint(long checkpointId)`
     - Remove `SourceCoordinatorSerdeUtils.readRegisteredReaders(...)`, 
`.writeAssignmentsByCheckpointId(...)`, `.readAssignmentsByCheckpointId(...)`.
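
As a rough picture of what the tracker could be reduced to once it no longer serializes anything, here is a simplified sketch. It is not the actual `SplitAssignmentTracker`: the class `AssignmentTrackerSketch`, its fields, and the use of plain strings for splits are assumptions for illustration only. The point is that `onCheckpoint(long checkpointId)` only does in-memory bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for the tracker; splits are plain strings. */
final class AssignmentTrackerSketch {
    // Assignments made since the last checkpoint was triggered, keyed by subtask id.
    private Map<Integer, List<String>> uncheckpointed = new HashMap<>();
    // Assignments per checkpoint that was triggered but has not completed yet.
    private final SortedMap<Long, Map<Integer, List<String>>> byCheckpointId = new TreeMap<>();

    void recordAssignment(int subtaskId, String split) {
        uncheckpointed.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(split);
    }

    /** Takes the place of snapshotState(...): no output stream, nothing is serialized. */
    void onCheckpoint(long checkpointId) {
        byCheckpointId.put(checkpointId, uncheckpointed);
        uncheckpointed = new HashMap<>();
    }

    /** Assignments covered by a completed checkpoint no longer need tracking. */
    void onCheckpointComplete(long checkpointId) {
        // headMap is exclusive of its bound, so this drops all ids <= checkpointId.
        byCheckpointId.headMap(checkpointId + 1).clear();
    }

    int pendingCheckpoints() {
        return byCheckpointId.size();
    }

    int uncheckpointedSubtasks() {
        return uncheckpointed.size();
    }
}
```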
   
   Overall, this is a lot of cleanup and removal, and what remains becomes 
simpler and easier to understand.

