LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868777111
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer
getSplitSerializer() {
public PendingSplitsCheckpointSerializer
getEnumeratorCheckpointSerializer() {
return new PendingSplitsCheckpointSerializer(getSplitSerializer());
}
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ if (specifiedManifestEntries != null) {
+ BinaryRowDataSerializer partSerializer =
+ new
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+ DataFileMetaSerializer metaSerializer =
+ new DataFileMetaSerializer(fileStore.keyType(),
fileStore.valueType());
+ DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+ view.writeInt(specifiedManifestEntries.size());
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>>
partEntry :
+ specifiedManifestEntries.entrySet()) {
+ partSerializer.serialize(partEntry.getKey(), view);
+ Map<Integer, List<DataFileMeta>> bucketEntry =
partEntry.getValue();
+ view.writeInt(bucketEntry.size());
+ for (Map.Entry<Integer, List<DataFileMeta>> entry :
bucketEntry.entrySet()) {
+ view.writeInt(entry.getKey());
+ view.writeInt(entry.getValue().size());
+ for (DataFileMeta meta : entry.getValue()) {
+ metaSerializer.serialize(meta, view);
+ }
+ }
+ }
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ if (in.available() > 0) {
+ BinaryRowDataSerializer partSerializer =
+ new
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+ DataFileMetaSerializer metaSerializer =
+ new DataFileMetaSerializer(fileStore.keyType(),
fileStore.valueType());
+ DataInputViewStreamWrapper view = new
DataInputViewStreamWrapper(in);
+ specifiedManifestEntries = new HashMap<>();
+ int partitionCtr = view.readInt();
Review Comment:
> What's the meaning of suffix ctr ? partition number or partition count ?
So why not just use partitionNum ?
`ctr` is abbr. for the counter, stands for the size of the
`Map<BinaryRowData, Map<Integer, List<DataFileMeta>>>` .
(<partition, <bucket, meta-list>>). Put `ctr` as a suffix is due to this
value is mutable, while `partitionNum` should be a kind of immutable concept I
think. Do you feel this is better?
```java
int partitionNum = view.readInt();
int partitionCtr = partitionNum;
while(partitionCtr > 0) {
...
partitionCtr--;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]