LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868777111


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer getSplitSerializer() {
     public PendingSplitsCheckpointSerializer getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), fileStore.valueType());
+            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();

Review Comment:
   > What's the meaning of suffix ctr ? partition number or partition count ? So why not just use partitionNum ?
   
   `ctr` is an abbreviation for counter; it stands for the size of the `Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>` (`<partition, <bucket, meta-list>>`). I used `ctr` as a suffix because the value is mutable, while `partitionNum` sounds like an immutable concept to me. Do you think this reads better?
   ```java
   int partitionNum = view.readInt();
   int partitionCtr = partitionNum;
   while (partitionCtr > 0) {
       ...
       partitionCtr--;
   }
   ```
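   
   For completeness, here is a rough sketch of how the rest of `readObject` could consume that counter, mirroring the nested layout written by `writeObject` above. It assumes `deserialize(DataInputView)` counterparts to the `serialize` calls in the diff; the local names (`buckets`, `bucketCtr`, `fileCtr`) are only illustrative:
   ```java
   // number of <partition, bucket-map> entries written by writeObject
   int partitionCtr = view.readInt();
   while (partitionCtr > 0) {
       BinaryRowData partition = partSerializer.deserialize(view);
       Map<Integer, List<DataFileMeta>> buckets = new HashMap<>();
       // number of <bucket, meta-list> entries for this partition
       int bucketCtr = view.readInt();
       while (bucketCtr > 0) {
           int bucket = view.readInt();
           // number of DataFileMeta entries for this bucket
           int fileCtr = view.readInt();
           List<DataFileMeta> files = new ArrayList<>(fileCtr);
           while (fileCtr > 0) {
               files.add(metaSerializer.deserialize(view));
               fileCtr--;
           }
           buckets.put(bucket, files);
           bucketCtr--;
       }
       specifiedManifestEntries.put(partition, buckets);
       partitionCtr--;
   }
   ```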


