This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 41df78a9b [flink] Fix compatibility for ManifestCommittableSerializer 
(#3404)
41df78a9b is described below

commit 41df78a9b9d32c6264610a5ecdf7c4a078ff9812
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 28 15:46:03 2024 +0800

    [flink] Fix compatibility for ManifestCommittableSerializer (#3404)
---
 .../main/java/org/apache/paimon/utils/IOUtils.java |   8 +
 .../apache/paimon/io/DataFileMeta08Serializer.java | 113 +++++++++++++
 .../manifest/ManifestCommittableSerializer.java    |   9 +-
 .../paimon/table/sink/CommitMessageSerializer.java |  38 +++--
 ...festCommittableSerializerCompatibilityTest.java | 176 +++++++++++++++++++++
 .../table/sink/CommitMessageSerializerTest.java    |   2 +-
 .../compatibility/manifest-committable-v2          | Bin 0 -> 2853 bytes
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |   3 +-
 .../flink/sink/CombinedTableCompactorSink.java     |   3 +-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   |   4 +-
 .../RestoreAndFailCommittableStateManager.java     |  10 +-
 .../sink/BatchWriteGeneratorTagOperatorTest.java   |   5 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   |   9 +-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |   5 +-
 14 files changed, 338 insertions(+), 47 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index 9878dec36..b2688f8e6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -93,6 +94,13 @@ public final class IOUtils {
     //  Stream input skipping
     // ------------------------------------------------------------------------
 
+    /** Reads all bytes from the input stream into a byte array. */
+    public static byte[] readFully(InputStream in, boolean close) throws 
IOException {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        copyBytes(in, output, BLOCKSIZE, close);
+        return output.toByteArray();
+    }
+
     /**
      * Reads len bytes in a loop.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
new file mode 100644
index 000000000..15a8df420
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Serializer for {@link DataFileMeta} using the legacy 08 row schema (no file source field). */
+public class DataFileMeta08Serializer extends ObjectSerializer<DataFileMeta> {
+
+    private static final long serialVersionUID = 1L;
+
+    public DataFileMeta08Serializer() {
+        super(schemaFor08());
+    }
+
+    private static RowType schemaFor08() {
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
+        fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
+        fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
+        fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
+        fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
+        fields.add(new DataField(5, "_KEY_STATS", 
SimpleStatsConverter.schema()));
+        fields.add(new DataField(6, "_VALUE_STATS", 
SimpleStatsConverter.schema()));
+        fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new 
BigIntType(false)));
+        fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new 
BigIntType(false)));
+        fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
+        fields.add(new DataField(10, "_LEVEL", new IntType(false)));
+        fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, 
newStringType(false))));
+        fields.add(new DataField(12, "_CREATION_TIME", 
DataTypes.TIMESTAMP_MILLIS()));
+        fields.add(new DataField(13, "_DELETE_ROW_COUNT", new 
BigIntType(true)));
+        fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", 
newBytesType(true)));
+        return new RowType(fields);
+    }
+
+    @Override
+    public InternalRow toRow(DataFileMeta meta) {
+        return GenericRow.of(
+                BinaryString.fromString(meta.fileName()),
+                meta.fileSize(),
+                meta.rowCount(),
+                serializeBinaryRow(meta.minKey()),
+                serializeBinaryRow(meta.maxKey()),
+                meta.keyStats().toRow(),
+                meta.valueStats().toRow(),
+                meta.minSequenceNumber(),
+                meta.maxSequenceNumber(),
+                meta.schemaId(),
+                meta.level(),
+                toStringArrayData(meta.extraFiles()),
+                meta.creationTime(),
+                meta.deleteRowCount().orElse(null),
+                meta.embeddedIndex());
+    }
+
+    @Override
+    public DataFileMeta fromRow(InternalRow row) {
+        return new DataFileMeta(
+                row.getString(0).toString(),
+                row.getLong(1),
+                row.getLong(2),
+                deserializeBinaryRow(row.getBinary(3)),
+                deserializeBinaryRow(row.getBinary(4)),
+                SimpleStats.fromRow(row.getRow(5, 3)),
+                SimpleStats.fromRow(row.getRow(6, 3)),
+                row.getLong(7),
+                row.getLong(8),
+                row.getLong(9),
+                row.getInt(10),
+                fromStringArrayData(row.getArray(11)),
+                row.getTimestamp(12, 3),
+                row.isNullAt(13) ? null : row.getLong(13),
+                row.isNullAt(14) ? null : row.getBinary(14),
+                null);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index 3ae51b7aa..c73c12ffa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -33,7 +33,7 @@ import java.util.Map;
 /** {@link VersionedSerializer} for {@link ManifestCommittable}. */
 public class ManifestCommittableSerializer implements 
VersionedSerializer<ManifestCommittable> {
 
-    private static final int CURRENT_VERSION = 2;
+    private static final int CURRENT_VERSION = 3;
 
     private final CommitMessageSerializer commitMessageSerializer;
 
@@ -75,14 +75,13 @@ public class ManifestCommittableSerializer implements 
VersionedSerializer<Manife
 
     @Override
     public ManifestCommittable deserialize(int version, byte[] serialized) 
throws IOException {
-        if (version != CURRENT_VERSION) {
+        if (version > CURRENT_VERSION) {
             throw new UnsupportedOperationException(
-                    "Expecting ManifestCommittable version to be "
+                    "Expecting ManifestCommittableSerializer version to be 
smaller or equal than "
                             + CURRENT_VERSION
                             + ", but found "
                             + version
-                            + ".\nManifestCommittable is not a compatible data 
structure. "
-                            + "Please restart the job afresh (do not recover 
from savepoint).");
+                            + ".");
         }
 
         DataInputDeserializer view = new DataInputDeserializer(serialized);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index a7b566b32..53a1f9455 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.data.serializer.VersionedSerializer;
 import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputDeserializer;
@@ -28,10 +30,12 @@ import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.utils.ObjectSerializer;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -40,11 +44,13 @@ import static 
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 2;
+    private static final int CURRENT_VERSION = 3;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
 
+    private DataFileMeta08Serializer dataFile08Serializer;
+
     public CommitMessageSerializer() {
         this.dataFileSerializer = new DataFileMetaSerializer();
         this.indexEntrySerializer = new IndexFileMetaSerializer();
@@ -86,34 +92,36 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
 
     @Override
     public CommitMessage deserialize(int version, byte[] serialized) throws 
IOException {
-        checkVersion(version);
         DataInputDeserializer view = new DataInputDeserializer(serialized);
-        return deserialize(view);
+        return deserialize(version, view);
     }
 
     public List<CommitMessage> deserializeList(int version, DataInputView 
view) throws IOException {
-        checkVersion(version);
         int length = view.readInt();
         List<CommitMessage> list = new ArrayList<>(length);
         for (int i = 0; i < length; i++) {
-            list.add(deserialize(view));
+            list.add(deserialize(version, view));
         }
         return list;
     }
 
-    private void checkVersion(int version) {
-        if (version != CURRENT_VERSION) {
+    private CommitMessage deserialize(int version, DataInputView view) throws 
IOException {
+        ObjectSerializer<DataFileMeta> dataFileSerializer;
+        if (version == CURRENT_VERSION) {
+            dataFileSerializer = this.dataFileSerializer;
+        } else if (version <= 2) {
+            if (dataFile08Serializer == null) {
+                dataFile08Serializer = new DataFileMeta08Serializer();
+            }
+            dataFileSerializer = dataFile08Serializer;
+        } else {
             throw new UnsupportedOperationException(
-                    "Expecting FileCommittable version to be "
+                    "Expecting CommitMessageSerializer version to be smaller 
or equal than "
                             + CURRENT_VERSION
                             + ", but found "
                             + version
-                            + ".\nFileCommittable is not a compatible data 
structure. "
-                            + "Please restart the job afresh (do not recover 
from savepoint).");
+                            + ".");
         }
-    }
-
-    private CommitMessage deserialize(DataInputView view) throws IOException {
         return new CommitMessageImpl(
                 deserializeBinaryRow(view),
                 view.readInt(),
@@ -127,6 +135,8 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
                         dataFileSerializer.deserializeList(view)),
                 new IndexIncrement(
                         indexEntrySerializer.deserializeList(view),
-                        indexEntrySerializer.deserializeList(view)));
+                        version <= 2
+                                ? Collections.emptyList()
+                                : indexEntrySerializer.deserializeList(view)));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
new file mode 100644
index 000000000..13b9d1184
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.BinaryArray.fromLongArray;
+import static org.apache.paimon.data.BinaryRow.singleColumn;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Compatibility Test for {@link ManifestCommittableSerializer}. */
+public class ManifestCommittableSerializerCompatibilityTest {
+
+    @Test
+    public void testProduction() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>();
+        dvRanges.put("dv_key1", Pair.of(1, 2));
+        dvRanges.put("dv_key2", Pair.of(3, 4));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvRanges);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
+    @Test
+    public void testCompatibilityToVersion2() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        null);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>();
+        dvRanges.put("dv_key1", Pair.of(1, 2));
+        dvRanges.put("dv_key2", Pair.of(3, 4));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvRanges);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/manifest-committable-v2"),
+                        true);
+        deserialized = serializer.deserialize(2, v2Bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 47a210742..eb9105189 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -48,7 +48,7 @@ public class CommitMessageSerializerTest {
         CommitMessageImpl committable =
                 new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement, indexIncrement);
         CommitMessageImpl newCommittable =
-                (CommitMessageImpl) serializer.deserialize(2, 
serializer.serialize(committable));
+                (CommitMessageImpl) serializer.deserialize(3, 
serializer.serialize(committable));
         
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
         
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
         
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v2 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2
new file mode 100644
index 000000000..6522f5aed
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2 differ
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index d5d99d4c2..aa833ef00 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.flink.sink.CommitterOperator;
@@ -160,6 +159,6 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
         return new RestoreAndFailCommittableStateManager<>(
-                () -> new VersionedSerializerWrapper<>(new 
WrappedManifestCommittableSerializer()));
+                WrappedManifestCommittableSerializer::new);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index ba7c1bb44..02351a037 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
 
@@ -181,6 +180,6 @@ public class CombinedTableCompactorSink implements 
Serializable {
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
         return new RestoreAndFailCommittableStateManager<>(
-                () -> new VersionedSerializerWrapper<>(new 
WrappedManifestCommittableSerializer()));
+                WrappedManifestCommittableSerializer::new);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 0d6f245ba..f36dae4a8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.table.FileStoreTable;
@@ -56,7 +55,6 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
 
     @Override
     protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
-        return new RestoreAndFailCommittableStateManager<>(
-                () -> new VersionedSerializerWrapper<>(new 
ManifestCommittableSerializer()));
+        return new 
RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index 32a9cd7be..be795fbab 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.utils.SerializableSupplier;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
@@ -48,14 +49,13 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
     private static final long serialVersionUID = 1L;
 
     /** The committable's serializer. */
-    private final 
SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>>
-            committableSerializer;
+    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer;
 
     /** GlobalCommitT state of this job. Used to filter out previous 
successful commits. */
     private ListState<GlobalCommitT> streamingCommitterState;
 
     public RestoreAndFailCommittableStateManager(
-            SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>> 
committableSerializer) {
+            SerializableSupplier<VersionedSerializer<GlobalCommitT>> 
committableSerializer) {
         this.committableSerializer = committableSerializer;
     }
 
@@ -70,7 +70,7 @@ public class 
RestoreAndFailCommittableStateManager<GlobalCommitT>
                                         new ListStateDescriptor<>(
                                                 
"streaming_committer_raw_states",
                                                 
BytePrimitiveArraySerializer.INSTANCE)),
-                        committableSerializer.get());
+                        new 
VersionedSerializerWrapper<>(committableSerializer.get()));
         List<GlobalCommitT> restored = new ArrayList<>();
         streamingCommitterState.get().forEach(restored::add);
         streamingCommitterState.clear();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index d6d7f434b..024a99c3e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.table.FileStoreTable;
@@ -60,9 +59,7 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
                         table,
                         initialCommitUser,
                         new RestoreAndFailCommittableStateManager<>(
-                                () ->
-                                        new VersionedSerializerWrapper<>(
-                                                new 
ManifestCommittableSerializer())));
+                                ManifestCommittableSerializer::new));
         committerOperator.open();
 
         TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 672ccfd39..58a7d0dce 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.CompactIncrement;
@@ -580,9 +579,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                         table,
                         null,
                         new RestoreAndFailCommittableStateManager<>(
-                                () ->
-                                        new VersionedSerializerWrapper<>(
-                                                new 
ManifestCommittableSerializer())));
+                                ManifestCommittableSerializer::new));
         OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
                 createTestHarness(operator);
         testHarness.open();
@@ -678,9 +675,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                         table,
                         null,
                         new RestoreAndFailCommittableStateManager<>(
-                                () ->
-                                        new VersionedSerializerWrapper<>(
-                                                new 
ManifestCommittableSerializer())));
+                                ManifestCommittableSerializer::new));
         return createTestHarness(operator);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 94cfb3670..693e66e93 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -644,9 +643,7 @@ class StoreMultiCommitterTest {
                         initialCommitUser,
                         context -> new StoreMultiCommitter(catalogLoader, 
context),
                         new RestoreAndFailCommittableStateManager<>(
-                                () ->
-                                        new VersionedSerializerWrapper<>(
-                                                new 
WrappedManifestCommittableSerializer())));
+                                WrappedManifestCommittableSerializer::new));
         return createTestHarness(operator);
     }
 


Reply via email to