LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767988


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer 
getSplitSerializer() {
     public PendingSplitsCheckpointSerializer 
getEnumeratorCheckpointSerializer() {
         return new PendingSplitsCheckpointSerializer(getSplitSerializer());
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (specifiedManifestEntries != null) {
+            BinaryRowDataSerializer partSerializer =
+                    new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+            DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+            view.writeInt(specifiedManifestEntries.size());
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
partEntry :
+                    specifiedManifestEntries.entrySet()) {
+                partSerializer.serialize(partEntry.getKey(), view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = 
partEntry.getValue();
+                view.writeInt(bucketEntry.size());
+                for (Map.Entry<Integer, List<DataFileMeta>> entry : 
bucketEntry.entrySet()) {
+                    view.writeInt(entry.getKey());
+                    view.writeInt(entry.getValue().size());
+                    for (DataFileMeta meta : entry.getValue()) {
+                        metaSerializer.serialize(meta, view);
+                    }
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        if (in.available() > 0) {
+            BinaryRowDataSerializer partSerializer =
+                    new 
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+            DataFileMetaSerializer metaSerializer =
+                    new DataFileMetaSerializer(fileStore.keyType(), 
fileStore.valueType());
+            DataInputViewStreamWrapper view = new 
DataInputViewStreamWrapper(in);
+            specifiedManifestEntries = new HashMap<>();
+            int partitionCtr = view.readInt();
+            while (partitionCtr > 0) {
+                BinaryRowData partition = partSerializer.deserialize(view);
+                Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+                int bucketCtr = view.readInt();
+                while (bucketCtr > 0) {
+                    int bucket = view.readInt();
+                    int entryCtr = view.readInt();
+                    if (entryCtr == 0) {
+                        bucketEntry.put(bucket, Collections.emptyList());
+                    } else {
+                        List<DataFileMeta> metas = new ArrayList<>();
+                        while (entryCtr > 0) {
+                            metas.add(metaSerializer.deserialize(view));
+                            entryCtr--;
+                        }
+                        bucketEntry.put(bucket, metas);
+                    }
+                    bucketCtr--;
+                }
+                specifiedManifestEntries.put(partition, bucketEntry);
+                partitionCtr--;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FileStoreSource)) {
+            return false;
+        }
+        FileStoreSource that = (FileStoreSource) o;
+        return valueCountMode == that.valueCountMode
+                && isContinuous == that.isContinuous
+                && discoveryInterval == that.discoveryInterval
+                && latestContinuous == that.latestContinuous
+                && fileStore.equals(that.fileStore)
+                && Arrays.equals(projectedFields, that.projectedFields)

Review Comment:
   Thanks for reporting this. The code snippet was auto-generated by IntelliJ; I should have checked it more carefully. My mistake — I'll fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to