This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 317b4c58a7 [core] Integrate v2 dv with read/write (#5546)
317b4c58a7 is described below
commit 317b4c58a72505717c97164dcf4dd8d19fe9ec69
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Apr 28 21:18:06 2025 +0800
[core] Integrate v2 dv with read/write (#5546)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++
.../paimon/utils/OptimizedRoaringBitmap64.java | 13 ++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../deletionvectors/Bitmap64DeletionVector.java | 40 +++++++++-
.../deletionvectors/BitmapDeletionVector.java | 22 ++++++
.../paimon/deletionvectors/DeletionVector.java | 86 ++++++++++++++++------
.../DeletionVectorIndexFileWriter.java | 26 +++++--
.../deletionvectors/DeletionVectorsIndexFile.java | 29 ++++----
.../deletionvectors/DeletionVectorsMaintainer.java | 18 ++++-
.../paimon/deletionvectors/DeletionVectorTest.java | 7 +-
.../DeletionVectorsIndexFileTest.java | 40 +++++++++-
.../DeletionVectorsMaintainerTest.java | 47 ++++++++----
.../append/AppendDeletionFileMaintainerTest.java | 13 +++-
.../paimon/operation/FileStoreCommitTest.java | 10 ++-
.../apache/paimon/flink/BatchFileStoreITCase.java | 17 ++++-
.../apache/paimon/flink/CatalogTableITCase.java | 11 ++-
.../apache/paimon/flink/DeletionVectorITCase.java | 74 +++++++++++++------
.../flink/PrimaryKeyFileStoreTableITCase.java | 5 +-
.../paimon/spark/commands/PaimonCommand.scala | 11 ++-
.../paimon/spark/commands/PaimonSparkWriter.scala | 3 +-
.../spark/commands/SparkDeletionVectors.scala | 3 +-
.../paimon/spark/sql/DeletionVectorTest.scala | 29 ++++++--
23 files changed, 409 insertions(+), 115 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 4aa54288ba..b25d1e8769 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -302,6 +302,12 @@ under the License.
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
</tr>
+ <tr>
+ <td><h5>deletion-vectors.version</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+            <td>The version of the deletion vector. Currently supports v1 and v2; the default version is 1.</td>
+ </tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 537cb29fa7..0fc56757ac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1510,6 +1510,13 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(2))
.withDescription("The target size of deletion vector index
file.");
+ public static final ConfigOption<Integer> DELETION_VECTOR_VERSION =
+ key("deletion-vectors.version")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+                                "The version of the deletion vector. Currently supports v1 and v2; the default version is 1.");
+
public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG
=
key("delete.force-produce-changelog")
.booleanType()
@@ -2622,6 +2629,10 @@ public class CoreOptions implements Serializable {
return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE);
}
+ public int deletionVectorVersion() {
+ return options.get(DELETION_VECTOR_VERSION);
+ }
+
public FileIndexOptions indexColumnsOptions() {
return new FileIndexOptions(this);
}
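
Usage note: the new option is a plain table option, so it can be supplied together with 'deletion-vectors.enabled' through dynamic options. A minimal sketch, mirroring initIndexHandler(...) in DeletionVectorsMaintainerTest further down; the FileStoreTable import and the copy(...) call are assumed from the existing table API and are not part of this diff:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;

import java.util.HashMap;
import java.util.Map;

class DvVersionOptionSketch {
    // Switch an existing table to the v2 deletion vector format via dynamic options.
    static FileStoreTable withV2DeletionVectors(FileStoreTable table) {
        Map<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("deletion-vectors.enabled", "true");
        dynamicOptions.put(CoreOptions.DELETION_VECTOR_VERSION.key(), "2");
        return table.copy(dynamicOptions);
    }
}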
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
index 5b14e25c30..7171771e9e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;
@@ -314,4 +315,16 @@ public class OptimizedRoaringBitmap64 {
MAX_VALUE,
pos);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OptimizedRoaringBitmap64 that = (OptimizedRoaringBitmap64) o;
+ return Arrays.equals(this.bitmaps, that.bitmaps);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 03f0812d34..9a326f00a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -242,7 +242,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
pathFactory().indexFileFactory(),
bucketMode() == BucketMode.BUCKET_UNAWARE
? options.deletionVectorIndexFileTargetSize()
- : MemorySize.ofBytes(Long.MAX_VALUE)));
+ : MemorySize.ofBytes(Long.MAX_VALUE),
+ options.deletionVectorVersion()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
index b6ccd50c20..e965d8bde5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Objects;
import java.util.zip.CRC32;
/**
@@ -34,10 +35,12 @@ import java.util.zip.CRC32;
*/
public class Bitmap64DeletionVector implements DeletionVector {
+ public static final int VERSION = 2;
+
public static final int MAGIC_NUMBER = 1681511377;
public static final int LENGTH_SIZE_BYTES = 4;
public static final int CRC_SIZE_BYTES = 4;
- private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
+ public static final int MAGIC_NUMBER_SIZE_BYTES = 4;
private static final int BITMAP_DATA_OFFSET = 4;
private final OptimizedRoaringBitmap64 roaringBitmap;
@@ -86,6 +89,11 @@ public class Bitmap64DeletionVector implements
DeletionVector {
return roaringBitmap.cardinality();
}
+ @Override
+ public int version() {
+ return VERSION;
+ }
+
@Override
public byte[] serializeToBytes() {
roaringBitmap.runLengthEncode(); // run-length encode the bitmap
before serializing
@@ -112,6 +120,13 @@ public class Bitmap64DeletionVector implements
DeletionVector {
return new Bitmap64DeletionVector(bitmap);
}
+ public static DeletionVector deserializeFromBitmapDataBytes(byte[] bytes) {
+ ByteBuffer bitmapData = ByteBuffer.wrap(bytes);
+ bitmapData.order(ByteOrder.LITTLE_ENDIAN);
+ OptimizedRoaringBitmap64 bitmap =
OptimizedRoaringBitmap64.deserialize(bitmapData);
+ return new Bitmap64DeletionVector(bitmap);
+ }
+
// computes and validates the length of the bitmap data (magic bytes +
bitmap)
private static int computeBitmapDataLength(OptimizedRoaringBitmap64
bitmap) {
long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes();
@@ -164,4 +179,27 @@ public class Bitmap64DeletionVector implements
DeletionVector {
crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
return (int) crc.getValue();
}
+
+ protected static int toLittleEndianInt(int bigEndianInt) {
+ byte[] bytes = ByteBuffer.allocate(4).putInt(bigEndianInt).array();
+
+ return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Bitmap64DeletionVector that = (Bitmap64DeletionVector) o;
+ return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(roaringBitmap);
+ }
}
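
A minimal round-trip sketch for the new v2 vector, based on the updated DeletionVectorTest further down; only APIs that appear in this diff are used:

import org.apache.paimon.deletionvectors.Bitmap64DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVector;

class Bitmap64RoundTripSketch {
    public static void main(String[] args) {
        DeletionVector dv = new Bitmap64DeletionVector();
        dv.delete(1L);
        dv.delete(Integer.MAX_VALUE * 2L); // positions beyond the 32-bit range
        byte[] bytes = dv.serializeToBytes();
        // deserialization is now dispatched by the explicit version id
        DeletionVector restored =
                DeletionVector.deserializeFromBytes(bytes, Bitmap64DeletionVector.VERSION);
        System.out.println(restored.isDeleted(1L));      // true
        System.out.println(restored.getCardinality());   // 2
    }
}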
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index 37b5fdde12..37e420c142 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -32,7 +32,10 @@ import java.util.Objects;
*/
public class BitmapDeletionVector implements DeletionVector {
+ public static final int VERSION = 1;
+
public static final int MAGIC_NUMBER = 1581511376;
+ public static final int MAGIC_NUMBER_SIZE_BYTES = 4;
private final RoaringBitmap32 roaringBitmap;
@@ -81,6 +84,11 @@ public class BitmapDeletionVector implements DeletionVector {
return roaringBitmap.getCardinality();
}
+ @Override
+ public int version() {
+ return VERSION;
+ }
+
@Override
public byte[] serializeToBytes() {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -93,6 +101,20 @@ public class BitmapDeletionVector implements DeletionVector
{
}
}
+ public static DeletionVector deserializeFromBytes(byte[] bytes) {
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ int magicNum = buffer.getInt();
+ if (magicNum == MAGIC_NUMBER) {
+ return deserializeFromByteBuffer(buffer);
+ } else {
+ throw new RuntimeException("Invalid magic number: " +
magicNum);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize deletion
vector", e);
+ }
+ }
+
/**
* Note: the result is read only, do not call any modify operation outside.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index 4e1d15b044..6f4a399da4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -85,6 +85,9 @@ public interface DeletionVector {
/** @return the number of distinct integers added to the DeletionVector. */
long getCardinality();
+ /** @return the version of the deletion vector. */
+ int version();
+
/**
* Serializes the deletion vector to a byte array for storage or
transmission.
*
@@ -98,17 +101,13 @@ public interface DeletionVector {
* @param bytes The byte array containing the serialized deletion vector.
* @return A DeletionVector instance that represents the deserialized data.
*/
- static DeletionVector deserializeFromBytes(byte[] bytes) {
- try {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- int magicNum = buffer.getInt();
- if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
- return BitmapDeletionVector.deserializeFromByteBuffer(buffer);
- } else {
- throw new RuntimeException("Invalid magic number: " +
magicNum);
- }
- } catch (IOException e) {
- throw new RuntimeException("Unable to deserialize deletion
vector", e);
+ static DeletionVector deserializeFromBytes(byte[] bytes, int version) {
+ if (version == BitmapDeletionVector.VERSION) {
+ return BitmapDeletionVector.deserializeFromBytes(bytes);
+ } else if (version == Bitmap64DeletionVector.VERSION) {
+ return Bitmap64DeletionVector.deserializeFromBytes(bytes);
+ } else {
+ throw new RuntimeException("Invalid deletion vector version: " +
version);
}
}
@@ -117,21 +116,60 @@ public interface DeletionVector {
try (SeekableInputStream input = fileIO.newInputStream(path)) {
input.seek(deletionFile.offset());
DataInputStream dis = new DataInputStream(input);
- int actualSize = dis.readInt();
- if (actualSize != deletionFile.length()) {
+ // read bitmap length
+ int bitmapLength = dis.readInt();
+ // read magic number
+ int magicNumber = dis.readInt();
+ // v2 dv serializes magic number in little endian
+ int magicNumberInLittleEndian =
Bitmap64DeletionVector.toLittleEndianInt(magicNumber);
+
+ if (magicNumber == BitmapDeletionVector.MAGIC_NUMBER) {
+ if (bitmapLength != deletionFile.length()) {
+ throw new RuntimeException(
+ "Size not match, actual size: "
+ + bitmapLength
+ + ", expected size: "
+ + deletionFile.length()
+ + ", file path: "
+ + path);
+ }
+
+ // magic number has been read
+ byte[] bytes =
+ new byte[bitmapLength -
BitmapDeletionVector.MAGIC_NUMBER_SIZE_BYTES];
+ dis.readFully(bytes);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ return BitmapDeletionVector.deserializeFromByteBuffer(buffer);
+ } else if (magicNumberInLittleEndian ==
Bitmap64DeletionVector.MAGIC_NUMBER) {
+ long expectedBitmapLength =
+ deletionFile.length()
+ - Bitmap64DeletionVector.LENGTH_SIZE_BYTES
+ - Bitmap64DeletionVector.CRC_SIZE_BYTES;
+
+ if (bitmapLength != expectedBitmapLength) {
+ throw new RuntimeException(
+ "Size not match, actual size: "
+ + bitmapLength
+ + ", expected size: "
+ + expectedBitmapLength
+ + ", file path: "
+ + path);
+ }
+
+                // magic number has been read
+ byte[] bytes =
+ new byte[bitmapLength -
Bitmap64DeletionVector.MAGIC_NUMBER_SIZE_BYTES];
+ dis.readFully(bytes);
+ return
Bitmap64DeletionVector.deserializeFromBitmapDataBytes(bytes);
+ } else {
throw new RuntimeException(
- "Size not match, actual size: "
- + actualSize
- + ", expected size: "
- + deletionFile.length()
- + ", file path: "
- + path);
+ "Invalid magic number: "
+ + magicNumber
+ + ", v1 dv magic number: "
+ + BitmapDeletionVector.MAGIC_NUMBER
+                            + ", v2 dv magic number: "
+ + Bitmap64DeletionVector.MAGIC_NUMBER);
}
-
- // read DeletionVector bytes
- byte[] bytes = new byte[actualSize];
- dis.readFully(bytes);
- return deserializeFromBytes(bytes);
}
}
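
The rewritten DeletionVector.read(...) above first reads the length field, then sniffs the magic number to decide between the v1 layout (big-endian magic, RoaringBitmap32 payload) and the v2 layout (little-endian magic plus a CRC suffix). A minimal sketch of reading one deletion file back, following the new testReadDeletionFile further down; the DeletionFile argument is assumed to come from the index metadata as in that test:

import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.table.source.DeletionFile;

import java.io.IOException;

class ReadDeletionFileSketch {
    // deletionFile carries the index file path, the offset, the serialized
    // length and the cardinality of one data file's deletion vector.
    static boolean isRowDeleted(DeletionFile deletionFile, long rowPosition) throws IOException {
        DeletionVector dv = DeletionVector.read(LocalFileIO.create(), deletionFile);
        return dv.isDeleted(rowPosition);
    }
}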
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index a4b45d1929..89e3b8ab36 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -37,8 +37,6 @@ import java.util.List;
import java.util.Map;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V1;
-import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V2;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.calculateChecksum;
/** Writer for deletion vector index file. */
@@ -48,7 +46,7 @@ public class DeletionVectorIndexFileWriter {
private final FileIO fileIO;
private final long targetSizeInBytes;
- private final int versionId;
+ private final int writeVersionId;
public DeletionVectorIndexFileWriter(
FileIO fileIO,
@@ -59,7 +57,7 @@ public class DeletionVectorIndexFileWriter {
this.fileIO = fileIO;
this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
- this.versionId = versionId;
+ this.writeVersionId = versionId;
}
/**
@@ -117,7 +115,7 @@ public class DeletionVectorIndexFileWriter {
private SingleIndexFileWriter() throws IOException {
this.path = indexPathFactory.newPath();
this.dataOutputStream = new
DataOutputStream(fileIO.newOutputStream(path, true));
- dataOutputStream.writeByte(versionId);
+ dataOutputStream.writeByte(writeVersionId);
this.dvMetas = new LinkedHashMap<>();
}
@@ -127,9 +125,23 @@ public class DeletionVectorIndexFileWriter {
private void write(String key, DeletionVector deletionVector) throws
IOException {
Preconditions.checkNotNull(dataOutputStream);
- if (versionId == VERSION_ID_V1) {
+ if (writeVersionId == BitmapDeletionVector.VERSION) {
+ Preconditions.checkArgument(
+ deletionVector instanceof BitmapDeletionVector,
+ "write version id is %s, but deletionVector is not an
instance of %s, actual class is %s.",
+ writeVersionId,
+ BitmapDeletionVector.class.getName(),
+ deletionVector.getClass().getName());
+
writeV1(key, deletionVector);
- } else if (versionId == VERSION_ID_V2) {
+ } else if (writeVersionId == Bitmap64DeletionVector.VERSION) {
+ Preconditions.checkArgument(
+ deletionVector instanceof Bitmap64DeletionVector,
+ "write version id is %s, but deletionVector is not an
instance of %s, actual class is %s.",
+ writeVersionId,
+ Bitmap64DeletionVector.class.getName(),
+ deletionVector.getClass().getName());
+
writeV2(key, deletionVector);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 85958e5625..62708c620c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -45,28 +45,29 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
public class DeletionVectorsIndexFile extends IndexFile {
public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
- // Current version id is 1
- public static final byte VERSION_ID_V1 = 1;
- public static final byte VERSION_ID_V2 = 2;
- private final byte writeVersionID;
+ private final int writeVersionID;
private final MemorySize targetSizePerIndexFile;
public DeletionVectorsIndexFile(
FileIO fileIO, PathFactory pathFactory, MemorySize
targetSizePerIndexFile) {
- this(fileIO, pathFactory, targetSizePerIndexFile, VERSION_ID_V1);
+ this(fileIO, pathFactory, targetSizePerIndexFile,
BitmapDeletionVector.VERSION);
}
public DeletionVectorsIndexFile(
FileIO fileIO,
PathFactory pathFactory,
MemorySize targetSizePerIndexFile,
- byte writeVersionID) {
+ int writeVersionID) {
super(fileIO, pathFactory);
this.targetSizePerIndexFile = targetSizePerIndexFile;
this.writeVersionID = writeVersionID;
}
+ public int writeVersionID() {
+ return writeVersionID;
+ }
+
/**
* Reads all deletion vectors from a specified file.
*
@@ -174,26 +175,26 @@ public class DeletionVectorsIndexFile extends IndexFile {
private int checkVersion(InputStream in) throws IOException {
int version = in.read();
- if (version != VERSION_ID_V1 && version != VERSION_ID_V2) {
+ if (version != BitmapDeletionVector.VERSION && version !=
Bitmap64DeletionVector.VERSION) {
throw new RuntimeException(
"Version not match, actual version: "
+ version
+ ", expected version: "
- + VERSION_ID_V1
+ + BitmapDeletionVector.VERSION
+ " or "
- + VERSION_ID_V2);
+ + Bitmap64DeletionVector.VERSION);
}
return version;
}
private DeletionVector readDeletionVector(
DataInputStream inputStream, int size, int readVersion) {
- if (readVersion == VERSION_ID_V1) {
+ if (readVersion == BitmapDeletionVector.VERSION) {
return readV1DeletionVector(inputStream, size);
- } else if (readVersion == VERSION_ID_V2) {
+ } else if (readVersion == Bitmap64DeletionVector.VERSION) {
return readV2DeletionVector(inputStream, size);
} else {
- throw new RuntimeException("Unsupported DeletionVector version: "
+ writeVersionID);
+ throw new RuntimeException("Unsupported DeletionVector version: "
+ readVersion);
}
}
@@ -220,7 +221,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
+ ", expected checksum: "
+ checkSum);
}
- return DeletionVector.deserializeFromBytes(bytes);
+ return DeletionVector.deserializeFromBytes(bytes,
BitmapDeletionVector.VERSION);
} catch (IOException e) {
throw new UncheckedIOException("Unable to read deletion vector",
e);
}
@@ -231,7 +232,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
byte[] bytes = new byte[size];
inputStream.readFully(bytes);
- return Bitmap64DeletionVector.deserializeFromBytes(bytes);
+ return DeletionVector.deserializeFromBytes(bytes,
Bitmap64DeletionVector.VERSION);
} catch (IOException e) {
throw new UncheckedIOException("Unable to read deletion vector",
e);
}
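
For context, a sketch of writing a v2 index file through the version-aware constructor, modeled on DeletionVectorsIndexFileTest further down; pathFactory stands in for that test's getPathFactory() helper and the 2 MB target size is simply the option default:

import org.apache.paimon.deletionvectors.Bitmap64DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.PathFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WriteV2IndexSketch {
    static List<IndexFileMeta> writeV2Index(PathFactory pathFactory) {
        DeletionVectorsIndexFile indexFile =
                new DeletionVectorsIndexFile(
                        LocalFileIO.create(),
                        pathFactory,
                        MemorySize.ofMebiBytes(2),
                        Bitmap64DeletionVector.VERSION);
        Map<String, DeletionVector> dvs = new HashMap<>();
        DeletionVector dv = new Bitmap64DeletionVector();
        dv.delete(1);
        dv.delete(10);
        dvs.put("file1.parquet", dv);
        // writes one or more index files depending on the target size
        return indexFile.write(dvs);
    }
}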
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 7c578506d8..94d79ec6f6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -41,15 +41,27 @@ public class DeletionVectorsMaintainer {
private final IndexFileHandler indexFileHandler;
private final Map<String, DeletionVector> deletionVectors;
+ protected final int dvWriteVersion;
private boolean modified;
private DeletionVectorsMaintainer(
IndexFileHandler fileHandler, Map<String, DeletionVector>
deletionVectors) {
this.indexFileHandler = fileHandler;
this.deletionVectors = deletionVectors;
+ this.dvWriteVersion =
indexFileHandler.deletionVectorsIndex().writeVersionID();
this.modified = false;
}
+ private DeletionVector createNewDeletionVector() {
+ if (dvWriteVersion == BitmapDeletionVector.VERSION) {
+ return new BitmapDeletionVector();
+ } else if (dvWriteVersion == Bitmap64DeletionVector.VERSION) {
+ return new Bitmap64DeletionVector();
+ } else {
+ throw new RuntimeException("Invalid deletion vector version: " +
dvWriteVersion);
+ }
+ }
+
/**
* Notifies a new deletion which marks the specified row position as
deleted with the given file
* name.
@@ -59,7 +71,7 @@ public class DeletionVectorsMaintainer {
*/
public void notifyNewDeletion(String fileName, long position) {
DeletionVector deletionVector =
- deletionVectors.computeIfAbsent(fileName, k -> new
BitmapDeletionVector());
+ deletionVectors.computeIfAbsent(fileName, k ->
createNewDeletionVector());
if (deletionVector.checkedDelete(position)) {
modified = true;
}
@@ -139,6 +151,10 @@ public class DeletionVectorsMaintainer {
return deletionVectors;
}
+ public int dvWriteVersion() {
+ return dvWriteVersion;
+ }
+
public static Factory factory(IndexFileHandler handler) {
return new Factory(handler);
}
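
With the maintainer now reading writeVersionID() from the index file handler, new deletion vectors are created with the class that matches the configured version. A short sketch following DeletionVectorsMaintainerTest further down; the IndexFileHandler is assumed to come from table.store().newIndexFileHandler() as in that test:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;

import java.util.List;

class MaintainerSketch {
    static List<IndexFileMeta> recordDeletions(IndexFileHandler fileHandler) {
        DeletionVectorsMaintainer maintainer =
                new DeletionVectorsMaintainer.Factory(fileHandler)
                        .createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
        // internally creates a BitmapDeletionVector or a Bitmap64DeletionVector,
        // depending on dvWriteVersion()
        maintainer.notifyNewDeletion("f1", 1);
        maintainer.notifyNewDeletion("f2", 2);
        return maintainer.writeDeletionVectorsIndex();
    }
}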
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
index a5bd483f4a..bc3e11eb4d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -50,7 +50,8 @@ public class DeletionVectorTest {
assertThat(deletionVector.checkedDelete(i)).isFalse();
}
DeletionVector deserializedDeletionVector =
-
DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+ DeletionVector.deserializeFromBytes(
+ deletionVector.serializeToBytes(),
BitmapDeletionVector.VERSION);
assertThat(deletionVector.isEmpty()).isFalse();
assertThat(deserializedDeletionVector.isEmpty()).isFalse();
@@ -70,6 +71,7 @@ public class DeletionVectorTest {
for (int i = 0; i < 10000; i++) {
toDelete.add(ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE * 2L));
}
+ toDelete.add(1L);
HashSet<Long> notDelete = new HashSet<>();
for (long i = 0; i < 10000; i++) {
if (!toDelete.contains(i)) {
@@ -85,7 +87,8 @@ public class DeletionVectorTest {
assertThat(deletionVector.checkedDelete(i)).isFalse();
}
DeletionVector deserializedDeletionVector =
-
Bitmap64DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+ DeletionVector.deserializeFromBytes(
+ deletionVector.serializeToBytes(),
Bitmap64DeletionVector.VERSION);
assertThat(deletionVector.isEmpty()).isFalse();
assertThat(deserializedDeletionVector.isEmpty()).isFalse();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index a023e82639..508888e8d1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -20,8 +20,10 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.PathFactory;
import org.junit.jupiter.api.Test;
@@ -29,6 +31,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -255,6 +258,41 @@ public class DeletionVectorsIndexFileTest {
assertThat(dvs2.size()).isEqualTo(100000);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReadDeletionFile(boolean isV2) throws IOException {
+ PathFactory pathFactory = getPathFactory();
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
+ deletionVectorsIndexFile(pathFactory, isV2);
+
+ HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+ DeletionVector index1 = createEmptyDV(isV2);
+ index1.delete(1);
+ index1.delete(10);
+ index1.delete(100);
+ deleteMap.put("file1.parquet", index1);
+
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ assertThat(indexFiles.size()).isEqualTo(1);
+
+ IndexFileMeta indexFileMeta = indexFiles.get(0);
+ DeletionVectorMeta deletionVectorMeta =
+ indexFileMeta.deletionVectorMetas().get("file1.parquet");
+
+ DeletionFile deletionFile =
+ new DeletionFile(
+
pathFactory.toPath(indexFileMeta.fileName()).toString(),
+ deletionVectorMeta.offset(),
+ deletionVectorMeta.length(),
+ deletionVectorMeta.cardinality());
+
+ // test DeletionVector#read()
+ DeletionVector dv = DeletionVector.read(LocalFileIO.create(),
deletionFile);
+ assertThat(dv.isDeleted(1)).isTrue();
+ assertThat(dv.isDeleted(10)).isTrue();
+ assertThat(dv.isDeleted(100)).isTrue();
+ }
+
private DeletionVector createEmptyDV(boolean isV2) {
if (isV2) {
return new Bitmap64DeletionVector();
@@ -275,7 +313,7 @@ public class DeletionVectorsIndexFileTest {
LocalFileIO.create(),
pathFactory,
targetSizePerIndexFile,
- DeletionVectorsIndexFile.VERSION_ID_V2);
+ Bitmap64DeletionVector.VERSION);
} else {
return new DeletionVectorsIndexFile(
LocalFileIO.create(), pathFactory, targetSizePerIndexFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 21ff27ae99..d2684003d3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.deletionvectors;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.compact.CompactDeletionFile;
@@ -32,12 +33,13 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileIOUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,17 +49,16 @@ import static org.assertj.core.api.Assertions.assertThat;
public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
- @BeforeEach
- public void beforeEach() throws Exception {
- fileHandler = table.store().newIndexFileHandler();
- }
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void test0(int dvVersion) {
+ initIndexHandler(dvVersion);
- @Test
- public void test0() {
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
DeletionVectorsMaintainer dvMaintainer =
factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
+ assertThat(dvMaintainer.dvWriteVersion()).isEqualTo(dvVersion);
dvMaintainer.notifyNewDeletion("f1", 1);
dvMaintainer.notifyNewDeletion("f2", 2);
@@ -76,17 +77,21 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(deletionVectors.containsKey("f3")).isFalse();
}
- @Test
- public void test1() {
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void test1(int dvVersion) {
+ initIndexHandler(dvVersion);
+
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
DeletionVectorsMaintainer dvMaintainer = factory.create();
- BitmapDeletionVector deletionVector1 = new BitmapDeletionVector();
+ DeletionVector deletionVector1 = createDeletionVector(dvVersion);
deletionVector1.delete(1);
deletionVector1.delete(3);
deletionVector1.delete(5);
dvMaintainer.notifyNewDeletion("f1", deletionVector1);
+ assertThat(dvMaintainer.dvWriteVersion()).isEqualTo(dvVersion);
List<IndexFileMeta> fileMetas1 =
dvMaintainer.writeDeletionVectorsIndex();
assertThat(fileMetas1.size()).isEqualTo(1);
@@ -130,8 +135,11 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(deletionVector3.isDeleted(2)).isTrue();
}
- @Test
- public void testCompactDeletion() throws IOException {
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void testCompactDeletion(int dvVersion) throws IOException {
+ initIndexHandler(dvVersion);
+
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
DeletionVectorsMaintainer dvMaintainer =
@@ -169,4 +177,17 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
deletionFile4.getOrCompute();
assertThat(indexDir.listFiles()).hasSize(1);
}
+
+ private DeletionVector createDeletionVector(int dvVersion) {
+ return dvVersion == 2 ? new Bitmap64DeletionVector() : new
BitmapDeletionVector();
+ }
+
+ private void initIndexHandler(int dvVersion) {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+
+ table = table.copy(options);
+ fileHandler = table.store().newIndexFileHandler();
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index e9ce57f30a..7a00270f66 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.deletionvectors.append;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.TestAppendFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
@@ -31,8 +32,9 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.PathFactory;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -46,9 +48,12 @@ class AppendDeletionFileMaintainerTest {
@TempDir java.nio.file.Path tempDir;
- @Test
- public void test() throws Exception {
- TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void test(int dvVersion) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+ TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, options);
Map<String, List<Integer>> dvs = new HashMap<>();
dvs.put("f1", Arrays.asList(1, 3, 5));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 561a144c5c..d6bfbb2205 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -62,6 +62,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -870,9 +871,12 @@ public class FileStoreCommitTest {
fileStoreCommit.close();
}
- @Test
- public void testDVIndexFiles() throws Exception {
- TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void testDVIndexFiles(int dvVersion) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+ TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, options);
// commit 1
CommitMessageImpl commitMessage1 =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index cc9ae81b4a..bc4567f679 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -589,8 +590,12 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testCountStarAppendWithDv() {
+ int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
- "CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH
('deletion-vectors.enabled' = 'true')");
+ String.format(
+ "CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH
('deletion-vectors.enabled' = 'true', "
+ + "'deletion-vectors.version' = '%s') ",
+ dvVersion));
sql("INSERT INTO count_append_dv VALUES (1, 'a'), (2, 'b')");
String sql = "SELECT COUNT(*) FROM count_append_dv";
@@ -612,10 +617,14 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testCountStarPKDv() {
+ int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
- "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1
STRING) WITH ("
- + "'file.format' = 'avro', "
- + "'deletion-vectors.enabled' = 'true')");
+ String.format(
+ "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT
ENFORCED, f1 STRING) WITH ("
+ + "'file.format' = 'avro', "
+ + "'deletion-vectors.enabled' = 'true', "
+ + "'deletion-vectors.version' = '%s')",
+ dvVersion));
sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4,
'd')");
sql("INSERT INTO count_pk_dv VALUES (1, 'e')");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 3be5071e7a..a0edc4e9d7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -38,6 +38,7 @@ import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -1139,9 +1140,12 @@ public class CatalogTableITCase extends
CatalogITCaseBase {
@Test
public void testIndexesTable() {
+ int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
- "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt,
a) NOT ENFORCED)"
- + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true')");
+ String.format(
+ "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY
KEY (pt, a) NOT ENFORCED)"
+ + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true', 'deletion-vectors.version' = '%s')",
+ dvVersion));
sql(
"INSERT INTO T VALUES ('2024-10-01', 1,
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01',
3, 'c_new1')");
@@ -1164,7 +1168,8 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
assertThat(row.getField(1)).isEqualTo(0);
assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
- assertThat(row.getField(4)).isEqualTo(33L);
+        // dv versions 1 and 2 have different file sizes
+ assertThat(row.getField(4)).isIn(33L, 45L);
assertThat(row.getField(5)).isEqualTo(1L);
assertThat(row.getField(6)).isNotNull();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index ad29709f5a..f388a41228 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -22,17 +22,34 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.stream.Stream;
+
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
/** ITCase for deletion vector table. */
public class DeletionVectorITCase extends CatalogITCaseBase {
+ private static Stream<Arguments> parameters1() {
+ // parameters: changelogProducer, dvVersion
+ return Stream.of(
+ Arguments.of("none", 1),
+ Arguments.of("none", 2),
+ Arguments.of("lookup", 1),
+ Arguments.of("lookup", 2));
+ }
+
+ private static Stream<Arguments> parameters2() {
+ // parameters: changelogProducer, dvVersion
+ return Stream.of(Arguments.of("input", 1), Arguments.of("input", 2));
+ }
+
@ParameterizedTest
- @ValueSource(strings = {"input"})
+ @MethodSource("parameters2")
public void testStreamingReadDVTableWhenChangelogProducerIsInput(String
changelogProducer)
throws Exception {
sql(
@@ -79,13 +96,14 @@ public class DeletionVectorITCase extends CatalogITCaseBase
{
}
@ParameterizedTest
- @ValueSource(strings = {"none", "lookup"})
- public void testStreamingReadDVTable(String changelogProducer) throws
Exception {
+ @MethodSource("parameters1")
+ public void testStreamingReadDVTable(String changelogProducer, int
dvVersion) throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name
STRING) "
- + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s')",
- changelogProducer));
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "'deletion-vectors.version' = '%s')",
+ changelogProducer, dvVersion));
sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4,
'4')");
@@ -131,13 +149,14 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
}
@ParameterizedTest
- @ValueSource(strings = {"none", "lookup"})
- public void testBatchReadDVTable(String changelogProducer) {
+ @MethodSource("parameters1")
+ public void testBatchReadDVTable(String changelogProducer, int dvVersion) {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name
STRING) "
- + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s')",
- changelogProducer));
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "'deletion-vectors.version' = '%s')",
+ changelogProducer, dvVersion));
sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4,
'4')");
@@ -163,14 +182,15 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
}
@ParameterizedTest
- @ValueSource(strings = {"none", "lookup"})
- public void testDVTableWithAggregationMergeEngine(String
changelogProducer) throws Exception {
+ @MethodSource("parameters1")
+ public void testDVTableWithAggregationMergeEngine(String
changelogProducer, int dvVersion)
+ throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v
INT) "
- + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', 'deletion-vectors.version' = '%s', "
+ "'merge-engine'='aggregation',
'fields.v.aggregate-function'='sum')",
- changelogProducer));
+ changelogProducer, dvVersion));
sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
@@ -203,14 +223,15 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
}
@ParameterizedTest
- @ValueSource(strings = {"none", "lookup"})
- public void testDVTableWithPartialUpdateMergeEngine(String
changelogProducer) throws Exception {
+ @MethodSource("parameters1")
+ public void testDVTableWithPartialUpdateMergeEngine(String
changelogProducer, int dvVersion)
+ throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1
STRING, v2 STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
- + "'merge-engine'='partial-update')",
- changelogProducer));
+ + "'deletion-vectors.version' = '%s',
'merge-engine'='partial-update')",
+ changelogProducer, dvVersion));
sql(
"INSERT INTO T VALUES (1, '111111111', '1'), (2, '2',
CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
@@ -248,13 +269,14 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
}
@ParameterizedTest
- @ValueSource(strings = {"none", "lookup"})
- public void testBatchReadDVTableWithSequenceField(String
changelogProducer) {
+ @MethodSource("parameters1")
+ public void testBatchReadDVTableWithSequenceField(String
changelogProducer, int dvVersion) {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED,
sequence INT, name STRING) "
- + "WITH ('deletion-vectors.enabled' = 'true',
'sequence.field' = 'sequence', 'changelog-producer' = '%s')",
- changelogProducer));
+ + "WITH ('deletion-vectors.enabled' = 'true',
'sequence.field' = 'sequence', 'changelog-producer' = '%s', "
+ + "'deletion-vectors.version' = '%s')",
+ changelogProducer, dvVersion));
sql("INSERT INTO T VALUES (1, 1, '1'), (2, 1, '2')");
sql("INSERT INTO T VALUES (1, 2, '1_1'), (2, 2, '2_1')");
@@ -264,11 +286,15 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2,
"2_1"));
}
- @Test
- public void testReadTagWithDv() {
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void testReadTagWithDv(int dvVersion) {
sql(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING)
WITH ("
+ "'deletion-vectors.enabled' = 'true', "
+ + "'deletion-vectors.version' = '"
+ + dvVersion
+ + "', "
+ "'snapshot.num-retained.min' = '1', "
+ "'snapshot.num-retained.max' = '1')");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 117afd1213..0c6bbebec3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -968,7 +968,8 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
enableFailure,
"'bucket' = '4',"
+ String.format(
- "'deletion-vectors.enabled' = '%s'",
enableDeletionVectors));
+ "'deletion-vectors.enabled' = '%s',
'deletion-vectors.version' = '%s'",
+ enableDeletionVectors, random.nextInt(1, 3)));
// changelog is produced by Flink normalize operator
checkChangelogTestResult(numProducers);
@@ -1014,10 +1015,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ "'changelog-producer' = 'lookup', "
+ "'lookup-wait' = '%s', "
+ "'deletion-vectors.enabled' = '%s', "
+ + "'deletion-vectors.version' = '%s', "
+ "'precommit-compact' = '%s'",
random.nextBoolean() ? "4mb" : "8mb",
random.nextBoolean(),
enableDeletionVectors,
+ random.nextInt(1, 3),
random.nextBoolean()));
// sleep for a random amount of time to check
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 5fbef42811..20fa2e193e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
-import org.apache.paimon.deletionvectors.BitmapDeletionVector
+import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector,
BitmapDeletionVector}
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter =>
FilterLogicalNode, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo,
Filter}
+import org.apache.spark.sql.sources._
import java.net.URI
import java.util.Collections
@@ -252,13 +252,15 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
val my_table = table
val location = my_table.location
+ val dvWriteVersion = my_table.coreOptions().deletionVectorVersion()
dataWithMetadataColumns
.select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
.as[(String, Long)]
.groupByKey(_._1)
.mapGroups {
(filePath, iter) =>
- val dv = new BitmapDeletionVector()
+ val dv =
+ if (dvWriteVersion == 2) new Bitmap64DeletionVector() else new
BitmapDeletionVector()
while (iter.hasNext) {
dv.delete(iter.next()._2)
}
@@ -274,7 +276,8 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
relativeBucketPath,
SerializationUtils.serializeBinaryRow(partition),
bucket,
- Seq((new Path(filePath).getName, dv.serializeToBytes()))
+ Seq((new Path(filePath).getName, dv.serializeToBytes())),
+ dvWriteVersion
)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 32eb66d6cd..f669392761 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -290,6 +290,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
var dvIndexFileMaintainer: BaseAppendDeleteFileMaintainer = null
while (iter.hasNext) {
val sdv: SparkDeletionVectors = iter.next()
+ val dvWriteVersion = sdv.dvWriteVersion
if (dvIndexFileMaintainer == null) {
val partition =
SerializationUtils.deserializeBinaryRow(sdv.partition)
dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) {
@@ -310,7 +311,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
case (dataFileName, dv) =>
dvIndexFileMaintainer.notifyNewDeletionVector(
dataFileName,
- DeletionVector.deserializeFromBytes(dv))
+ DeletionVector.deserializeFromBytes(dv, dvWriteVersion))
}
}
val indexEntries = dvIndexFileMaintainer.persist()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 5a76030fd4..4d7790c38f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -32,7 +32,8 @@ case class SparkDeletionVectors(
partitionAndBucket: String,
partition: Array[Byte],
bucket: Int,
- dataFileAndDeletionVector: Seq[(String, Array[Byte])]
+ dataFileAndDeletionVector: Seq[(String, Array[Byte])],
+ dvWriteVersion: Int
) {
def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] =
{
val prefix = fileStorePathFactory
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 015d071e18..f333774756 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -106,10 +106,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
.toDF("a", "b", "c")
.createOrReplaceTempView("source")
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE target (a INT, b INT, c STRING)
- |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true',
'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
spark.sql(
"INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
@@ -156,10 +157,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion',
| 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
@@ -227,11 +230,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|PARTITIONED BY(pt)
- |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true',
'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -314,10 +318,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING)
- |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true',
'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -369,11 +374,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|PARTITIONED BY(pt)
- |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true',
'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -440,13 +446,15 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
test("Paimon deletionVector: deletion vector write verification") {
withTable("T") {
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'bucket' = '1',
| 'primary-key' = 'id',
| 'file.format' = 'parquet',
- | 'deletion-vectors.enabled' = 'true'
+ | 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion'
|)
|""".stripMargin)
val table = loadTable("T")
@@ -508,11 +516,13 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val dvTbl = "deletion_vector_tbl"
val resultTbl = "result_tbl"
spark.sql(s"drop table if exists $dvTbl")
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE $dvTbl (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id, pt',
| 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion',
| 'bucket' = '$bucket',
| 'changelog-producer' = '$changelogProducer',
| 'file.format' = '$format',
@@ -526,7 +536,8 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
|CREATE TABLE $resultTbl (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id, pt',
- | 'deletion-vectors.enabled' = 'false'
+ | 'deletion-vectors.enabled' = 'false',
+ | 'deletion-vectors.version' = '$dvVersion'
|)
|PARTITIONED BY (pt)
|""".stripMargin)
@@ -591,11 +602,13 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
test("Paimon deletionVector: select with format filter push down") {
val format = Random.shuffle(Seq("parquet", "orc", "avro")).head
val blockSize = Random.nextInt(10240) + 1
+ val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id',
| 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion',
| 'file.format' = '$format',
| 'file.block-size' = '${blockSize}b',
| 'bucket' = '1'
@@ -631,10 +644,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
}
test("Paimon deletionVector: get cardinality") {
+ val dvVersion = Random.nextInt(2) + 1
sql(s"""
|CREATE TABLE T (id INT)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion',
| 'bucket-key' = 'id',
| 'bucket' = '1'
|)
@@ -652,10 +667,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
}
test("Paimon deletionVector: delete from non-pk table with data file path") {
+ val dvVersion = Random.nextInt(2) + 1
sql(s"""
|CREATE TABLE T (id INT)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.version' = '$dvVersion',
| 'bucket-key' = 'id',
| 'bucket' = '1',
| 'data-file.path-directory' = 'data'