LadyForest commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r868767988
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -182,4 +219,102 @@ public FileStoreSourceSplitSerializer
getSplitSerializer() {
public PendingSplitsCheckpointSerializer
getEnumeratorCheckpointSerializer() {
return new PendingSplitsCheckpointSerializer(getSplitSerializer());
}
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ if (specifiedManifestEntries != null) {
+ BinaryRowDataSerializer partSerializer =
+ new
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+ DataFileMetaSerializer metaSerializer =
+ new DataFileMetaSerializer(fileStore.keyType(),
fileStore.valueType());
+ DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+ view.writeInt(specifiedManifestEntries.size());
+ for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>>
partEntry :
+ specifiedManifestEntries.entrySet()) {
+ partSerializer.serialize(partEntry.getKey(), view);
+ Map<Integer, List<DataFileMeta>> bucketEntry =
partEntry.getValue();
+ view.writeInt(bucketEntry.size());
+ for (Map.Entry<Integer, List<DataFileMeta>> entry :
bucketEntry.entrySet()) {
+ view.writeInt(entry.getKey());
+ view.writeInt(entry.getValue().size());
+ for (DataFileMeta meta : entry.getValue()) {
+ metaSerializer.serialize(meta, view);
+ }
+ }
+ }
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ if (in.available() > 0) {
+ BinaryRowDataSerializer partSerializer =
+ new
BinaryRowDataSerializer(fileStore.partitionType().getFieldCount());
+ DataFileMetaSerializer metaSerializer =
+ new DataFileMetaSerializer(fileStore.keyType(),
fileStore.valueType());
+ DataInputViewStreamWrapper view = new
DataInputViewStreamWrapper(in);
+ specifiedManifestEntries = new HashMap<>();
+ int partitionCtr = view.readInt();
+ while (partitionCtr > 0) {
+ BinaryRowData partition = partSerializer.deserialize(view);
+ Map<Integer, List<DataFileMeta>> bucketEntry = new HashMap<>();
+ int bucketCtr = view.readInt();
+ while (bucketCtr > 0) {
+ int bucket = view.readInt();
+ int entryCtr = view.readInt();
+ if (entryCtr == 0) {
+ bucketEntry.put(bucket, Collections.emptyList());
+ } else {
+ List<DataFileMeta> metas = new ArrayList<>();
+ while (entryCtr > 0) {
+ metas.add(metaSerializer.deserialize(view));
+ entryCtr--;
+ }
+ bucketEntry.put(bucket, metas);
+ }
+ bucketCtr--;
+ }
+ specifiedManifestEntries.put(partition, bucketEntry);
+ partitionCtr--;
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FileStoreSource)) {
+ return false;
+ }
+ FileStoreSource that = (FileStoreSource) o;
+ return valueCountMode == that.valueCountMode
+ && isContinuous == that.isContinuous
+ && discoveryInterval == that.discoveryInterval
+ && latestContinuous == that.latestContinuous
+ && fileStore.equals(that.fileStore)
+ && Arrays.equals(projectedFields, that.projectedFields)
Review Comment:
Thanks for the reporting. This code snippet is auto-generated by Intellij. I
should check more carefully. Blame on me, and I'll fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]