This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1fa38  [FLINK-27157] Add version field to manifest entries
7e1fa38 is described below

commit 7e1fa387474a4a2983d5c46422f53636e64aa478
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 11 10:55:12 2022 +0800

    [FLINK-27157] Add version field to manifest entries
    
    This closes #85
---
 .../file/manifest/ManifestEntrySerializer.java     | 16 ++++--
 .../table/store/file/manifest/ManifestFile.java    |  5 +-
 .../file/manifest/ManifestFileMetaSerializer.java  | 16 ++++--
 .../table/store/file/manifest/ManifestList.java    |  4 +-
 .../file/utils/VersionedObjectSerializer.java      | 66 ++++++++++++++++++++++
 5 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 51736f6..4cce368 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -23,11 +23,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
-import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 /** Serializer for {@link ManifestEntry}. */
-public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
+public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestEntry> {
 
     private static final long serialVersionUID = 1L;
 
@@ -41,7 +41,12 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
     }
 
     @Override
-    public RowData toRow(ManifestEntry entry) {
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public RowData convertTo(ManifestEntry entry) {
         GenericRowData row = new GenericRowData(5);
         row.setField(0, entry.kind().toByteValue());
         row.setField(1, entry.partition());
@@ -52,7 +57,10 @@ public class ManifestEntrySerializer extends ObjectSerializer<ManifestEntry> {
     }
 
     @Override
-    public ManifestEntry fromRow(RowData row) {
+    public ManifestEntry convertFrom(int version, RowData row) {
+        if (version != 1) {
+            throw new IllegalArgumentException("Unsupported version: " + version);
+        }
         return new ManifestEntry(
                 ValueKind.fromByteValue(row.getByte(0)),
                 partitionSerializer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 0fa2132..55d60ea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RollingFile;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -193,7 +194,9 @@ public class ManifestFile {
         }
 
         public ManifestFile create() {
-            RowType entryType = ManifestEntry.schema(partitionType, keyType, valueType);
+            RowType entryType =
+                    VersionedObjectSerializer.versionType(
+                            ManifestEntry.schema(partitionType, keyType, valueType));
             return new ManifestFile(
                     partitionType,
                     new ManifestEntrySerializer(partitionType, keyType, valueType),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
index f0d2dd0..9ad3e6a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
@@ -22,11 +22,11 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 /** Serializer for {@link ManifestFileMeta}. */
-public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMeta> {
+public class ManifestFileMetaSerializer extends VersionedObjectSerializer<ManifestFileMeta> {
 
     private static final long serialVersionUID = 1L;
 
@@ -38,7 +38,12 @@ public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMet
     }
 
     @Override
-    public RowData toRow(ManifestFileMeta meta) {
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public RowData convertTo(ManifestFileMeta meta) {
         GenericRowData row = new GenericRowData(5);
         row.setField(0, StringData.fromString(meta.fileName()));
         row.setField(1, meta.fileSize());
@@ -49,7 +54,10 @@ public class ManifestFileMetaSerializer extends ObjectSerializer<ManifestFileMet
     }
 
     @Override
-    public ManifestFileMeta fromRow(RowData row) {
+    public ManifestFileMeta convertFrom(int version, RowData row) {
+        if (version != 1) {
+            throw new IllegalArgumentException("Unsupported version: " + version);
+        }
         return new ManifestFileMeta(
                 row.getString(0).toString(),
                 row.getLong(1),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 82b64c0..1d0e78a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -114,7 +115,8 @@ public class ManifestList {
         }
 
         public ManifestList create() {
-            RowType metaType = ManifestFileMeta.schema(partitionType);
+            RowType metaType =
+                    VersionedObjectSerializer.versionType(ManifestFileMeta.schema(partitionType));
             return new ManifestList(
                     new ManifestFileMetaSerializer(partitionType),
                     fileFormat.createReaderFactory(metaType),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java
new file mode 100644
index 0000000..a07c474
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/VersionedObjectSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link ObjectSerializer} for versioned serialization. */
+public abstract class VersionedObjectSerializer<T> extends ObjectSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    public VersionedObjectSerializer(RowType rowType) {
+        super(versionType(rowType));
+    }
+
+    public static RowType versionType(RowType rowType) {
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_VERSION", new IntType(false)));
+        fields.addAll(rowType.getFields());
+        return new RowType(fields);
+    }
+
+    /**
+     * Gets the version with which this serializer serializes.
+     *
+     * @return The version of the serialization schema.
+     */
+    public abstract int getVersion();
+
+    public abstract RowData convertTo(T record);
+
+    public abstract T convertFrom(int version, RowData row);
+
+    @Override
+    public final RowData toRow(T record) {
+        return new JoinedRowData().replace(GenericRowData.of(getVersion()), convertTo(record));
+    }
+
+    @Override
+    public final T fromRow(RowData row) {
+        return convertFrom(row.getInt(0), new OffsetRowData(row.getArity() - 1, 1).replace(row));
+    }
+}

Reply via email to