This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1fa38 [FLINK-27157] Add version field to manifest entries
7e1fa38 is described below
commit 7e1fa387474a4a2983d5c46422f53636e64aa478
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 11 10:55:12 2022 +0800
[FLINK-27157] Add version field to manifest entries
This closes #85
---
.../file/manifest/ManifestEntrySerializer.java | 16 ++++--
.../table/store/file/manifest/ManifestFile.java | 5 +-
.../file/manifest/ManifestFileMetaSerializer.java | 16 ++++--
.../table/store/file/manifest/ManifestList.java | 4 +-
.../file/utils/VersionedObjectSerializer.java | 66 ++++++++++++++++++++++
5 files changed, 97 insertions(+), 10 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 51736f6..4cce368 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -23,11 +23,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
import org.apache.flink.table.types.logical.RowType;
/** Serializer for {@link ManifestEntry}. */
-public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
+public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestEntry> {
private static final long serialVersionUID = 1L;
@@ -41,7 +41,12 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
}
@Override
- public RowData toRow(ManifestEntry entry) {
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public RowData convertTo(ManifestEntry entry) {
GenericRowData row = new GenericRowData(5);
row.setField(0, entry.kind().toByteValue());
row.setField(1, entry.partition());
@@ -52,7 +57,10 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
}
@Override
- public ManifestEntry fromRow(RowData row) {
+ public ManifestEntry convertFrom(int version, RowData row) {
+ if (version != 1) {
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
return new ManifestEntry(
ValueKind.fromByteValue(row.getByte(0)),
partitionSerializer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 0fa2132..55d60ea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RollingFile;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -193,7 +194,9 @@ public class ManifestFile {
}
public ManifestFile create() {
- RowType entryType = ManifestEntry.schema(partitionType, keyType, valueType);
+ RowType entryType =
+ VersionedObjectSerializer.versionType(
+ ManifestEntry.schema(partitionType, keyType, valueType));
return new ManifestFile(
partitionType,
new ManifestEntrySerializer(partitionType, keyType, valueType),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
index f0d2dd0..9ad3e6a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
@@ -22,11 +22,11 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
import org.apache.flink.table.types.logical.RowType;
/** Serializer for {@link ManifestFileMeta}. */
-public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMeta> {
+public class ManifestFileMetaSerializer extends VersionedObjectSerializer<ManifestFileMeta> {
private static final long serialVersionUID = 1L;
@@ -38,7 +38,12 @@ public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMet
}
@Override
- public RowData toRow(ManifestFileMeta meta) {
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public RowData convertTo(ManifestFileMeta meta) {
GenericRowData row = new GenericRowData(5);
row.setField(0, StringData.fromString(meta.fileName()));
row.setField(1, meta.fileSize());
@@ -49,7 +54,10 @@ public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMet
}
@Override
- public ManifestFileMeta fromRow(RowData row) {
+ public ManifestFileMeta convertFrom(int version, RowData row) {
+ if (version != 1) {
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
return new ManifestFileMeta(
row.getString(0).toString(),
row.getLong(1),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 82b64c0..1d0e78a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
@@ -114,7 +115,8 @@ public class ManifestList {
}
public ManifestList create() {
- RowType metaType = ManifestFileMeta.schema(partitionType);
+ RowType metaType =
+ VersionedObjectSerializer.versionType(ManifestFileMeta.schema(partitionType));
return new ManifestList(
new ManifestFileMetaSerializer(partitionType),
fileFormat.createReaderFactory(metaType),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java
new file mode 100644
index 0000000..a07c474
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link ObjectSerializer} for versioned serialization. */
+public abstract class VersionedObjectSerializer<T> extends ObjectSerializer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public VersionedObjectSerializer(RowType rowType) {
+ super(versionType(rowType));
+ }
+
+ public static RowType versionType(RowType rowType) {
+ List<RowType.RowField> fields = new ArrayList<>();
+ fields.add(new RowType.RowField("_VERSION", new IntType(false)));
+ fields.addAll(rowType.getFields());
+ return new RowType(fields);
+ }
+
+ /**
+ * Gets the version with which this serializer serializes.
+ *
+ * @return The version of the serialization schema.
+ */
+ public abstract int getVersion();
+
+ public abstract RowData convertTo(T record);
+
+ public abstract T convertFrom(int version, RowData row);
+
+ @Override
+ public final RowData toRow(T record) {
+ return new JoinedRowData().replace(GenericRowData.of(getVersion()), convertTo(record));
+ }
+
+ @Override
+ public final T fromRow(RowData row) {
+ return convertFrom(row.getInt(0), new OffsetRowData(row.getArity() - 1, 1).replace(row));
+ }
+}