This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff5192a09 [core] Add _EXTERNAL_PATH in DataFileMeta
9ff5192a09 is described below
commit 9ff5192a09856aff318597e589f8e8c8f6ff325b
Author: HouliangQi <[email protected]>
AuthorDate: Thu Dec 19 13:09:18 2024 +0800
[core] Add _EXTERNAL_PATH in DataFileMeta
This closes #4751
---
.../java/org/apache/paimon/io/DataFileMeta.java | 50 +++++++++----
.../apache/paimon/io/DataFileMeta08Serializer.java | 1 +
.../apache/paimon/io/DataFileMeta09Serializer.java | 1 +
...er.java => DataFileMeta10LegacySerializer.java} | 14 ++--
.../apache/paimon/io/DataFileMetaSerializer.java | 6 +-
.../sink/CommitMessageLegacyV2Serializer.java | 1 +
.../paimon/table/sink/CommitMessageSerializer.java | 11 ++-
.../org/apache/paimon/table/source/DataSplit.java | 8 ++-
.../paimon/crosspartition/IndexBootstrapTest.java | 1 +
.../org/apache/paimon/io/DataFileTestUtils.java | 1 +
...festCommittableSerializerCompatibilityTest.java | 79 ++++++++++++++++++++-
.../paimon/manifest/ManifestFileMetaTestBase.java | 1 +
.../mergetree/compact/IntervalPartitionTest.java | 1 +
.../paimon/operation/ExpireSnapshotsTest.java | 1 +
.../org/apache/paimon/table/source/SplitTest.java | 73 ++++++++++++++++++-
.../src/test/resources/compatibility/datasplit-v4 | Bin 0 -> 934 bytes
.../compatibility/manifest-committable-v5 | Bin 0 -> 3449 bytes
17 files changed, 221 insertions(+), 28 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bb9e45ff00..3be09ea6c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -82,7 +82,8 @@ public class DataFileMeta {
new DataField(
16,
"_VALUE_STATS_COLS",
-
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
+
DataTypes.ARRAY(DataTypes.STRING().notNull())),
+ new DataField(17, "_EXTERNAL_PATH",
newStringType(true))));
public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
@@ -120,6 +121,9 @@ public class DataFileMeta {
private final @Nullable List<String> valueStatsCols;
+ /** external path of file, if it is null, it is in the default warehouse
path. */
+ private final @Nullable String externalPath;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -149,7 +153,8 @@ public class DataFileMeta {
0L,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ null);
}
public DataFileMeta(
@@ -186,7 +191,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ null);
}
public DataFileMeta(
@@ -222,7 +228,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ null);
}
public DataFileMeta(
@@ -242,7 +249,8 @@ public class DataFileMeta {
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols) {
+ @Nullable List<String> valueStatsCols,
+ @Nullable String externalPath) {
this.fileName = fileName;
this.fileSize = fileSize;
@@ -264,6 +272,7 @@ public class DataFileMeta {
this.deleteRowCount = deleteRowCount;
this.fileSource = fileSource;
this.valueStatsCols = valueStatsCols;
+ this.externalPath = externalPath;
}
public String fileName() {
@@ -357,6 +366,11 @@ public class DataFileMeta {
return split[split.length - 1];
}
+ @Nullable
+ public String externalPath() {
+ return externalPath;
+ }
+
public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}
@@ -385,7 +399,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
public DataFileMeta rename(String newFileName) {
@@ -406,7 +421,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
public DataFileMeta copyWithoutStats() {
@@ -427,7 +443,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- Collections.emptyList());
+ Collections.emptyList(),
+ externalPath);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -455,7 +472,8 @@ public class DataFileMeta {
deleteRowCount,
embeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
public DataFileMeta copy(byte[] newEmbeddedIndex) {
@@ -476,7 +494,8 @@ public class DataFileMeta {
deleteRowCount,
newEmbeddedIndex,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
@Override
@@ -504,7 +523,8 @@ public class DataFileMeta {
&& Objects.equals(creationTime, that.creationTime)
&& Objects.equals(deleteRowCount, that.deleteRowCount)
&& Objects.equals(fileSource, that.fileSource)
- && Objects.equals(valueStatsCols, that.valueStatsCols);
+ && Objects.equals(valueStatsCols, that.valueStatsCols)
+ && Objects.equals(externalPath, that.externalPath);
}
@Override
@@ -526,7 +546,8 @@ public class DataFileMeta {
creationTime,
deleteRowCount,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
@Override
@@ -536,7 +557,7 @@ public class DataFileMeta {
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats:
%s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s,
creationTime: %s, "
- + "deleteRowCount: %d, fileSource: %s, valueStatsCols:
%s}",
+ + "deleteRowCount: %d, fileSource: %s, valueStatsCols:
%s, externalPath: %s}",
fileName,
fileSize,
rowCount,
@@ -553,7 +574,8 @@ public class DataFileMeta {
creationTime,
deleteRowCount,
fileSource,
- valueStatsCols);
+ valueStatsCols,
+ externalPath);
}
public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index 03e4ed51f4..e6c10f1534 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -133,6 +133,7 @@ public class DataFileMeta08Serializer implements
Serializable {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
index 2f8d89f5b1..36d1ad260f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
@@ -139,6 +139,7 @@ public class DataFileMeta09Serializer implements
Serializable {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
similarity index 92%
copy from
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
copy to
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 2f8d89f5b1..68ccba6ea3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -47,7 +47,7 @@ import static
org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** Serializer for {@link DataFileMeta} with 0.9 version. */
-public class DataFileMeta09Serializer implements Serializable {
+public class DataFileMeta10LegacySerializer implements Serializable {
private static final long serialVersionUID = 1L;
@@ -71,11 +71,15 @@ public class DataFileMeta09Serializer implements
Serializable {
new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()),
new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)),
new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)),
- new DataField(15, "_FILE_SOURCE", new
TinyIntType(true))));
+ new DataField(15, "_FILE_SOURCE", new
TinyIntType(true)),
+ new DataField(
+ 16,
+ "_VALUE_STATS_COLS",
+
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
protected final InternalRowSerializer rowSerializer;
- public DataFileMeta09Serializer() {
+ public DataFileMeta10LegacySerializer() {
this.rowSerializer = InternalSerializers.create(SCHEMA);
}
@@ -105,7 +109,8 @@ public class DataFileMeta09Serializer implements
Serializable {
meta.creationTime(),
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
-
meta.fileSource().map(FileSource::toByteValue).orElse(null));
+
meta.fileSource().map(FileSource::toByteValue).orElse(null),
+ toStringArrayData(meta.valueStatsCols()));
rowSerializer.serialize(row, target);
}
@@ -139,6 +144,7 @@ public class DataFileMeta09Serializer implements
Serializable {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
+ row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index 626201ca30..c8a5e326b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -58,7 +58,8 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
- toStringArrayData(meta.valueStatsCols()));
+ toStringArrayData(meta.valueStatsCols()),
+ BinaryString.fromString(meta.externalPath()));
}
@Override
@@ -80,6 +81,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
- row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)));
+ row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
+ row.isNullAt(17) ? null : row.getString(17).toString());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 3e351cd1da..5da96da765 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -155,6 +155,7 @@ public class CommitMessageLegacyV2Serializer {
null,
null,
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 9fc251c366..c65f8302aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
+import org.apache.paimon.io.DataFileMeta10LegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
@@ -47,11 +48,12 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 5;
+ private static final int CURRENT_VERSION = 6;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
+ private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
private DataFileMeta09Serializer dataFile09Serializer;
private DataFileMeta08Serializer dataFile08Serializer;
private IndexFileMeta09Serializer indexEntry09Serializer;
@@ -129,8 +131,13 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
int version, DataInputView view) {
- if (version >= 4) {
+ if (version >= 5) {
return () -> dataFileSerializer.deserializeList(view);
+ } else if (version == 4) {
+ if (dataFileMeta10LegacySerializer == null) {
+ dataFileMeta10LegacySerializer = new
DataFileMeta10LegacySerializer();
+ }
+ return () -> dataFileMeta10LegacySerializer.deserializeList(view);
} else if (version == 3) {
if (dataFile09Serializer == null) {
dataFile09Serializer = new DataFileMeta09Serializer();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index b9460f28b4..40673ee788 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
+import org.apache.paimon.io.DataFileMeta10LegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -51,7 +52,7 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 4;
+ private static final int VERSION = 5;
private long snapshotId = 0;
private BinaryRow partition;
@@ -362,7 +363,10 @@ public class DataSplit implements Split {
} else if (version == 2) {
DataFileMeta09Serializer serializer = new
DataFileMeta09Serializer();
return serializer::deserialize;
- } else if (version >= 3) {
+ } else if (version == 3) {
+ DataFileMeta10LegacySerializer serializer = new
DataFileMeta10LegacySerializer();
+ return serializer::deserialize;
+ } else if (version >= 4) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index be41477356..27fa311ddb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -160,6 +160,7 @@ public class IndexBootstrapTest extends TableTestBase {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index 48c8d44876..a44ef9a530 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -57,6 +57,7 @@ public class DataFileTestUtils {
maxSeq - minSeq + 1,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index fbc02b2d73..34af551659 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -75,7 +75,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
- Arrays.asList("field1", "field2", "field3"));
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file");
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
@@ -106,6 +107,76 @@ public class
ManifestCommittableSerializerCompatibilityTest {
assertThat(deserialized).isEqualTo(manifestCommittable);
}
+ @Test
+ public void testCompatibilityToVersion5() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file");
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v5"),
+ true);
+ deserialized = serializer.deserialize(2, v2Bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
@Test
public void testCompatibilityToVersion4() throws IOException {
SimpleStats keyStats =
@@ -136,7 +207,8 @@ public class ManifestCommittableSerializerCompatibilityTest
{
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
- Arrays.asList("field1", "field2", "field3"));
+ Arrays.asList("field1", "field2", "field3"),
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
@@ -206,6 +278,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -276,6 +349,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
11L,
new byte[] {1, 2, 4},
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -346,6 +420,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
null,
null,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 52d82e76be..19bd6a856b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -95,6 +95,7 @@ public abstract class ManifestFileMetaTestBase {
0L, // not used
embeddedIndex, // not used
FileSource.APPEND,
+ null,
null));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index bdee5c5f75..94c11498c5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -184,6 +184,7 @@ public class IntervalPartitionTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 9dc9834373..abff820b2c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -214,6 +214,7 @@ public class ExpireSnapshotsTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 0219941a0a..88394d2dc3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -139,7 +139,8 @@ public class SplitTest {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
- Arrays.asList("field1", "field2", "field3"));
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse");
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
@@ -194,6 +195,7 @@ public class SplitTest {
11L,
new byte[] {1, 2, 4},
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -254,6 +256,7 @@ public class SplitTest {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -314,7 +317,8 @@ public class SplitTest {
11L,
new byte[] {1, 2, 4},
FileSource.COMPACT,
- Arrays.asList("field1", "field2", "field3"));
+ Arrays.asList("field1", "field2", "field3"),
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
null);
@@ -347,6 +351,71 @@ public class SplitTest {
assertThat(actual).isEqualTo(split);
}
+ @Test
+ public void testSerializerCompatibleV4() throws Exception {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse");
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
null);
+ List<DeletionFile> deletionFiles =
Collections.singletonList(deletionFile);
+
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .build();
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ SplitTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/datasplit-v4"),
+ true);
+
+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v2Bytes,
DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ }
+
private DataFileMeta newDataFile(long rowCount) {
return DataFileMeta.forAppend(
"my_data_file.parquet",
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4
b/paimon-core/src/test/resources/compatibility/datasplit-v4
new file mode 100644
index 0000000000..6ccef002b1
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/datasplit-v4 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v5
b/paimon-core/src/test/resources/compatibility/manifest-committable-v5
new file mode 100644
index 0000000000..8b2b05869b
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 differ