This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 47a8871aeb [core] Store `cardinality` in the deletion vector meta
(#4699)
47a8871aeb is described below
commit 47a8871aeb5cba3267192d4971ac82e340a30956
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 13 22:15:46 2024 +0800
[core] Store `cardinality` in the deletion vector meta (#4699)
---
docs/content/concepts/spec/manifest.md | 8 +-
.../deletionvectors/BitmapDeletionVector.java | 5 +
.../DeletionVectorIndexFileWriter.java | 15 +--
.../deletionvectors/DeletionVectorsIndexFile.java | 19 ++--
.../apache/paimon/index/DeletionVectorMeta.java | 103 ++++++++++++++++++++
.../org/apache/paimon/index/IndexFileHandler.java | 17 ++--
.../org/apache/paimon/index/IndexFileMeta.java | 29 +++---
.../paimon/index/IndexFileMeta09Serializer.java | 104 +++++++++++++++++++++
.../paimon/index/IndexFileMetaSerializer.java | 39 ++++----
.../apache/paimon/manifest/IndexManifestEntry.java | 8 +-
.../manifest/IndexManifestEntrySerializer.java | 12 ++-
.../paimon/table/sink/CommitMessageSerializer.java | 78 ++++++++--------
.../org/apache/paimon/table/source/DataSplit.java | 29 ++++--
.../apache/paimon/table/source/DeletionFile.java | 57 +++++++++--
.../table/source/snapshot/SnapshotReaderImpl.java | 18 ++--
.../append/AppendDeletionFileMaintainerTest.java | 15 +--
.../paimon/index/IndexFileMetaSerializerTest.java | 15 ++-
...festCommittableSerializerCompatibilityTest.java | 96 ++++++++++++++++---
.../table/sink/CommitMessageSerializerTest.java | 2 +-
.../org/apache/paimon/table/source/SplitTest.java | 68 ++++++++++++++
.../src/test/resources/compatibility/datasplit-v3 | Bin 0 -> 886 bytes
.../compatibility/manifest-committable-v4 | Bin 0 -> 3145 bytes
.../paimon/spark/sql/DeletionVectorTest.scala | 21 +++++
23 files changed, 595 insertions(+), 163 deletions(-)
diff --git a/docs/content/concepts/spec/manifest.md
b/docs/content/concepts/spec/manifest.md
index bc7318331a..8460febf78 100644
--- a/docs/content/concepts/spec/manifest.md
+++ b/docs/content/concepts/spec/manifest.md
@@ -111,5 +111,9 @@ The index file meta is:
2. fileName: file name.
3. fileSize: file size.
4. rowCount: total number of rows.
-5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", Stores
offset and length of each data file,
- The schema is `ARRAY<ROW<f0: STRING, f1: INT, f2: INT>>`.
+5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS". It is an array of deletion vector metas, and the schema of each deletion vector meta is:
+ 1. f0: the data file name corresponding to this deletion vector.
+ 2. f1: the starting offset of this deletion vector in the index file.
+ 3. f2: the length of this deletion vector in the index file.
+ 4. _CARDINALITY: the number of deleted rows. This field is nullable; it is null for deletion vector metas read from the previous three-field layout.
+
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index a2c5925966..51ae729c21 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -117,4 +117,9 @@ public class BitmapDeletionVector implements DeletionVector
{
BitmapDeletionVector that = (BitmapDeletionVector) o;
return Objects.equals(this.roaringBitmap, that.roaringBitmap);
}
+
+    /** Hashes only the underlying bitmap, mirroring {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return roaringBitmap == null ? 0 : roaringBitmap.hashCode();
+    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index f8c8330f19..5246d35d4b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -20,9 +20,9 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.Preconditions;
@@ -104,13 +104,13 @@ public class DeletionVectorIndexFileWriter {
private final Path path;
private final DataOutputStream dataOutputStream;
- private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;
+ private final LinkedHashMap<String, DeletionVectorMeta> dvMetas;
+ /** Opens a fresh index file from the path factory and writes the format version byte first. */
private SingleIndexFileWriter() throws IOException {
this.path = indexPathFactory.newPath();
this.dataOutputStream = new
DataOutputStream(fileIO.newOutputStream(path, true));
dataOutputStream.writeByte(VERSION_ID_V1);
- this.dvRanges = new LinkedHashMap<>();
+ // Insertion-ordered so the metas follow the order in which deletion vectors are written.
+ this.dvMetas = new LinkedHashMap<>();
}
private long writtenSizeInBytes() {
@@ -121,7 +121,10 @@ public class DeletionVectorIndexFileWriter {
Preconditions.checkNotNull(dataOutputStream);
byte[] data = deletionVector.serializeToBytes();
int size = data.length;
- dvRanges.put(key, Pair.of(dataOutputStream.size(), size));
+ dvMetas.put(
+ key,
+ new DeletionVectorMeta(
+ key, dataOutputStream.size(), size,
deletionVector.getCardinality()));
dataOutputStream.writeInt(size);
dataOutputStream.write(data);
dataOutputStream.writeInt(calculateChecksum(data));
@@ -132,8 +135,8 @@ public class DeletionVectorIndexFileWriter {
DELETION_VECTORS_INDEX,
path.getName(),
writtenSizeInBytes(),
- dvRanges.size(),
- dvRanges);
+ dvMetas.size(),
+ dvMetas);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 798404e001..77abb2d729 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -21,11 +21,11 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import java.io.DataInputStream;
@@ -63,9 +63,9 @@ public class DeletionVectorsIndexFile extends IndexFile {
* @throws UncheckedIOException If an I/O error occurs while reading from
the file.
*/
public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta
fileMeta) {
- LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
- fileMeta.deletionVectorsRanges();
- checkNotNull(deletionVectorRanges);
+ LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas =
+ fileMeta.deletionVectorMetas();
+ checkNotNull(deletionVectorMetas);
String indexFileName = fileMeta.fileName();
Map<String, DeletionVector> deletionVectors = new HashMap<>();
@@ -73,18 +73,17 @@ public class DeletionVectorsIndexFile extends IndexFile {
try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
- for (Map.Entry<String, Pair<Integer, Integer>> entry :
- deletionVectorRanges.entrySet()) {
+ for (DeletionVectorMeta deletionVectorMeta :
deletionVectorMetas.values()) {
deletionVectors.put(
- entry.getKey(),
- readDeletionVector(dataInputStream,
entry.getValue().getRight()));
+ deletionVectorMeta.dataFileName(),
+ readDeletionVector(dataInputStream,
deletionVectorMeta.length()));
}
} catch (Exception e) {
throw new RuntimeException(
"Unable to read deletion vectors from file: "
+ filePath
- + ", deletionVectorRanges: "
- + deletionVectorRanges,
+ + ", deletionVectorMetas: "
+ + deletionVectorMetas,
e);
}
return deletionVectors;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
new file mode 100644
index 0000000000..9eb38818f6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/**
+ * Metadata of a single deletion vector stored in a deletion vector index file.
+ *
+ * <p>Each instance locates the serialized deletion vector of one data file inside the index file
+ * ({@link #offset()} and {@link #length()}) and optionally records how many rows it marks as
+ * deleted ({@link #cardinality()}).
+ */
+public class DeletionVectorMeta {
+
+    /**
+     * Row type used to persist this meta. The first three fields have the same types as the
+     * previous three-field {@code ROW<STRING, INT, INT>} layout; {@code _CARDINALITY} is nullable
+     * because metas read from that older layout do not carry a cardinality.
+     */
+    public static final RowType SCHEMA =
+            RowType.of(
+                    new DataField(0, "f0", newStringType(false)),
+                    new DataField(1, "f1", new IntType(false)),
+                    new DataField(2, "f2", new IntType(false)),
+                    new DataField(3, "_CARDINALITY", new BigIntType(true)));
+
+    private final String dataFileName;
+    private final int offset;
+    private final int length;
+    @Nullable private final Long cardinality;
+
+    /**
+     * Creates a deletion vector meta.
+     *
+     * @param dataFileName name of the data file this deletion vector applies to
+     * @param offset starting offset of the serialized deletion vector in the index file
+     * @param length length in bytes of the serialized deletion vector
+     * @param cardinality number of deleted rows, or {@code null} when unknown
+     */
+    public DeletionVectorMeta(
+            String dataFileName, int offset, int length, @Nullable Long cardinality) {
+        this.dataFileName = dataFileName;
+        this.offset = offset;
+        this.length = length;
+        this.cardinality = cardinality;
+    }
+
+    /** Returns the name of the data file this deletion vector applies to. */
+    public String dataFileName() {
+        return dataFileName;
+    }
+
+    /** Returns the starting offset of the deletion vector in the index file. */
+    public int offset() {
+        return offset;
+    }
+
+    /** Returns the length in bytes of the serialized deletion vector. */
+    public int length() {
+        return length;
+    }
+
+    /** Returns the number of deleted rows, or {@code null} when it was not recorded. */
+    @Nullable
+    public Long cardinality() {
+        return cardinality;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DeletionVectorMeta that = (DeletionVectorMeta) o;
+        return offset == that.offset
+                && length == that.length
+                && Objects.equals(dataFileName, that.dataFileName)
+                && Objects.equals(cardinality, that.cardinality);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataFileName, offset, length, cardinality);
+    }
+
+    @Override
+    public String toString() {
+        return "DeletionVectorMeta{"
+                + "dataFileName='"
+                + dataFileName
+                + '\''
+                + ", offset="
+                + offset
+                + ", length="
+                + length
+                + ", cardinality="
+                + cardinality
+                + '}';
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 7e5efccdd8..8b0e5c5021 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -100,15 +100,16 @@ public class IndexFileHandler {
if (meta.indexType().equals(DELETION_VECTORS_INDEX)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
- meta.deletionVectorsRanges();
- checkNotNull(dvRanges);
- for (String dataFile : dvRanges.keySet()) {
- Pair<Integer, Integer> pair = dvRanges.get(dataFile);
- DeletionFile deletionFile =
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
meta.deletionVectorMetas();
+ checkNotNull(dvMetas);
+ for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+ result.put(
+ dvMeta.dataFileName(),
new DeletionFile(
- filePath(meta).toString(), pair.getLeft(),
pair.getRight());
- result.put(dataFile, deletionFile);
+ filePath(meta).toString(),
+ dvMeta.offset(),
+ dvMeta.length(),
+ dvMeta.cardinality()));
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 24ba6992a5..aae4f8c473 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -23,9 +23,7 @@ import
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Pair;
import javax.annotation.Nullable;
@@ -54,12 +52,7 @@ public class IndexFileMeta {
new DataField(
4,
"_DELETIONS_VECTORS_RANGES",
- new ArrayType(
- true,
- RowType.of(
- newStringType(false),
- new IntType(false),
- new IntType(false))))));
+ new ArrayType(true,
DeletionVectorMeta.SCHEMA))));
private final String indexType;
private final String fileName;
@@ -68,9 +61,9 @@ public class IndexFileMeta {
/**
* Metadata only used by {@link DeletionVectorsIndexFile}, use
LinkedHashMap to ensure that the
- * order of DeletionVectorRanges and the written DeletionVectors is
consistent.
+ * order of DeletionVectorMetas and the written DeletionVectors is
consistent.
*/
- private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges;
+ private final @Nullable LinkedHashMap<String, DeletionVectorMeta>
deletionVectorMetas;
public IndexFileMeta(String indexType, String fileName, long fileSize,
long rowCount) {
this(indexType, fileName, fileSize, rowCount, null);
@@ -81,12 +74,12 @@ public class IndexFileMeta {
String fileName,
long fileSize,
long rowCount,
- @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges) {
+ @Nullable LinkedHashMap<String, DeletionVectorMeta>
deletionVectorMetas) {
this.indexType = indexType;
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
- this.deletionVectorsRanges = deletionVectorsRanges;
+ this.deletionVectorMetas = deletionVectorMetas;
}
public String indexType() {
@@ -105,8 +98,8 @@ public class IndexFileMeta {
return rowCount;
}
- public @Nullable LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges() {
- return deletionVectorsRanges;
+ /**
+ * Returns the deletion vector metas keyed by data file name, in write order. Only populated for
+ * deletion vector index files; may be null otherwise.
+ */
+ public @Nullable LinkedHashMap<String, DeletionVectorMeta>
deletionVectorMetas() {
+ return deletionVectorMetas;
}
@Override
@@ -122,12 +115,12 @@ public class IndexFileMeta {
&& Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
&& rowCount == that.rowCount
- && Objects.equals(deletionVectorsRanges,
that.deletionVectorsRanges);
+ && Objects.equals(deletionVectorMetas,
that.deletionVectorMetas);
}
@Override
public int hashCode() {
- return Objects.hash(indexType, fileName, fileSize, rowCount,
deletionVectorsRanges);
+ // DeletionVectorMeta#hashCode includes the cardinality, so it also affects this hash.
+ return Objects.hash(indexType, fileName, fileSize, rowCount,
deletionVectorMetas);
}
@Override
@@ -142,8 +135,8 @@ public class IndexFileMeta {
+ fileSize
+ ", rowCount="
+ rowCount
- + ", deletionVectorsRanges="
- + deletionVectorsRanges
+ + ", deletionVectorMetas="
+ + deletionVectorMetas
+ '}';
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
new file mode 100644
index 0000000000..915d904569
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/**
+ * Serializer for {@link IndexFileMeta} with 0.9 version.
+ *
+ * <p>Only deserialization is supported. In this layout every deletion vector range is a
+ * three-field row (data file name, offset, length) that does not record a cardinality, so every
+ * {@link DeletionVectorMeta} produced here has a {@code null} cardinality.
+ */
+public class IndexFileMeta09Serializer implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of fields in each row of the 0.9 {@code _DELETIONS_VECTORS_RANGES} array. */
+    private static final int DV_RANGE_FIELD_COUNT = 3;
+
+    public static final RowType SCHEMA =
+            new RowType(
+                    false,
+                    Arrays.asList(
+                            new DataField(0, "_INDEX_TYPE", newStringType(false)),
+                            new DataField(1, "_FILE_NAME", newStringType(false)),
+                            new DataField(2, "_FILE_SIZE", new BigIntType(false)),
+                            new DataField(3, "_ROW_COUNT", new BigIntType(false)),
+                            new DataField(
+                                    4,
+                                    "_DELETIONS_VECTORS_RANGES",
+                                    new ArrayType(
+                                            true,
+                                            RowType.of(
+                                                    newStringType(false),
+                                                    new IntType(false),
+                                                    new IntType(false))))));
+
+    protected final InternalRowSerializer rowSerializer;
+
+    public IndexFileMeta09Serializer() {
+        this.rowSerializer = InternalSerializers.create(SCHEMA);
+    }
+
+    /** Converts a row laid out as {@link #SCHEMA} into an {@link IndexFileMeta}. */
+    public IndexFileMeta fromRow(InternalRow row) {
+        return new IndexFileMeta(
+                row.getString(0).toString(),
+                row.getString(1).toString(),
+                row.getLong(2),
+                row.getLong(3),
+                row.isNullAt(4) ? null : rowArrayDataToDvMetas(row.getArray(4)));
+    }
+
+    /** Reads an int count followed by that many serialized {@link IndexFileMeta} rows. */
+    public final List<IndexFileMeta> deserializeList(DataInputView source) throws IOException {
+        int size = source.readInt();
+        List<IndexFileMeta> records = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            records.add(deserialize(source));
+        }
+        return records;
+    }
+
+    /** Reads a single {@link IndexFileMeta} serialized with {@link #SCHEMA}. */
+    public IndexFileMeta deserialize(DataInputView in) throws IOException {
+        return fromRow(rowSerializer.deserialize(in));
+    }
+
+    /**
+     * Converts the 0.9 deletion vector ranges array into metas keyed by data file name, keeping
+     * the array order. The cardinality of every returned meta is {@code null}.
+     */
+    public static LinkedHashMap<String, DeletionVectorMeta> rowArrayDataToDvMetas(
+            InternalArray arrayData) {
+        int size = arrayData.size();
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            InternalRow row = arrayData.getRow(i, DV_RANGE_FIELD_COUNT);
+            // Decode the file name once; it is both the map key and part of the meta.
+            String dataFileName = row.getString(0).toString();
+            dvMetas.put(
+                    dataFileName,
+                    new DeletionVectorMeta(dataFileName, row.getInt(1), row.getInt(2), null));
+        }
+        return dvMetas;
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 4b52932623..db4a44838f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -24,9 +24,9 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.ObjectSerializer;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import java.util.Collection;
import java.util.LinkedHashMap;
/** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
@@ -43,9 +43,9 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
BinaryString.fromString(record.fileName()),
record.fileSize(),
record.rowCount(),
- record.deletionVectorsRanges() == null
+ record.deletionVectorMetas() == null
? null
- :
dvRangesToRowArrayData(record.deletionVectorsRanges()));
+ :
dvMetasToRowArrayData(record.deletionVectorMetas().values()));
}
@Override
@@ -55,30 +55,35 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
row.getString(1).toString(),
row.getLong(2),
row.getLong(3),
- row.isNullAt(4) ? null :
rowArrayDataToDvRanges(row.getArray(4)));
+ row.isNullAt(4) ? null :
rowArrayDataToDvMetas(row.getArray(4)));
}
- public static InternalArray dvRangesToRowArrayData(
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges) {
+ /**
+ * Converts deletion vector metas into an array of rows laid out as {@link
+ * DeletionVectorMeta#SCHEMA} (data file name, offset, length, nullable cardinality), in the
+ * iteration order of {@code dvMetas}.
+ */
+ public static InternalArray
dvMetasToRowArrayData(Collection<DeletionVectorMeta> dvMetas) {
return new GenericArray(
- dvRanges.entrySet().stream()
+ dvMetas.stream()
.map(
- entry ->
+ dvMeta ->
GenericRow.of(
-
BinaryString.fromString(entry.getKey()),
- entry.getValue().getLeft(),
- entry.getValue().getRight()))
+
BinaryString.fromString(dvMeta.dataFileName()),
+ dvMeta.offset(),
+ dvMeta.length(),
+ dvMeta.cardinality()))
.toArray(GenericRow[]::new));
}
- public static LinkedHashMap<String, Pair<Integer, Integer>>
rowArrayDataToDvRanges(
+ /**
+ * Converts an array of rows laid out as {@link DeletionVectorMeta#SCHEMA} back into deletion
+ * vector metas keyed by data file name, preserving the array order.
+ */
+ public static LinkedHashMap<String, DeletionVectorMeta> rowArrayDataToDvMetas(
InternalArray arrayData) {
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
- new LinkedHashMap<>(arrayData.size());
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>(arrayData.size());
for (int i = 0; i < arrayData.size(); i++) {
- InternalRow row = arrayData.getRow(i, 3);
- dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1),
row.getInt(2)));
+ InternalRow row = arrayData.getRow(i, DeletionVectorMeta.SCHEMA.getFieldCount());
+ // Decode the data file name once; it is both the map key and part of the meta.
+ String dataFileName = row.getString(0).toString();
+ dvMetas.put(
+ dataFileName,
+ new DeletionVectorMeta(
+ dataFileName,
+ row.getInt(1),
+ row.getInt(2),
+ // _CARDINALITY is nullable in SCHEMA.
+ row.isNullAt(3) ? null : row.getLong(3)));
}
- return dvRanges;
+ return dvMetas;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index a52d9e8af4..2431a1c264 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -20,6 +20,7 @@ package org.apache.paimon.manifest;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -57,12 +58,7 @@ public class IndexManifestEntry {
new DataField(
7,
"_DELETIONS_VECTORS_RANGES",
- new ArrayType(
- true,
- RowType.of(
- newStringType(false),
- new IntType(false),
- new IntType(false))))));
+ new ArrayType(true,
DeletionVectorMeta.SCHEMA))));
private final FileKind kind;
private final BinaryRow partition;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index 574e935550..6f2ec17dda 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -22,10 +22,9 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.utils.VersionedObjectSerializer;
-import static
org.apache.paimon.index.IndexFileMetaSerializer.dvRangesToRowArrayData;
-import static
org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvRanges;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
@@ -52,9 +51,10 @@ public class IndexManifestEntrySerializer extends
VersionedObjectSerializer<Inde
BinaryString.fromString(indexFile.fileName()),
indexFile.fileSize(),
indexFile.rowCount(),
- record.indexFile().deletionVectorsRanges() == null
+ record.indexFile().deletionVectorMetas() == null
? null
- :
dvRangesToRowArrayData(record.indexFile().deletionVectorsRanges()));
+ : IndexFileMetaSerializer.dvMetasToRowArrayData(
+
record.indexFile().deletionVectorMetas().values()));
}
@Override
@@ -72,6 +72,8 @@ public class IndexManifestEntrySerializer extends
VersionedObjectSerializer<Inde
row.getString(4).toString(),
row.getLong(5),
row.getLong(6),
- row.isNullAt(7) ? null :
rowArrayDataToDvRanges(row.getArray(7))));
+ row.isNullAt(7)
+ ? null
+ :
IndexFileMetaSerializer.rowArrayDataToDvMetas(row.getArray(7))));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 7918914b2c..9fc251c366 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -19,6 +19,8 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMeta09Serializer;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
@@ -45,12 +47,14 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 4;
+ private static final int CURRENT_VERSION = 5;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
+ private DataFileMeta09Serializer dataFile09Serializer;
private DataFileMeta08Serializer dataFile08Serializer;
+ private IndexFileMeta09Serializer indexEntry09Serializer;
public CommitMessageSerializer() {
this.dataFileSerializer = new DataFileMetaSerializer();
@@ -107,48 +111,48 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
}
+ /**
+ * Deserializes a commit message written with serializer {@code version}; data file and index
+ * file lists are decoded with the version-specific deserializers selected below.
+ */
private CommitMessage deserialize(int version, DataInputView view) throws
IOException {
- if (version >= 3) {
- IOExceptionSupplier<List<DataFileMeta>> fileDeserializer =
- () -> dataFileSerializer.deserializeList(view);
- if (version == 3) {
- DataFileMeta09Serializer serializer = new
DataFileMeta09Serializer();
- fileDeserializer = () -> serializer.deserializeList(view);
- }
- return new CommitMessageImpl(
- deserializeBinaryRow(view),
- view.readInt(),
- new DataIncrement(
- fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
- new CompactIncrement(
- fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
- new IndexIncrement(
- indexEntrySerializer.deserializeList(view),
- indexEntrySerializer.deserializeList(view)));
- } else {
- return deserialize08(version, view);
- }
- }
-
- private CommitMessage deserialize08(int version, DataInputView view)
throws IOException {
- if (dataFile08Serializer == null) {
- dataFile08Serializer = new DataFileMeta08Serializer();
- }
+ IOExceptionSupplier<List<DataFileMeta>> fileDeserializer =
fileDeserializer(version, view);
+ IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer =
+ indexEntryDeserializer(version, view);
+ // Java evaluates these arguments left to right, so the reads below consume the stream in
+ // field order (presumably mirroring the order serialize() writes them).
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
new DataIncrement(
- dataFile08Serializer.deserializeList(view),
- dataFile08Serializer.deserializeList(view),
- dataFile08Serializer.deserializeList(view)),
+ fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
new CompactIncrement(
- dataFile08Serializer.deserializeList(view),
- dataFile08Serializer.deserializeList(view),
- dataFile08Serializer.deserializeList(view)),
+ fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
new IndexIncrement(
- indexEntrySerializer.deserializeList(view),
- version <= 2
- ? Collections.emptyList()
- : indexEntrySerializer.deserializeList(view)));
+ indexEntryDeserializer.get(),
+ // Versions <= 2 carry only one index file list; the second list is absent there.
+ version <= 2 ? Collections.emptyList() :
indexEntryDeserializer.get()));
+ }
+
+    /**
+     * Returns a supplier that reads one list of {@link DataFileMeta} in the layout used by the
+     * given serializer {@code version}; legacy serializers are created lazily on first use.
+     */
+    private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
+            int version, DataInputView view) {
+        if (version < 3) {
+            if (dataFile08Serializer == null) {
+                dataFile08Serializer = new DataFileMeta08Serializer();
+            }
+            return () -> dataFile08Serializer.deserializeList(view);
+        }
+        if (version == 3) {
+            if (dataFile09Serializer == null) {
+                dataFile09Serializer = new DataFileMeta09Serializer();
+            }
+            return () -> dataFile09Serializer.deserializeList(view);
+        }
+        return () -> dataFileSerializer.deserializeList(view);
+    }
+
+    /**
+     * Returns a supplier that reads one list of {@link IndexFileMeta} in the layout used by the
+     * given serializer {@code version}.
+     */
+    private IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer(
+            int version, DataInputView view) {
+        if (version < 5) {
+            // Before version 5 index metas use the 0.9 layout, which has no DV cardinality.
+            if (indexEntry09Serializer == null) {
+                indexEntry09Serializer = new IndexFileMeta09Serializer();
+            }
+            return () -> indexEntry09Serializer.deserializeList(view);
+        }
+        return () -> indexEntrySerializer.deserializeList(view);
+    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 1dac6584d6..29405466b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -50,7 +50,7 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 3;
+ private static final int VERSION = 4;
private long snapshotId = 0;
private BinaryRow partition;
@@ -272,13 +272,16 @@ public class DataSplit implements Split {
FunctionWithIOException<DataInputView, DataFileMeta> dataFileSer =
getFileMetaSerde(version);
+ FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerde
=
+ getDeletionFileSerde(version);
int beforeNumber = in.readInt();
List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
for (int i = 0; i < beforeNumber; i++) {
beforeFiles.add(dataFileSer.apply(in));
}
- List<DeletionFile> beforeDeletionFiles =
DeletionFile.deserializeList(in);
+ List<DeletionFile> beforeDeletionFiles =
+ DeletionFile.deserializeList(in, deletionFileSerde);
int fileNumber = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
@@ -286,7 +289,7 @@ public class DataSplit implements Split {
dataFiles.add(dataFileSer.apply(in));
}
- List<DeletionFile> dataDeletionFiles =
DeletionFile.deserializeList(in);
+ List<DeletionFile> dataDeletionFiles =
DeletionFile.deserializeList(in, deletionFileSerde);
boolean isStreaming = in.readBoolean();
boolean rawConvertible = in.readBoolean();
@@ -319,16 +322,22 @@ public class DataSplit implements Split {
} else if (version == 2) {
DataFileMeta09Serializer serializer = new
DataFileMeta09Serializer();
return serializer::deserialize;
- } else if (version == 3) {
+ } else if (version >= 3) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
- throw new UnsupportedOperationException(
- "Expecting DataSplit version to be smaller or equal than "
- + VERSION
- + ", but found "
- + version
- + ".");
+ throw new UnsupportedOperationException("Unsupported version: " +
version);
+ }
+ }
+
+ private static FunctionWithIOException<DataInputView, DeletionFile>
getDeletionFileSerde(
+ int version) {
+ if (version >= 1 && version <= 3) {
+ return DeletionFile::deserializeV3;
+ } else if (version >= 4) {
+ return DeletionFile::deserialize;
+ } else {
+ throw new UnsupportedOperationException("Unsupported version: " +
version);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
index 94dfc61572..5bcf6898ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.utils.FunctionWithIOException;
import javax.annotation.Nullable;
@@ -52,11 +53,13 @@ public class DeletionFile implements Serializable {
private final String path;
private final long offset;
private final long length;
+ @Nullable private final Long cardinality;
- public DeletionFile(String path, long offset, long length) {
+ public DeletionFile(String path, long offset, long length, @Nullable Long
cardinality) {
this.path = path;
this.offset = offset;
this.length = length;
+ this.cardinality = cardinality;
}
/** Path of the file. */
@@ -74,6 +77,12 @@ public class DeletionFile implements Serializable {
return length;
}
+ /** the number of deleted rows. */
+ @Nullable
+ public Long cardinality() {
+ return cardinality;
+ }
+
public static void serialize(DataOutputView out, @Nullable DeletionFile
file)
throws IOException {
if (file == null) {
@@ -83,6 +92,7 @@ public class DeletionFile implements Serializable {
out.writeUTF(file.path);
out.writeLong(file.offset);
out.writeLong(file.length);
+ out.writeLong(file.cardinality == null ? -1 : file.cardinality);
}
}
@@ -108,17 +118,32 @@ public class DeletionFile implements Serializable {
String path = in.readUTF();
long offset = in.readLong();
long length = in.readLong();
- return new DeletionFile(path, offset, length);
+ long cardinality = in.readLong();
+ return new DeletionFile(path, offset, length, cardinality == -1 ? null
: cardinality);
}
@Nullable
- public static List<DeletionFile> deserializeList(DataInputView in) throws
IOException {
+ public static DeletionFile deserializeV3(DataInputView in) throws
IOException {
+ if (in.readByte() == 0) {
+ return null;
+ }
+
+ String path = in.readUTF();
+ long offset = in.readLong();
+ long length = in.readLong();
+ return new DeletionFile(path, offset, length, null);
+ }
+
+ @Nullable
+ public static List<DeletionFile> deserializeList(
+ DataInputView in, FunctionWithIOException<DataInputView,
DeletionFile> deserialize)
+ throws IOException {
List<DeletionFile> files = null;
if (in.readByte() == 1) {
int size = in.readInt();
files = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- files.add(DeletionFile.deserialize(in));
+ files.add(deserialize.apply(in));
}
}
return files;
@@ -126,22 +151,34 @@ public class DeletionFile implements Serializable {
@Override
public boolean equals(Object o) {
- if (!(o instanceof DeletionFile)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
-
- DeletionFile other = (DeletionFile) o;
- return Objects.equals(path, other.path) && offset == other.offset &&
length == other.length;
+ DeletionFile that = (DeletionFile) o;
+ return offset == that.offset
+ && length == that.length
+ && Objects.equals(path, that.path)
+ && Objects.equals(cardinality, that.cardinality);
}
@Override
public int hashCode() {
- return Objects.hash(path, offset, length);
+ return Objects.hash(path, offset, length, cardinality);
}
@Override
public String toString() {
- return String.format("{path = %s, offset = %d, length = %d}", path,
offset, length);
+ return "DeletionFile{"
+ + "path='"
+ + path
+ + '\''
+ + ", offset="
+ + offset
+ + ", length="
+ + length
+ + ", cardinality="
+ + cardinality
+ + '}';
}
static Factory emptyFactory() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ce01bdba94..bf19ba10c6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
@@ -492,23 +493,24 @@ public class SnapshotReaderImpl implements SnapshotReader
{
List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
for (IndexFileMeta indexFileMeta : indexFileMetas) {
- if (indexFileMeta.deletionVectorsRanges() != null) {
- for (String dataFileName :
indexFileMeta.deletionVectorsRanges().keySet()) {
- dataFileToIndexFileMeta.put(dataFileName, indexFileMeta);
+ if (indexFileMeta.deletionVectorMetas() != null) {
+ for (DeletionVectorMeta dvMeta :
indexFileMeta.deletionVectorMetas().values()) {
+ dataFileToIndexFileMeta.put(dvMeta.dataFileName(),
indexFileMeta);
}
}
}
for (DataFileMeta file : dataFiles) {
IndexFileMeta indexFileMeta =
dataFileToIndexFileMeta.get(file.fileName());
if (indexFileMeta != null) {
- Map<String, Pair<Integer, Integer>> ranges =
indexFileMeta.deletionVectorsRanges();
- if (ranges != null && ranges.containsKey(file.fileName())) {
- Pair<Integer, Integer> range = ranges.get(file.fileName());
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+ indexFileMeta.deletionVectorMetas();
+ if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
deletionFiles.add(
new DeletionFile(
indexFileHandler.filePath(indexFileMeta).toString(),
- range.getKey(),
- range.getValue()));
+ dvMetas.get(file.fileName()).offset(),
+ dvMetas.get(file.fileName()).length(),
+
dvMetas.get(file.fileName()).cardinality()));
continue;
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 6c674352b8..a52819c805 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -23,12 +23,12 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.junit.jupiter.api.Test;
@@ -94,7 +94,7 @@ class AppendDeletionFileMaintainerTest {
assertThat(res.size()).isEqualTo(3);
IndexManifestEntry entry =
res.stream().filter(file -> file.kind() ==
FileKind.ADD).findAny().get();
-
assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue();
+
assertThat(entry.indexFile().deletionVectorMetas().containsKey("f2")).isTrue();
entry =
res.stream()
.filter(file -> file.kind() == FileKind.DELETE)
@@ -117,14 +117,15 @@ class AppendDeletionFileMaintainerTest {
PathFactory indexPathFactory, List<IndexFileMeta> fileMetas) {
Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
for (IndexFileMeta indexFileMeta : fileMetas) {
- for (Map.Entry<String, Pair<Integer, Integer>> range :
- indexFileMeta.deletionVectorsRanges().entrySet()) {
+ for (Map.Entry<String, DeletionVectorMeta> dvMeta :
+ indexFileMeta.deletionVectorMetas().entrySet()) {
dataFileToDeletionFiles.put(
- range.getKey(),
+ dvMeta.getKey(),
new DeletionFile(
indexPathFactory.toPath(indexFileMeta.fileName()).toString(),
- range.getValue().getLeft(),
- range.getValue().getRight()));
+ dvMeta.getValue().offset(),
+ dvMeta.getValue().length(),
+ dvMeta.getValue().cardinality()));
}
}
return dataFileToDeletionFiles;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 724d5b4163..a7e692d2e5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.index;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.ObjectSerializerTestBase;
-import org.apache.paimon.utils.Pair;
import java.util.LinkedHashMap;
import java.util.Random;
@@ -59,14 +58,20 @@ public class IndexFileMetaSerializerTest extends
ObjectSerializerTestBase<IndexF
public static IndexFileMeta randomDeletionVectorIndexFile() {
Random rnd = new Random();
- LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges =
new LinkedHashMap<>();
- deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(),
rnd.nextInt()));
- deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(),
rnd.nextInt()));
+ LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas = new
LinkedHashMap<>();
+ deletionVectorMetas.put(
+ "my_file_name1",
+ new DeletionVectorMeta(
+ "my_file_name1", rnd.nextInt(), rnd.nextInt(),
rnd.nextLong()));
+ deletionVectorMetas.put(
+ "my_file_name2",
+ new DeletionVectorMeta(
+ "my_file_name2", rnd.nextInt(), rnd.nextInt(),
rnd.nextLong()));
return new IndexFileMeta(
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
"deletion_vectors_index_file_name" + rnd.nextLong(),
rnd.nextInt(),
rnd.nextInt(),
- deletionVectorsRanges);
+ deletionVectorMetas);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index bd272b745d..fbc02b2d73 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
@@ -27,7 +28,6 @@ import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
@@ -78,11 +78,11 @@ public class ManifestCommittableSerializerCompatibilityTest
{
Arrays.asList("field1", "field2", "field3"));
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>();
- dvRanges.put("dv_key1", Pair.of(1, 2));
- dvRanges.put("dv_key2", Pair.of(3, 4));
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
IndexFileMeta indexFile =
- new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvRanges);
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
CommitMessageImpl commitMessage =
@@ -106,6 +106,76 @@ public class
ManifestCommittableSerializerCompatibilityTest {
assertThat(deserialized).isEqualTo(manifestCommittable);
}
+ @Test
+ public void testCompatibilityToVersion4() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"));
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v4"),
+ true);
+ deserialized = serializer.deserialize(2, v2Bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
@Test
public void testCompatibilityToVersion3() throws IOException {
SimpleStats keyStats =
@@ -139,11 +209,11 @@ public class
ManifestCommittableSerializerCompatibilityTest {
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>();
- dvRanges.put("dv_key1", Pair.of(1, 2));
- dvRanges.put("dv_key2", Pair.of(3, 4));
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
IndexFileMeta indexFile =
- new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvRanges);
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
CommitMessageImpl commitMessage =
@@ -209,11 +279,11 @@ public class
ManifestCommittableSerializerCompatibilityTest {
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
- LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>();
- dvRanges.put("dv_key1", Pair.of(1, 2));
- dvRanges.put("dv_key2", Pair.of(3, 4));
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
IndexFileMeta indexFile =
- new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvRanges);
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
CommitMessageImpl commitMessage =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index eb9105189b..1f87838aea 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -48,7 +48,7 @@ public class CommitMessageSerializerTest {
CommitMessageImpl committable =
new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement, indexIncrement);
CommitMessageImpl newCommittable =
- (CommitMessageImpl) serializer.deserialize(3,
serializer.serialize(committable));
+ (CommitMessageImpl) serializer.deserialize(5,
serializer.serialize(committable));
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index c64a12ffae..359d38c973 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -107,6 +107,9 @@ public class SplitTest {
Arrays.asList("field1", "field2", "field3"));
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
+ List<DeletionFile> deletionFiles =
Collections.singletonList(deletionFile);
+
BinaryRow partition = new BinaryRow(1);
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
@@ -118,6 +121,7 @@ public class SplitTest {
.withPartition(partition)
.withBucket(20)
.withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
.withBucketPath("my path")
.build();
@@ -243,4 +247,68 @@ public class SplitTest {
InstantiationUtil.deserializeObject(v2Bytes,
DataSplit.class.getClassLoader());
assertThat(actual).isEqualTo(split);
}
+
+ @Test
+ public void testSerializerCompatibleV3() throws Exception {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"));
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
null);
+ List<DeletionFile> deletionFiles =
Collections.singletonList(deletionFile);
+
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .build();
+
+ byte[] v2Bytes =
+ IOUtils.readFully(
+ SplitTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/datasplit-v3"),
+ true);
+
+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v2Bytes,
DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ }
}
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v3
b/paimon-core/src/test/resources/compatibility/datasplit-v3
new file mode 100644
index 0000000000..6b19fe2d95
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/datasplit-v3 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v4
b/paimon-core/src/test/resources/compatibility/manifest-committable-v4
new file mode 100644
index 0000000000..9c095669a3
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v4 differ
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index ea8309e14f..46a423b9d6 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -631,6 +631,27 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
)
}
+ test("Paimon deletionVector: get cardinality") {
+ sql(s"""
+ |CREATE TABLE T (id INT)
+ |TBLPROPERTIES (
+ | 'deletion-vectors.enabled' = 'true',
+ | 'bucket-key' = 'id',
+ | 'bucket' = '1'
+ |)
+ |""".stripMargin)
+
+ sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)")
+ sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+
+ val fileStore = loadTable("T").store()
+ val indexManifest =
fileStore.snapshotManager().latestSnapshot().indexManifest()
+ val entry =
fileStore.newIndexFileHandler().readManifest(indexManifest).get(0)
+ val dvMeta =
entry.indexFile().deletionVectorMetas().values().iterator().next()
+
+ assert(dvMeta.cardinality() == 334)
+ }
+
private def getPathName(path: String): String = {
new Path(path).getName
}