This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 41df78a9b [flink] Fix compatibility for ManifestCommittableSerializer
(#3404)
41df78a9b is described below
commit 41df78a9b9d32c6264610a5ecdf7c4a078ff9812
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 28 15:46:03 2024 +0800
[flink] Fix compatibility for ManifestCommittableSerializer (#3404)
---
.../main/java/org/apache/paimon/utils/IOUtils.java | 8 +
.../apache/paimon/io/DataFileMeta08Serializer.java | 113 +++++++++++++
.../manifest/ManifestCommittableSerializer.java | 9 +-
.../paimon/table/sink/CommitMessageSerializer.java | 38 +++--
...festCommittableSerializerCompatibilityTest.java | 176 +++++++++++++++++++++
.../table/sink/CommitMessageSerializerTest.java | 2 +-
.../compatibility/manifest-committable-v2 | Bin 0 -> 2853 bytes
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 3 +-
.../flink/sink/CombinedTableCompactorSink.java | 3 +-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 4 +-
.../RestoreAndFailCommittableStateManager.java | 10 +-
.../sink/BatchWriteGeneratorTagOperatorTest.java | 5 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 9 +-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 5 +-
14 files changed, 338 insertions(+), 47 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index 9878dec36..b2688f8e6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -93,6 +94,13 @@ public final class IOUtils {
// Stream input skipping
// ------------------------------------------------------------------------
+ /** Reads all into a bytes. */
+ public static byte[] readFully(InputStream in, boolean close) throws
IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ copyBytes(in, output, BLOCKSIZE, close);
+ return output.toByteArray();
+ }
+
/**
* Reads len bytes in a loop.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
new file mode 100644
index 000000000..15a8df420
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Serializer for {@link DataFileMeta}. */
+public class DataFileMeta08Serializer extends ObjectSerializer<DataFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DataFileMeta08Serializer() {
+ super(schemaFor08());
+ }
+
+ private static RowType schemaFor08() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
+ fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
+ fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
+ fields.add(new DataField(5, "_KEY_STATS",
SimpleStatsConverter.schema()));
+ fields.add(new DataField(6, "_VALUE_STATS",
SimpleStatsConverter.schema()));
+ fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new
BigIntType(false)));
+ fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new
BigIntType(false)));
+ fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
+ fields.add(new DataField(10, "_LEVEL", new IntType(false)));
+ fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))));
+ fields.add(new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()));
+ fields.add(new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)));
+ fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)));
+ return new RowType(fields);
+ }
+
+ @Override
+ public InternalRow toRow(DataFileMeta meta) {
+ return GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex());
+ }
+
+ @Override
+ public DataFileMeta fromRow(InternalRow row) {
+ return new DataFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ deserializeBinaryRow(row.getBinary(3)),
+ deserializeBinaryRow(row.getBinary(4)),
+ SimpleStats.fromRow(row.getRow(5, 3)),
+ SimpleStats.fromRow(row.getRow(6, 3)),
+ row.getLong(7),
+ row.getLong(8),
+ row.getLong(9),
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)),
+ row.getTimestamp(12, 3),
+ row.isNullAt(13) ? null : row.getLong(13),
+ row.isNullAt(14) ? null : row.getBinary(14),
+ null);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index 3ae51b7aa..c73c12ffa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -33,7 +33,7 @@ import java.util.Map;
/** {@link VersionedSerializer} for {@link ManifestCommittable}. */
public class ManifestCommittableSerializer implements
VersionedSerializer<ManifestCommittable> {
- private static final int CURRENT_VERSION = 2;
+ private static final int CURRENT_VERSION = 3;
private final CommitMessageSerializer commitMessageSerializer;
@@ -75,14 +75,13 @@ public class ManifestCommittableSerializer implements
VersionedSerializer<Manife
@Override
public ManifestCommittable deserialize(int version, byte[] serialized)
throws IOException {
- if (version != CURRENT_VERSION) {
+ if (version > CURRENT_VERSION) {
throw new UnsupportedOperationException(
- "Expecting ManifestCommittable version to be "
+ "Expecting ManifestCommittableSerializer version to be
smaller or equal than "
+ CURRENT_VERSION
+ ", but found "
+ version
- + ".\nManifestCommittable is not a compatible data
structure. "
- + "Please restart the job afresh (do not recover
from savepoint).");
+ + ".");
}
DataInputDeserializer view = new DataInputDeserializer(serialized);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index a7b566b32..53a1f9455 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
@@ -28,10 +30,12 @@ import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.utils.ObjectSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -40,11 +44,13 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 2;
+ private static final int CURRENT_VERSION = 3;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
+ private DataFileMeta08Serializer dataFile08Serializer;
+
public CommitMessageSerializer() {
this.dataFileSerializer = new DataFileMetaSerializer();
this.indexEntrySerializer = new IndexFileMetaSerializer();
@@ -86,34 +92,36 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
@Override
public CommitMessage deserialize(int version, byte[] serialized) throws
IOException {
- checkVersion(version);
DataInputDeserializer view = new DataInputDeserializer(serialized);
- return deserialize(view);
+ return deserialize(version, view);
}
public List<CommitMessage> deserializeList(int version, DataInputView
view) throws IOException {
- checkVersion(version);
int length = view.readInt();
List<CommitMessage> list = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
- list.add(deserialize(view));
+ list.add(deserialize(version, view));
}
return list;
}
- private void checkVersion(int version) {
- if (version != CURRENT_VERSION) {
+ private CommitMessage deserialize(int version, DataInputView view) throws
IOException {
+ ObjectSerializer<DataFileMeta> dataFileSerializer;
+ if (version == CURRENT_VERSION) {
+ dataFileSerializer = this.dataFileSerializer;
+ } else if (version <= 2) {
+ if (dataFile08Serializer == null) {
+ dataFile08Serializer = new DataFileMeta08Serializer();
+ }
+ dataFileSerializer = dataFile08Serializer;
+ } else {
throw new UnsupportedOperationException(
- "Expecting FileCommittable version to be "
+ "Expecting CommitMessageSerializer version to be smaller
or equal than "
+ CURRENT_VERSION
+ ", but found "
+ version
- + ".\nFileCommittable is not a compatible data
structure. "
- + "Please restart the job afresh (do not recover
from savepoint).");
+ + ".");
}
- }
-
- private CommitMessage deserialize(DataInputView view) throws IOException {
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
@@ -127,6 +135,8 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
dataFileSerializer.deserializeList(view)),
new IndexIncrement(
indexEntrySerializer.deserializeList(view),
- indexEntrySerializer.deserializeList(view)));
+ version <= 2
+ ? Collections.emptyList()
+ : indexEntrySerializer.deserializeList(view)));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
new file mode 100644
index 000000000..13b9d1184
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.BinaryArray.fromLongArray;
+import static org.apache.paimon.data.BinaryRow.singleColumn;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Compatibility Test for {@link ManifestCommittableSerializer}. */
+public class ManifestCommittableSerializerCompatibilityTest {
+
+ @Test
+ public void testProduction() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>();
+ dvRanges.put("dv_key1", Pair.of(1, 2));
+ dvRanges.put("dv_key2", Pair.of(3, 4));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvRanges);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
+ @Test
+ public void testCompatibilityToVersion2() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ null);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>();
+ dvRanges.put("dv_key1", Pair.of(1, 2));
+ dvRanges.put("dv_key2", Pair.of(3, 4));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvRanges);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v2"),
+ true);
+ deserialized = serializer.deserialize(2, v2Bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 47a210742..eb9105189 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -48,7 +48,7 @@ public class CommitMessageSerializerTest {
CommitMessageImpl committable =
new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement, indexIncrement);
CommitMessageImpl newCommittable =
- (CommitMessageImpl) serializer.deserialize(2,
serializer.serialize(committable));
+ (CommitMessageImpl) serializer.deserialize(3,
serializer.serialize(committable));
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v2
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2
new file mode 100644
index 000000000..6522f5aed
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v2 differ
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index d5d99d4c2..aa833ef00 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
@@ -160,6 +159,6 @@ public class FlinkCdcMultiTableSink implements Serializable
{
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
return new RestoreAndFailCommittableStateManager<>(
- () -> new VersionedSerializerWrapper<>(new
WrappedManifestCommittableSerializer()));
+ WrappedManifestCommittableSerializer::new);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index ba7c1bb44..02351a037 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
@@ -181,6 +180,6 @@ public class CombinedTableCompactorSink implements
Serializable {
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
return new RestoreAndFailCommittableStateManager<>(
- () -> new VersionedSerializerWrapper<>(new
WrappedManifestCommittableSerializer()));
+ WrappedManifestCommittableSerializer::new);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 0d6f245ba..f36dae4a8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
@@ -56,7 +55,6 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
@Override
protected CommittableStateManager<ManifestCommittable>
createCommittableStateManager() {
- return new RestoreAndFailCommittableStateManager<>(
- () -> new VersionedSerializerWrapper<>(new
ManifestCommittableSerializer()));
+ return new
RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index 32a9cd7be..be795fbab 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -18,13 +18,14 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
@@ -48,14 +49,13 @@ public class
RestoreAndFailCommittableStateManager<GlobalCommitT>
private static final long serialVersionUID = 1L;
/** The committable's serializer. */
- private final
SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>>
- committableSerializer;
+ private final SerializableSupplier<VersionedSerializer<GlobalCommitT>>
committableSerializer;
/** GlobalCommitT state of this job. Used to filter out previous
successful commits. */
private ListState<GlobalCommitT> streamingCommitterState;
public RestoreAndFailCommittableStateManager(
- SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>>
committableSerializer) {
+ SerializableSupplier<VersionedSerializer<GlobalCommitT>>
committableSerializer) {
this.committableSerializer = committableSerializer;
}
@@ -70,7 +70,7 @@ public class
RestoreAndFailCommittableStateManager<GlobalCommitT>
new ListStateDescriptor<>(
"streaming_committer_raw_states",
BytePrimitiveArraySerializer.INSTANCE)),
- committableSerializer.get());
+ new
VersionedSerializerWrapper<>(committableSerializer.get()));
List<GlobalCommitT> restored = new ArrayList<>();
streamingCommitterState.get().forEach(restored::add);
streamingCommitterState.clear();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index d6d7f434b..024a99c3e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
@@ -60,9 +59,7 @@ public class BatchWriteGeneratorTagOperatorTest extends
CommitterOperatorTest {
table,
initialCommitUser,
new RestoreAndFailCommittableStateManager<>(
- () ->
- new VersionedSerializerWrapper<>(
- new
ManifestCommittableSerializer())));
+ ManifestCommittableSerializer::new));
committerOperator.open();
TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 672ccfd39..58a7d0dce 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
@@ -580,9 +579,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
table,
null,
new RestoreAndFailCommittableStateManager<>(
- () ->
- new VersionedSerializerWrapper<>(
- new
ManifestCommittableSerializer())));
+ ManifestCommittableSerializer::new));
OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
createTestHarness(operator);
testHarness.open();
@@ -678,9 +675,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
table,
null,
new RestoreAndFailCommittableStateManager<>(
- () ->
- new VersionedSerializerWrapper<>(
- new
ManifestCommittableSerializer())));
+ ManifestCommittableSerializer::new));
return createTestHarness(operator);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 94cfb3670..693e66e93 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -644,9 +643,7 @@ class StoreMultiCommitterTest {
initialCommitUser,
context -> new StoreMultiCommitter(catalogLoader,
context),
new RestoreAndFailCommittableStateManager<>(
- () ->
- new VersionedSerializerWrapper<>(
- new
WrappedManifestCommittableSerializer())));
+ WrappedManifestCommittableSerializer::new));
return createTestHarness(operator);
}