This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eac27980ee [core] Rename 'deletion-vectors.version' to
'deletion-vectors.bitmap64' (#5552)
eac27980ee is described below
commit eac27980ee298abb5a6f0fd1b850b03078e5df10
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 29 12:56:29 2025 +0800
[core] Rename 'deletion-vectors.version' to 'deletion-vectors.bitmap64'
(#5552)
---
.../shortcodes/generated/core_configuration.html | 12 +-
docs/static/rest-catalog-open-api.yaml | 25 ++++
.../main/java/org/apache/paimon/CoreOptions.java | 14 +--
.../java/org/apache/paimon/AbstractFileStore.java | 2 +-
.../deletionvectors/Bitmap64DeletionVector.java | 26 +---
.../deletionvectors/BitmapDeletionVector.java | 37 +++---
.../paimon/deletionvectors/DeletionVector.java | 137 +++++++++++----------
.../DeletionVectorIndexFileWriter.java | 65 ++--------
.../deletionvectors/DeletionVectorsIndexFile.java | 101 +++------------
.../deletionvectors/DeletionVectorsMaintainer.java | 16 +--
.../apache/paimon/index/DeletionVectorMeta.java | 4 +-
.../paimon/deletionvectors/DeletionVectorTest.java | 5 +-
.../DeletionVectorsIndexFileTest.java | 62 ++++------
.../DeletionVectorsMaintainerTest.java | 32 ++---
.../append/AppendDeletionFileMaintainerTest.java | 6 +-
.../paimon/operation/FileStoreCommitTest.java | 6 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 10 +-
.../apache/paimon/flink/CatalogTableITCase.java | 5 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 52 ++++----
.../flink/PrimaryKeyFileStoreTableITCase.java | 8 +-
.../paimon/spark/commands/PaimonCommand.scala | 16 +--
.../paimon/spark/commands/PaimonSparkWriter.scala | 17 ++-
...tionVectors.scala => SparkDeletionVector.scala} | 37 +++---
.../spark/commands/UpdatePaimonTableCommand.scala | 2 +-
.../paimon/spark/sql/DeletionVectorTest.scala | 59 +++++----
25 files changed, 315 insertions(+), 441 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b25d1e8769..f3adbf8464 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -297,16 +297,16 @@ under the License.
<td>The target size of deletion vector index file.</td>
</tr>
<tr>
- <td><h5>deletion-vectors.enabled</h5></td>
+ <td><h5>deletion-vectors.bitmap64</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
+ <td>Enable 64 bit bitmap implementation. Note that only 64 bit
bitmap implementation is compatible with Iceberg.</td>
</tr>
<tr>
- <td><h5>deletion-vectors.version</h5></td>
- <td style="word-wrap: break-word;">1</td>
- <td>Integer</td>
- <td>The version of deletion vector, currently support v1 and v2,
default version is 1.</td>
+ <td><h5>deletion-vectors.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index c6793651b4..669eb4875e 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -232,6 +232,11 @@ paths:
in: query
schema:
type: string
+ - name: tableNamePattern
+ description: A sql LIKE pattern (% and _) for table names.
+ in: query
+ schema:
+ type: string
responses:
"200":
description: OK
@@ -305,6 +310,11 @@ paths:
in: query
schema:
type: string
+ - name: tableNamePattern
+ description: A sql LIKE pattern (% and _) for table names.
+ in: query
+ schema:
+ type: string
responses:
"200":
description: OK
@@ -658,6 +668,11 @@ paths:
in: query
schema:
type: string
+ - name: partitionNamePattern
+ description: A sql LIKE pattern (% and _) for partition names.
+ in: query
+ schema:
+ type: string
responses:
"200":
description: OK
@@ -896,6 +911,11 @@ paths:
in: query
schema:
type: string
+ - name: viewNamePattern
+ description: A sql LIKE pattern (% and _) for view names.
+ in: query
+ schema:
+ type: string
responses:
"200":
description: OK
@@ -969,6 +989,11 @@ paths:
in: query
schema:
type: string
+ - name: viewNamePattern
+ description: A sql LIKE pattern (% and _) for view names.
+ in: query
+ schema:
+ type: string
responses:
"200":
description: OK
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0fc56757ac..adfc54f67e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1510,12 +1510,12 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(2))
.withDescription("The target size of deletion vector index
file.");
- public static final ConfigOption<Integer> DELETION_VECTOR_VERSION =
- key("deletion-vectors.version")
- .intType()
- .defaultValue(1)
+ public static final ConfigOption<Boolean> DELETION_VECTOR_BITMAP64 =
+ key("deletion-vectors.bitmap64")
+ .booleanType()
+ .defaultValue(false)
.withDescription(
- "The version of deletion vector, currently support
v1 and v2, default version is 1.");
+ "Enable 64 bit bitmap implementation. Note that
only 64 bit bitmap implementation is compatible with Iceberg.");
public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG
=
key("delete.force-produce-changelog")
@@ -2629,8 +2629,8 @@ public class CoreOptions implements Serializable {
return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE);
}
- public int deletionVectorVersion() {
- return options.get(DELETION_VECTOR_VERSION);
+ public boolean deletionVectorBitmap64() {
+ return options.get(DELETION_VECTOR_BITMAP64);
}
public FileIndexOptions indexColumnsOptions() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 9a326f00a3..c0fb464092 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -243,7 +243,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
bucketMode() == BucketMode.BUCKET_UNAWARE
? options.deletionVectorIndexFileTargetSize()
: MemorySize.ofBytes(Long.MAX_VALUE),
- options.deletionVectorVersion()));
+ options.deletionVectorBitmap64()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
index e965d8bde5..fdd6955638 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
@@ -22,6 +22,8 @@ import org.apache.paimon.utils.OptimizedRoaringBitmap64;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RoaringBitmap32;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
@@ -35,8 +37,6 @@ import java.util.zip.CRC32;
*/
public class Bitmap64DeletionVector implements DeletionVector {
- public static final int VERSION = 2;
-
public static final int MAGIC_NUMBER = 1681511377;
public static final int LENGTH_SIZE_BYTES = 4;
public static final int CRC_SIZE_BYTES = 4;
@@ -90,12 +90,7 @@ public class Bitmap64DeletionVector implements
DeletionVector {
}
@Override
- public int version() {
- return VERSION;
- }
-
- @Override
- public byte[] serializeToBytes() {
+ public int serializeTo(DataOutputStream out) throws IOException {
roaringBitmap.runLengthEncode(); // run-length encode the bitmap
before serializing
int bitmapDataLength = computeBitmapDataLength(roaringBitmap); //
magic bytes + bitmap
byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength +
CRC_SIZE_BYTES];
@@ -106,18 +101,8 @@ public class Bitmap64DeletionVector implements
DeletionVector {
int crc = computeChecksum(bytes, bitmapDataLength);
buffer.putInt(crcOffset, crc);
buffer.rewind();
- return bytes;
- }
-
- public static DeletionVector deserializeFromBytes(byte[] bytes) {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- int bitmapDataLength = readBitmapDataLength(buffer, bytes.length);
- OptimizedRoaringBitmap64 bitmap = deserializeBitmap(bytes,
bitmapDataLength);
- int crc = computeChecksum(bytes, bitmapDataLength);
- int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
- int expectedCrc = buffer.getInt(crcOffset);
- Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC");
- return new Bitmap64DeletionVector(bitmap);
+ out.write(bytes);
+ return bytes.length;
}
public static DeletionVector deserializeFromBitmapDataBytes(byte[] bytes) {
@@ -182,7 +167,6 @@ public class Bitmap64DeletionVector implements
DeletionVector {
protected static int toLittleEndianInt(int bigEndianInt) {
byte[] bytes = ByteBuffer.allocate(4).putInt(bigEndianInt).array();
-
return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index 37e420c142..40681a7772 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.zip.CRC32;
/**
* A {@link DeletionVector} based on {@link RoaringBitmap32}, it only supports
files with row count
@@ -32,8 +33,6 @@ import java.util.Objects;
*/
public class BitmapDeletionVector implements DeletionVector {
- public static final int VERSION = 1;
-
public static final int MAGIC_NUMBER = 1581511376;
public static final int MAGIC_NUMBER_SIZE_BYTES = 4;
@@ -85,36 +84,22 @@ public class BitmapDeletionVector implements DeletionVector
{
}
@Override
- public int version() {
- return VERSION;
- }
-
- @Override
- public byte[] serializeToBytes() {
+ public int serializeTo(DataOutputStream out) {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
dos.writeInt(MAGIC_NUMBER);
roaringBitmap.serialize(dos);
- return bos.toByteArray();
+ byte[] data = bos.toByteArray();
+ int size = data.length;
+ out.writeInt(size);
+ out.write(data);
+ out.writeInt(calculateChecksum(data));
+ return size;
} catch (Exception e) {
throw new RuntimeException("Unable to serialize deletion vector",
e);
}
}
- public static DeletionVector deserializeFromBytes(byte[] bytes) {
- try {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- int magicNum = buffer.getInt();
- if (magicNum == MAGIC_NUMBER) {
- return deserializeFromByteBuffer(buffer);
- } else {
- throw new RuntimeException("Invalid magic number: " +
magicNum);
- }
- } catch (IOException e) {
- throw new RuntimeException("Unable to deserialize deletion
vector", e);
- }
- }
-
/**
* Note: the result is read only, do not call any modify operation outside.
*
@@ -153,4 +138,10 @@ public class BitmapDeletionVector implements
DeletionVector {
public int hashCode() {
return Objects.hashCode(roaringBitmap);
}
+
+ public static int calculateChecksum(byte[] bytes) {
+ CRC32 crc = new CRC32();
+ crc.update(bytes);
+ return (int) crc.getValue();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index 6f4a399da4..7766aba992 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -26,12 +26,17 @@ import org.apache.paimon.table.source.DeletionFile;
import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
+import static
org.apache.paimon.deletionvectors.Bitmap64DeletionVector.toLittleEndianInt;
+
/**
* The DeletionVector can efficiently record the positions of rows that are
deleted in a file, which
* can then be used to filter out deleted rows when processing the file.
@@ -85,91 +90,66 @@ public interface DeletionVector {
/** @return the number of distinct integers added to the DeletionVector. */
long getCardinality();
- /** @return the version of the deletion vector. */
- int version();
-
- /**
- * Serializes the deletion vector to a byte array for storage or
transmission.
- *
- * @return A byte array representing the serialized deletion vector.
- */
- byte[] serializeToBytes();
-
- /**
- * Deserializes a deletion vector from a byte array.
- *
- * @param bytes The byte array containing the serialized deletion vector.
- * @return A DeletionVector instance that represents the deserialized data.
- */
- static DeletionVector deserializeFromBytes(byte[] bytes, int version) {
- if (version == BitmapDeletionVector.VERSION) {
- return BitmapDeletionVector.deserializeFromBytes(bytes);
- } else if (version == Bitmap64DeletionVector.VERSION) {
- return Bitmap64DeletionVector.deserializeFromBytes(bytes);
- } else {
- throw new RuntimeException("Invalid deletion vector version: " +
version);
- }
- }
+ /** Serializes the deletion vector. */
+ int serializeTo(DataOutputStream out) throws IOException;
static DeletionVector read(FileIO fileIO, DeletionFile deletionFile)
throws IOException {
Path path = new Path(deletionFile.path());
try (SeekableInputStream input = fileIO.newInputStream(path)) {
input.seek(deletionFile.offset());
DataInputStream dis = new DataInputStream(input);
- // read bitmap length
- int bitmapLength = dis.readInt();
- // read magic number
- int magicNumber = dis.readInt();
- // v2 dv serializes magic number in little endian
- int magicNumberInLittleEndian =
Bitmap64DeletionVector.toLittleEndianInt(magicNumber);
-
- if (magicNumber == BitmapDeletionVector.MAGIC_NUMBER) {
- if (bitmapLength != deletionFile.length()) {
- throw new RuntimeException(
- "Size not match, actual size: "
- + bitmapLength
- + ", expected size: "
- + deletionFile.length()
- + ", file path: "
- + path);
- }
+ return read(dis, deletionFile.length());
+ }
+ }
+
+ static DeletionVector read(DataInputStream dis, @Nullable Long length)
throws IOException {
+ // read bitmap length
+ int bitmapLength = dis.readInt();
+ // read magic number
+ int magicNumber = dis.readInt();
- // magic number has been read
- byte[] bytes =
- new byte[bitmapLength -
BitmapDeletionVector.MAGIC_NUMBER_SIZE_BYTES];
- dis.readFully(bytes);
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- return BitmapDeletionVector.deserializeFromByteBuffer(buffer);
- } else if (magicNumberInLittleEndian ==
Bitmap64DeletionVector.MAGIC_NUMBER) {
+ if (magicNumber == BitmapDeletionVector.MAGIC_NUMBER) {
+ if (length != null && bitmapLength != length) {
+ throw new RuntimeException(
+ "Size not match, actual size: "
+ + bitmapLength
+ + ", expected size: "
+ + length);
+ }
+
+ // magic number has been read
+ byte[] bytes = new byte[bitmapLength -
BitmapDeletionVector.MAGIC_NUMBER_SIZE_BYTES];
+ dis.readFully(bytes);
+ dis.skipBytes(4); // skip crc
+ return
BitmapDeletionVector.deserializeFromByteBuffer(ByteBuffer.wrap(bytes));
+ } else if (toLittleEndianInt(magicNumber) ==
Bitmap64DeletionVector.MAGIC_NUMBER) {
+ if (length != null) {
long expectedBitmapLength =
- deletionFile.length()
+ length
- Bitmap64DeletionVector.LENGTH_SIZE_BYTES
- Bitmap64DeletionVector.CRC_SIZE_BYTES;
-
if (bitmapLength != expectedBitmapLength) {
throw new RuntimeException(
"Size not match, actual size: "
+ bitmapLength
+ ", expected size: "
- + expectedBitmapLength
- + ", file path: "
- + path);
+ + expectedBitmapLength);
}
-
- // magic number have been read
- byte[] bytes =
- new byte[bitmapLength -
Bitmap64DeletionVector.MAGIC_NUMBER_SIZE_BYTES];
- dis.readFully(bytes);
- return
Bitmap64DeletionVector.deserializeFromBitmapDataBytes(bytes);
- } else {
- throw new RuntimeException(
- "Invalid magic number: "
- + magicNumber
- + ", v1 dv magic number: "
- + BitmapDeletionVector.MAGIC_NUMBER
- + ", v2 magic number: "
- + Bitmap64DeletionVector.MAGIC_NUMBER);
}
+
+ // magic number have been read
+ byte[] bytes = new byte[bitmapLength -
Bitmap64DeletionVector.MAGIC_NUMBER_SIZE_BYTES];
+ dis.readFully(bytes);
+ dis.skipBytes(4); // skip crc
+ return
Bitmap64DeletionVector.deserializeFromBitmapDataBytes(bytes);
+ } else {
+ throw new RuntimeException(
+ "Invalid magic number: "
+ + magicNumber
+ + ", v1 dv magic number: "
+ + BitmapDeletionVector.MAGIC_NUMBER
+ + ", v2 magic number: "
+ + Bitmap64DeletionVector.MAGIC_NUMBER);
}
}
@@ -196,6 +176,27 @@ public interface DeletionVector {
};
}
+ static byte[] serializeToBytes(DeletionVector deletionVector) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try {
+ deletionVector.serializeTo(dos);
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static DeletionVector deserializeFromBytes(byte[] bytes) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bis);
+ try {
+ return read(dis, null);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Interface to create {@link DeletionVector}. */
interface Factory {
Optional<DeletionVector> create(String fileName) throws IOException;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 89e3b8ab36..72ce352c35 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -24,7 +24,6 @@ import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.PathFactory;
-import org.apache.paimon.utils.Preconditions;
import java.io.Closeable;
import java.io.DataOutputStream;
@@ -37,7 +36,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.calculateChecksum;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V1;
/** Writer for deletion vector index file. */
public class DeletionVectorIndexFileWriter {
@@ -46,18 +45,11 @@ public class DeletionVectorIndexFileWriter {
private final FileIO fileIO;
private final long targetSizeInBytes;
- private final int writeVersionId;
-
public DeletionVectorIndexFileWriter(
- FileIO fileIO,
- PathFactory pathFactory,
- MemorySize targetSizePerIndexFile,
- int versionId) {
+ FileIO fileIO, PathFactory pathFactory, MemorySize
targetSizePerIndexFile) {
this.indexPathFactory = pathFactory;
this.fileIO = fileIO;
this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
-
- this.writeVersionId = versionId;
}
/**
@@ -109,63 +101,26 @@ public class DeletionVectorIndexFileWriter {
private class SingleIndexFileWriter implements Closeable {
private final Path path;
- private final DataOutputStream dataOutputStream;
+ private final DataOutputStream out;
private final LinkedHashMap<String, DeletionVectorMeta> dvMetas;
private SingleIndexFileWriter() throws IOException {
this.path = indexPathFactory.newPath();
- this.dataOutputStream = new
DataOutputStream(fileIO.newOutputStream(path, true));
- dataOutputStream.writeByte(writeVersionId);
+ this.out = new DataOutputStream(fileIO.newOutputStream(path,
true));
+ out.writeByte(VERSION_ID_V1);
this.dvMetas = new LinkedHashMap<>();
}
private long writtenSizeInBytes() {
- return dataOutputStream.size();
+ return out.size();
}
private void write(String key, DeletionVector deletionVector) throws
IOException {
- Preconditions.checkNotNull(dataOutputStream);
- if (writeVersionId == BitmapDeletionVector.VERSION) {
- Preconditions.checkArgument(
- deletionVector instanceof BitmapDeletionVector,
- "write version id is %s, but deletionVector is not an
instance of %s, actual class is %s.",
- writeVersionId,
- BitmapDeletionVector.class.getName(),
- deletionVector.getClass().getName());
-
- writeV1(key, deletionVector);
- } else if (writeVersionId == Bitmap64DeletionVector.VERSION) {
- Preconditions.checkArgument(
- deletionVector instanceof Bitmap64DeletionVector,
- "write version id is %s, but deletionVector is not an
instance of %s, actual class is %s.",
- writeVersionId,
- Bitmap64DeletionVector.class.getName(),
- deletionVector.getClass().getName());
-
- writeV2(key, deletionVector);
- }
- }
-
- private void writeV1(String key, DeletionVector deletionVector) throws
IOException {
- byte[] data = deletionVector.serializeToBytes();
- int size = data.length;
- dvMetas.put(
- key,
- new DeletionVectorMeta(
- key, dataOutputStream.size(), size,
deletionVector.getCardinality()));
- dataOutputStream.writeInt(size);
- dataOutputStream.write(data);
- dataOutputStream.writeInt(calculateChecksum(data));
- }
-
- private void writeV2(String key, DeletionVector deletionVector) throws
IOException {
- byte[] data = deletionVector.serializeToBytes();
- int size = data.length;
+ int start = out.size();
+ int length = deletionVector.serializeTo(out);
dvMetas.put(
key,
- new DeletionVectorMeta(
- key, dataOutputStream.size(), size,
deletionVector.getCardinality()));
- dataOutputStream.write(data);
+ new DeletionVectorMeta(key, start, length,
deletionVector.getCardinality()));
}
public IndexFileMeta writtenIndexFile() {
@@ -179,7 +134,7 @@ public class DeletionVectorIndexFileWriter {
@Override
public void close() throws IOException {
- dataOutputStream.close();
+ out.close();
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 62708c620c..fa9ed56417 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -36,7 +36,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.zip.CRC32;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -45,27 +44,23 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
public class DeletionVectorsIndexFile extends IndexFile {
public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
+ public static final byte VERSION_ID_V1 = 1;
- private final int writeVersionID;
private final MemorySize targetSizePerIndexFile;
-
- public DeletionVectorsIndexFile(
- FileIO fileIO, PathFactory pathFactory, MemorySize
targetSizePerIndexFile) {
- this(fileIO, pathFactory, targetSizePerIndexFile,
BitmapDeletionVector.VERSION);
- }
+ private final boolean bitmap64;
public DeletionVectorsIndexFile(
FileIO fileIO,
PathFactory pathFactory,
MemorySize targetSizePerIndexFile,
- int writeVersionID) {
+ boolean bitmap64) {
super(fileIO, pathFactory);
this.targetSizePerIndexFile = targetSizePerIndexFile;
- this.writeVersionID = writeVersionID;
+ this.bitmap64 = bitmap64;
}
- public int writeVersionID() {
- return writeVersionID;
+ public boolean bitmap64() {
+ return bitmap64;
}
/**
@@ -84,12 +79,12 @@ public class DeletionVectorsIndexFile extends IndexFile {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
Path filePath = pathFactory.toPath(indexFileName);
try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
- int version = checkVersion(inputStream);
+ checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
for (DeletionVectorMeta deletionVectorMeta :
deletionVectorMetas.values()) {
deletionVectors.put(
deletionVectorMeta.dataFileName(),
- readDeletionVector(dataInputStream,
deletionVectorMeta.length(), version));
+ DeletionVector.read(dataInputStream, (long)
deletionVectorMeta.length()));
}
} catch (Exception e) {
throw new RuntimeException(
@@ -118,15 +113,14 @@ public class DeletionVectorsIndexFile extends IndexFile {
String indexFile =
dataFileToDeletionFiles.values().stream().findAny().get().path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
- int version = checkVersion(inputStream);
+ checkVersion(inputStream);
for (String dataFile : dataFileToDeletionFiles.keySet()) {
DeletionFile deletionFile =
dataFileToDeletionFiles.get(dataFile);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new
DataInputStream(inputStream);
deletionVectors.put(
- dataFile,
- readDeletionVector(dataInputStream, (int)
deletionFile.length(), version));
+ dataFile, DeletionVector.read(dataInputStream,
deletionFile.length()));
}
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
@@ -137,11 +131,11 @@ public class DeletionVectorsIndexFile extends IndexFile {
public DeletionVector readDeletionVector(DeletionFile deletionFile) {
String indexFile = deletionFile.path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
- int version = checkVersion(inputStream);
+ checkVersion(inputStream);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new DataInputStream(inputStream);
- return readDeletionVector(dataInputStream, (int)
deletionFile.length(), version);
+ return DeletionVector.read(dataInputStream, deletionFile.length());
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
}
@@ -163,84 +157,21 @@ public class DeletionVectorsIndexFile extends IndexFile {
try {
DeletionVectorIndexFileWriter writer =
new DeletionVectorIndexFileWriter(
- this.fileIO,
- this.pathFactory,
- this.targetSizePerIndexFile,
- writeVersionID);
+ this.fileIO, this.pathFactory,
this.targetSizePerIndexFile);
return writer.write(input);
} catch (IOException e) {
throw new RuntimeException("Failed to write deletion vectors.", e);
}
}
- private int checkVersion(InputStream in) throws IOException {
+ private void checkVersion(InputStream in) throws IOException {
int version = in.read();
- if (version != BitmapDeletionVector.VERSION && version !=
Bitmap64DeletionVector.VERSION) {
+ if (version != VERSION_ID_V1) {
throw new RuntimeException(
"Version not match, actual version: "
+ version
+ ", expected version: "
- + BitmapDeletionVector.VERSION
- + " or "
- + Bitmap64DeletionVector.VERSION);
- }
- return version;
- }
-
- private DeletionVector readDeletionVector(
- DataInputStream inputStream, int size, int readVersion) {
- if (readVersion == BitmapDeletionVector.VERSION) {
- return readV1DeletionVector(inputStream, size);
- } else if (readVersion == Bitmap64DeletionVector.VERSION) {
- return readV2DeletionVector(inputStream, size);
- } else {
- throw new RuntimeException("Unsupported DeletionVector version: "
+ readVersion);
- }
- }
-
- private DeletionVector readV1DeletionVector(DataInputStream inputStream,
int size) {
- try {
- // check size
- int actualSize = inputStream.readInt();
- if (actualSize != size) {
- throw new RuntimeException(
- "Size not match, actual size: " + actualSize + ",
expected size: " + size);
- }
-
- // read DeletionVector bytes
- byte[] bytes = new byte[size];
- inputStream.readFully(bytes);
-
- // check checksum
- int checkSum = calculateChecksum(bytes);
- int actualCheckSum = inputStream.readInt();
- if (actualCheckSum != checkSum) {
- throw new RuntimeException(
- "Checksum not match, actual checksum: "
- + actualCheckSum
- + ", expected checksum: "
- + checkSum);
- }
- return DeletionVector.deserializeFromBytes(bytes,
BitmapDeletionVector.VERSION);
- } catch (IOException e) {
- throw new UncheckedIOException("Unable to read deletion vector",
e);
+ + VERSION_ID_V1);
}
}
-
- private DeletionVector readV2DeletionVector(DataInputStream inputStream,
int size) {
- try {
- byte[] bytes = new byte[size];
- inputStream.readFully(bytes);
-
- return DeletionVector.deserializeFromBytes(bytes,
Bitmap64DeletionVector.VERSION);
- } catch (IOException e) {
- throw new UncheckedIOException("Unable to read deletion vector",
e);
- }
- }
-
- public static int calculateChecksum(byte[] bytes) {
- CRC32 crc = new CRC32();
- crc.update(bytes);
- return (int) crc.getValue();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 94d79ec6f6..3ac905e7db 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -41,25 +41,19 @@ public class DeletionVectorsMaintainer {
private final IndexFileHandler indexFileHandler;
private final Map<String, DeletionVector> deletionVectors;
- protected final int dvWriteVersion;
+ protected final boolean bitmap64;
private boolean modified;
private DeletionVectorsMaintainer(
IndexFileHandler fileHandler, Map<String, DeletionVector>
deletionVectors) {
this.indexFileHandler = fileHandler;
this.deletionVectors = deletionVectors;
- this.dvWriteVersion =
indexFileHandler.deletionVectorsIndex().writeVersionID();
+ this.bitmap64 = indexFileHandler.deletionVectorsIndex().bitmap64();
this.modified = false;
}
private DeletionVector createNewDeletionVector() {
- if (dvWriteVersion == BitmapDeletionVector.VERSION) {
- return new BitmapDeletionVector();
- } else if (dvWriteVersion == Bitmap64DeletionVector.VERSION) {
- return new Bitmap64DeletionVector();
- } else {
- throw new RuntimeException("Invalid deletion vector version: " +
dvWriteVersion);
- }
+ return bitmap64 ? new Bitmap64DeletionVector() : new
BitmapDeletionVector();
}
/**
@@ -151,8 +145,8 @@ public class DeletionVectorsMaintainer {
return deletionVectors;
}
- public int dvWriteVersion() {
- return dvWriteVersion;
+ public boolean bitmap64() {
+ return bitmap64;
}
public static Factory factory(IndexFileHandler handler) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
index 9eb38818f6..175a263b0a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
@@ -45,10 +45,10 @@ public class DeletionVectorMeta {
@Nullable private final Long cardinality;
public DeletionVectorMeta(
- String dataFileName, int start, int size, @Nullable Long
cardinality) {
+ String dataFileName, int start, int length, @Nullable Long
cardinality) {
this.dataFileName = dataFileName;
this.offset = start;
- this.length = size;
+ this.length = length;
this.cardinality = cardinality;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
index bc3e11eb4d..a33f729963 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DeletionVector}. */
public class DeletionVectorTest {
+
@Test
public void testBitmapDeletionVector() {
HashSet<Integer> toDelete = new HashSet<>();
@@ -51,7 +52,7 @@ public class DeletionVectorTest {
}
DeletionVector deserializedDeletionVector =
DeletionVector.deserializeFromBytes(
- deletionVector.serializeToBytes(),
BitmapDeletionVector.VERSION);
+ DeletionVector.serializeToBytes(deletionVector));
assertThat(deletionVector.isEmpty()).isFalse();
assertThat(deserializedDeletionVector.isEmpty()).isFalse();
@@ -88,7 +89,7 @@ public class DeletionVectorTest {
}
DeletionVector deserializedDeletionVector =
DeletionVector.deserializeFromBytes(
- deletionVector.serializeToBytes(),
Bitmap64DeletionVector.VERSION);
+ DeletionVector.serializeToBytes(deletionVector));
assertThat(deletionVector.isEmpty()).isFalse();
assertThat(deserializedDeletionVector.isEmpty()).isFalse();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index 508888e8d1..1cc32757d8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -49,24 +49,24 @@ public class DeletionVectorsIndexFileTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testReadDvIndex(boolean isV2) {
+ public void testReadDvIndex(boolean bitmap64) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- deletionVectorsIndexFile(pathFactory, isV2);
+ deletionVectorsIndexFile(pathFactory, bitmap64);
// write
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
- DeletionVector index1 = createEmptyDV(isV2);
+ DeletionVector index1 = createEmptyDV(bitmap64);
index1.delete(1);
deleteMap.put("file1.parquet", index1);
- DeletionVector index2 = createEmptyDV(isV2);
+ DeletionVector index2 = createEmptyDV(bitmap64);
index2.delete(2);
index2.delete(3);
deleteMap.put("file2.parquet", index2);
- DeletionVector index3 = createEmptyDV(isV2);
+ DeletionVector index3 = createEmptyDV(bitmap64);
index3.delete(3);
deleteMap.put("file33.parquet", index3);
@@ -90,17 +90,17 @@ public class DeletionVectorsIndexFileTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testReadDvIndexWithCopiousDv(boolean isV2) {
+ public void testReadDvIndexWithCopiousDv(boolean bitmap64) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- deletionVectorsIndexFile(pathFactory, isV2);
+ deletionVectorsIndexFile(pathFactory, bitmap64);
// write
Random random = new Random();
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
HashMap<String, Integer> deleteInteger = new HashMap<>();
for (int i = 0; i < 100000; i++) {
- DeletionVector index = createEmptyDV(isV2);
+ DeletionVector index = createEmptyDV(bitmap64);
int num = random.nextInt(1000000);
index.delete(num);
deleteMap.put(String.format("file%s.parquet", i), index);
@@ -122,17 +122,17 @@ public class DeletionVectorsIndexFileTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testReadDvIndexWithEnormousDv(boolean isV2) {
+ public void testReadDvIndexWithEnormousDv(boolean bitmap64) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- deletionVectorsIndexFile(pathFactory, isV2);
+ deletionVectorsIndexFile(pathFactory, bitmap64);
// write
Random random = new Random();
Map<String, DeletionVector> fileToDV = new HashMap<>();
Map<String, Long> fileToCardinality = new HashMap<>();
for (int i = 0; i < 5; i++) {
- DeletionVector index = createEmptyDV(isV2);
+ DeletionVector index = createEmptyDV(bitmap64);
// the size of dv index file is about 20M
for (int j = 0; j < 10000000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -154,17 +154,17 @@ public class DeletionVectorsIndexFileTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testWriteDVIndexWithLimitedTargetSizePerIndexFile(boolean
isV2) {
+ public void testWriteDVIndexWithLimitedTargetSizePerIndexFile(boolean
bitmap64) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- deletionVectorsIndexFile(pathFactory, MemorySize.parse("2MB"),
isV2);
+ deletionVectorsIndexFile(pathFactory, MemorySize.parse("2MB"),
bitmap64);
// write1
Random random = new Random();
Map<String, DeletionVector> fileToDV = new HashMap<>();
Map<String, Long> fileToCardinality = new HashMap<>();
for (int i = 0; i < 5; i++) {
- DeletionVector index = createEmptyDV(isV2);
+ DeletionVector index = createEmptyDV(bitmap64);
// the size of dv index file is about 1.7M
for (int j = 0; j < 750000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -186,7 +186,7 @@ public class DeletionVectorsIndexFileTest {
fileToDV.clear();
fileToCardinality.clear();
for (int i = 0; i < 10; i++) {
- DeletionVector index = createEmptyDV(isV2);
+ DeletionVector index = createEmptyDV(bitmap64);
// the size of dv index file is about 0.42M
for (int j = 0; j < 100000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -260,13 +260,13 @@ public class DeletionVectorsIndexFileTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testReadDeletionFile(boolean isV2) throws IOException {
+ public void testReadDeletionFile(boolean bitmap64) throws IOException {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- deletionVectorsIndexFile(pathFactory, isV2);
+ deletionVectorsIndexFile(pathFactory, bitmap64);
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
- DeletionVector index1 = createEmptyDV(isV2);
+ DeletionVector index1 = createEmptyDV(bitmap64);
index1.delete(1);
index1.delete(10);
index1.delete(100);
@@ -293,31 +293,19 @@ public class DeletionVectorsIndexFileTest {
assertThat(dv.isDeleted(100)).isTrue();
}
- private DeletionVector createEmptyDV(boolean isV2) {
- if (isV2) {
- return new Bitmap64DeletionVector();
- }
- return new BitmapDeletionVector();
+ private DeletionVector createEmptyDV(boolean bitmap64) {
+ return bitmap64 ? new Bitmap64DeletionVector() : new
BitmapDeletionVector();
}
private DeletionVectorsIndexFile deletionVectorsIndexFile(
- PathFactory pathFactory, boolean isV2) {
-
- return deletionVectorsIndexFile(pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE), isV2);
+ PathFactory pathFactory, boolean bitmap64) {
+ return deletionVectorsIndexFile(pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE), bitmap64);
}
private DeletionVectorsIndexFile deletionVectorsIndexFile(
- PathFactory pathFactory, MemorySize targetSizePerIndexFile,
boolean isV2) {
- if (isV2) {
- return new DeletionVectorsIndexFile(
- LocalFileIO.create(),
- pathFactory,
- targetSizePerIndexFile,
- Bitmap64DeletionVector.VERSION);
- } else {
- return new DeletionVectorsIndexFile(
- LocalFileIO.create(), pathFactory, targetSizePerIndexFile);
- }
+ PathFactory pathFactory, MemorySize targetSizePerIndexFile,
boolean bitmap64) {
+ return new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory, targetSizePerIndexFile,
bitmap64);
}
private PathFactory getPathFactory() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index d2684003d3..cbb44d5eb5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -50,15 +50,15 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void test0(int dvVersion) {
- initIndexHandler(dvVersion);
+ @ValueSource(booleans = {true, false})
+ public void test0(boolean bitmap64) {
+ initIndexHandler(bitmap64);
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
DeletionVectorsMaintainer dvMaintainer =
factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
- assertThat(dvMaintainer.dvWriteVersion()).isEqualTo(dvVersion);
+ assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
dvMaintainer.notifyNewDeletion("f1", 1);
dvMaintainer.notifyNewDeletion("f2", 2);
@@ -78,20 +78,20 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
}
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void test1(int dvVersion) {
- initIndexHandler(dvVersion);
+ @ValueSource(booleans = {true, false})
+ public void test1(boolean bitmap64) {
+ initIndexHandler(bitmap64);
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
DeletionVectorsMaintainer dvMaintainer = factory.create();
- DeletionVector deletionVector1 = createDeletionVector(dvVersion);
+ DeletionVector deletionVector1 = createDeletionVector(bitmap64);
deletionVector1.delete(1);
deletionVector1.delete(3);
deletionVector1.delete(5);
dvMaintainer.notifyNewDeletion("f1", deletionVector1);
- assertThat(dvMaintainer.dvWriteVersion()).isEqualTo(dvVersion);
+ assertThat(dvMaintainer.bitmap64()).isEqualTo(bitmap64);
List<IndexFileMeta> fileMetas1 =
dvMaintainer.writeDeletionVectorsIndex();
assertThat(fileMetas1.size()).isEqualTo(1);
@@ -136,9 +136,9 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
}
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void testCompactDeletion(int dvVersion) throws IOException {
- initIndexHandler(dvVersion);
+ @ValueSource(booleans = {true, false})
+ public void testCompactDeletion(boolean bitmap64) throws IOException {
+ initIndexHandler(bitmap64);
DeletionVectorsMaintainer.Factory factory =
new DeletionVectorsMaintainer.Factory(fileHandler);
@@ -178,14 +178,14 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(indexDir.listFiles()).hasSize(1);
}
- private DeletionVector createDeletionVector(int dvVersion) {
- return dvVersion == 2 ? new Bitmap64DeletionVector() : new
BitmapDeletionVector();
+ private DeletionVector createDeletionVector(boolean bitmap64) {
+ return bitmap64 ? new Bitmap64DeletionVector() : new
BitmapDeletionVector();
}
- private void initIndexHandler(int dvVersion) {
+ private void initIndexHandler(boolean bitmap64) {
Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+ options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(),
String.valueOf(bitmap64));
table = table.copy(options);
fileHandler = table.store().newIndexFileHandler();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 7a00270f66..3b81c8478e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -49,10 +49,10 @@ class AppendDeletionFileMaintainerTest {
@TempDir java.nio.file.Path tempDir;
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void test(int dvVersion) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void test(boolean bitmap64) throws Exception {
Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+ options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(),
String.valueOf(bitmap64));
TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, options);
Map<String, List<Integer>> dvs = new HashMap<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index d6bfbb2205..8d97983a8b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -872,10 +872,10 @@ public class FileStoreCommitTest {
}
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void testDVIndexFiles(int dvVersion) throws Exception {
+ @ValueSource(booleans = {true, false})
+ public void testDVIndexFiles(boolean bitmap64) throws Exception {
Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.DELETION_VECTOR_VERSION.key(),
String.valueOf(dvVersion));
+ options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(),
String.valueOf(bitmap64));
TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, options);
// commit 1
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index bc4567f679..5f9773a0cf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -590,12 +590,11 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testCountStarAppendWithDv() {
- int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
String.format(
"CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH
('deletion-vectors.enabled' = 'true', "
- + "'deletion-vectors.version' = '%s') ",
- dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s') ",
+ ThreadLocalRandom.current().nextBoolean()));
sql("INSERT INTO count_append_dv VALUES (1, 'a'), (2, 'b')");
String sql = "SELECT COUNT(*) FROM count_append_dv";
@@ -617,14 +616,13 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testCountStarPKDv() {
- int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
String.format(
"CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT
ENFORCED, f1 STRING) WITH ("
+ "'file.format' = 'avro', "
+ "'deletion-vectors.enabled' = 'true', "
- + "'deletion-vectors.version' = '%s')",
- dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s')",
+ ThreadLocalRandom.current().nextBoolean()));
sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4,
'd')");
sql("INSERT INTO count_pk_dv VALUES (1, 'e')");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index a0edc4e9d7..0797322c0d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1140,12 +1140,11 @@ public class CatalogTableITCase extends
CatalogITCaseBase {
@Test
public void testIndexesTable() {
- int dvVersion = ThreadLocalRandom.current().nextInt(1, 3);
sql(
String.format(
"CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY
KEY (pt, a) NOT ENFORCED)"
- + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true', 'deletion-vectors.version' = '%s')",
- dvVersion));
+ + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true', 'deletion-vectors.bitmap64' = '%s')",
+ ThreadLocalRandom.current().nextBoolean()));
sql(
"INSERT INTO T VALUES ('2024-10-01', 1,
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01',
3, 'c_new1')");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index f388a41228..e51b617db2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -35,12 +35,12 @@ import static
org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
public class DeletionVectorITCase extends CatalogITCaseBase {
private static Stream<Arguments> parameters1() {
- // parameters: changelogProducer, dvVersion
+ // parameters: changelogProducer, dvBitmap64
return Stream.of(
- Arguments.of("none", 1),
- Arguments.of("none", 2),
- Arguments.of("lookup", 1),
- Arguments.of("lookup", 2));
+ Arguments.of("none", true),
+ Arguments.of("none", false),
+ Arguments.of("lookup", true),
+ Arguments.of("lookup", false));
}
private static Stream<Arguments> parameters2() {
@@ -97,13 +97,14 @@ public class DeletionVectorITCase extends CatalogITCaseBase
{
@ParameterizedTest
@MethodSource("parameters1")
- public void testStreamingReadDVTable(String changelogProducer, int
dvVersion) throws Exception {
+ public void testStreamingReadDVTable(String changelogProducer, boolean
dvBitmap64)
+ throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name
STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
- + "'deletion-vectors.version' = '%s')",
- changelogProducer, dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s')",
+ changelogProducer, dvBitmap64));
sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4,
'4')");
@@ -150,13 +151,13 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
@ParameterizedTest
@MethodSource("parameters1")
- public void testBatchReadDVTable(String changelogProducer, int dvVersion) {
+ public void testBatchReadDVTable(String changelogProducer, boolean
dvBitmap64) {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name
STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
- + "'deletion-vectors.version' = '%s')",
- changelogProducer, dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s')",
+ changelogProducer, dvBitmap64));
sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4,
'4')");
@@ -183,14 +184,14 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
@ParameterizedTest
@MethodSource("parameters1")
- public void testDVTableWithAggregationMergeEngine(String
changelogProducer, int dvVersion)
+ public void testDVTableWithAggregationMergeEngine(String
changelogProducer, boolean dvBitmap64)
throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v
INT) "
- + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', 'deletion-vectors.version' = '%s', "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s', "
+ "'merge-engine'='aggregation',
'fields.v.aggregate-function'='sum')",
- changelogProducer, dvVersion));
+ changelogProducer, dvBitmap64));
sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
@@ -224,14 +225,14 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
@ParameterizedTest
@MethodSource("parameters1")
- public void testDVTableWithPartialUpdateMergeEngine(String
changelogProducer, int dvVersion)
- throws Exception {
+ public void testDVTableWithPartialUpdateMergeEngine(
+ String changelogProducer, boolean dvBitmap64) throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1
STRING, v2 STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
- + "'deletion-vectors.version' = '%s',
'merge-engine'='partial-update')",
- changelogProducer, dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s',
'merge-engine'='partial-update')",
+ changelogProducer, dvBitmap64));
sql(
"INSERT INTO T VALUES (1, '111111111', '1'), (2, '2',
CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
@@ -270,13 +271,14 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
@ParameterizedTest
@MethodSource("parameters1")
- public void testBatchReadDVTableWithSequenceField(String
changelogProducer, int dvVersion) {
+ public void testBatchReadDVTableWithSequenceField(
+ String changelogProducer, boolean dvBitmap64) {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED,
sequence INT, name STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true',
'sequence.field' = 'sequence', 'changelog-producer' = '%s', "
- + "'deletion-vectors.version' = '%s')",
- changelogProducer, dvVersion));
+ + "'deletion-vectors.bitmap64' = '%s')",
+ changelogProducer, dvBitmap64));
sql("INSERT INTO T VALUES (1, 1, '1'), (2, 1, '2')");
sql("INSERT INTO T VALUES (1, 2, '1_1'), (2, 2, '2_1')");
@@ -287,13 +289,13 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
}
@ParameterizedTest
- @ValueSource(ints = {1, 2})
- public void testReadTagWithDv(int dvVersion) {
+ @ValueSource(booleans = {true, false})
+ public void testReadTagWithDv(boolean dvBitmap64) {
sql(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING)
WITH ("
+ "'deletion-vectors.enabled' = 'true', "
- + "'deletion-vectors.version' = '"
- + dvVersion
+ + "'deletion-vectors.bitmap64' = '"
+ + dvBitmap64
+ "', "
+ "'snapshot.num-retained.min' = '1', "
+ "'snapshot.num-retained.max' = '1')");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 0c6bbebec3..032a94c9a9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -968,8 +968,8 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
enableFailure,
"'bucket' = '4',"
+ String.format(
- "'deletion-vectors.enabled' = '%s',
'deletion-vectors.version' = '%s'",
- enableDeletionVectors, random.nextInt(1, 3)));
+ "'deletion-vectors.enabled' = '%s',
'deletion-vectors.bitmap64' = '%s'",
+ enableDeletionVectors, random.nextBoolean()));
// changelog is produced by Flink normalize operator
checkChangelogTestResult(numProducers);
@@ -1015,12 +1015,12 @@ public class PrimaryKeyFileStoreTableITCase extends
AbstractTestBase {
+ "'changelog-producer' = 'lookup', "
+ "'lookup-wait' = '%s', "
+ "'deletion-vectors.enabled' = '%s', "
- + "'deletion-vectors.version' = '%s', "
+ + "'deletion-vectors.bitmap64' = '%s', "
+ "'precommit-compact' = '%s'",
random.nextBoolean() ? "4mb" : "8mb",
random.nextBoolean(),
enableDeletionVectors,
- random.nextInt(1, 3),
+ random.nextBoolean(),
random.nextBoolean()));
// sleep for a random amount of time to check
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 20fa2e193e..0b466c8efd 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
-import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector,
BitmapDeletionVector}
+import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector,
BitmapDeletionVector, DeletionVector}
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
@@ -226,7 +226,7 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
dataFilePathToMeta: Map[String, SparkDataFileMeta],
condition: Expression,
relation: DataSourceV2Relation,
- sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
+ sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
val filteredRelation = createNewScanPlan(candidateDataSplits, condition,
relation)
val dataWithMetadataColumns = createDataset(sparkSession, filteredRelation)
collectDeletionVectors(dataFilePathToMeta, dataWithMetadataColumns,
sparkSession)
@@ -235,7 +235,7 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
protected def collectDeletionVectors(
dataFilePathToMeta: Map[String, SparkDataFileMeta],
dataWithMetadataColumns: Dataset[Row],
- sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
+ sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
import sparkSession.implicits._
val resolver = sparkSession.sessionState.conf.resolver
@@ -252,7 +252,7 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
val my_table = table
val location = my_table.location
- val dvWriteVersion = my_table.coreOptions().deletionVectorVersion()
+ val dvBitmap64 = my_table.coreOptions().deletionVectorBitmap64()
dataWithMetadataColumns
.select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
.as[(String, Long)]
@@ -260,7 +260,7 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
.mapGroups {
(filePath, iter) =>
val dv =
- if (dvWriteVersion == 2) new Bitmap64DeletionVector() else new
BitmapDeletionVector()
+ if (dvBitmap64) new Bitmap64DeletionVector() else new
BitmapDeletionVector()
while (iter.hasNext) {
dv.delete(iter.next()._2)
}
@@ -272,12 +272,12 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
.relativeBucketPath(partition, bucket)
.toString
- SparkDeletionVectors(
+ SparkDeletionVector(
relativeBucketPath,
SerializationUtils.serializeBinaryRow(partition),
bucket,
- Seq((new Path(filePath).getName, dv.serializeToBytes())),
- dvWriteVersion
+ new Path(filePath).getName,
+ DeletionVector.serializeToBytes(dv)
)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index f669392761..a2fe487bf6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap,
KeyPartOrRow}
import org.apache.paimon.data.serializer.InternalSerializers
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
+import org.apache.paimon.fs.Path
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.FileKind
@@ -278,19 +279,18 @@ case class PaimonSparkWriter(table: FileStoreTable) {
* deletion vectors; else, one index file will contains all deletion vector
with the same
* partition and bucket.
*/
- def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]):
Seq[CommitMessage] = {
+ def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]):
Seq[CommitMessage] = {
val sparkSession = deletionVectors.sparkSession
import sparkSession.implicits._
val snapshot = table.snapshotManager().latestSnapshot()
val serializedCommits = deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {
- (_, iter: Iterator[SparkDeletionVectors]) =>
+ (_, iter: Iterator[SparkDeletionVector]) =>
val indexHandler = table.store().newIndexFileHandler()
var dvIndexFileMaintainer: BaseAppendDeleteFileMaintainer = null
while (iter.hasNext) {
- val sdv: SparkDeletionVectors = iter.next()
- val dvWriteVersion = sdv.dvWriteVersion
+ val sdv: SparkDeletionVector = iter.next()
if (dvIndexFileMaintainer == null) {
val partition =
SerializationUtils.deserializeBinaryRow(sdv.partition)
dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) {
@@ -307,12 +307,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
throw new RuntimeException("can't create the dv maintainer.")
}
- sdv.dataFileAndDeletionVector.foreach {
- case (dataFileName, dv) =>
- dvIndexFileMaintainer.notifyNewDeletionVector(
- dataFileName,
- DeletionVector.deserializeFromBytes(dv, dvWriteVersion))
- }
+ dvIndexFileMaintainer.notifyNewDeletionVector(
+ sdv.dataFileName,
+ DeletionVector.deserializeFromBytes(sdv.deletionVector))
}
val indexEntries = dvIndexFileMaintainer.persist()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
similarity index 61%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
index 4d7790c38f..9fc7fdadcb 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
@@ -28,44 +28,37 @@ import scala.collection.JavaConverters._
* This class will be used as Dataset's pattern type. So here use Array[Byte]
instead of BinaryRow
* or DeletionVector.
*/
-case class SparkDeletionVectors(
+case class SparkDeletionVector(
partitionAndBucket: String,
partition: Array[Byte],
bucket: Int,
- dataFileAndDeletionVector: Seq[(String, Array[Byte])],
- dvWriteVersion: Int
+ dataFileName: String,
+ deletionVector: Array[Byte]
) {
- def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] =
{
- val prefix = fileStorePathFactory
+ def relativePath(pathFactory: FileStorePathFactory): String = {
+ val prefix = pathFactory
.relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition),
bucket)
.toUri
.toString + "/"
- dataFileAndDeletionVector.map(prefix + _._1)
+ prefix + dataFileName
}
}
-object SparkDeletionVectors {
+object SparkDeletionVector {
def toDataSplit(
- sparkDeletionVectors: SparkDeletionVectors,
+ deletionVector: SparkDeletionVector,
root: Path,
pathFactory: FileStorePathFactory,
dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
- val (dataFiles, deletionFiles, totalBuckets) = sparkDeletionVectors
- .relativePaths(pathFactory)
- .map {
- dataFile =>
- val meta = dataFilePathToMeta(dataFile)
- (meta.dataFileMeta, meta.deletionFile.orNull, meta.totalBuckets)
- }
- .unzip3
+ val meta = dataFilePathToMeta(deletionVector.relativePath(pathFactory))
DataSplit
.builder()
- .withBucketPath(root + "/" + sparkDeletionVectors.partitionAndBucket)
-
.withPartition(SerializationUtils.deserializeBinaryRow(sparkDeletionVectors.partition))
- .withBucket(sparkDeletionVectors.bucket)
- .withTotalBuckets(totalBuckets.head)
- .withDataFiles(dataFiles.toList.asJava)
- .withDataDeletionFiles(deletionFiles.toList.asJava)
+ .withBucketPath(root + "/" + deletionVector.partitionAndBucket)
+
.withPartition(SerializationUtils.deserializeBinaryRow(deletionVector.partition))
+ .withBucket(deletionVector.bucket)
+ .withTotalBuckets(meta.totalBuckets)
+ .withDataFiles(Seq(meta.dataFileMeta).asJava)
+ .withDataDeletionFiles(Seq(meta.deletionFile.orNull).asJava)
.rawConvertible(true)
.build()
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 0047de1401..74c7e122cd 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -96,7 +96,7 @@ case class UpdatePaimonTableCommand(
try {
// Step3: write these updated data
val touchedDataSplits = deletionVectors.collect().map {
-          SparkDeletionVectors.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
+          SparkDeletionVector.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
}
val addCommitMessage = writeOnlyUpdatedData(sparkSession,
touchedDataSplits)
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f333774756..cdc4f75ed0 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -106,11 +106,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
.toDF("a", "b", "c")
.createOrReplaceTempView("source")
- val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE target (a INT, b INT, c STRING)
-         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
+         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+           .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
spark.sql(
"INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
@@ -157,12 +157,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
- val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion',
+                 |  'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
| 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
@@ -230,12 +229,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
- val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|PARTITIONED BY(pt)
-         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
+         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+           .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -318,11 +317,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
- val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING)
-         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
+         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+           .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -374,12 +373,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
} else {
""
}
- val dvVersion = Random.nextInt(2) + 1
spark.sql(
s"""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|PARTITIONED BY(pt)
-         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.version' = '$dvVersion', 'bucket' = '$bucket' $bucketKey)
+         |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+           .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
|""".stripMargin)
val table = loadTable("T")
@@ -446,7 +445,6 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
test("Paimon deletionVector: deletion vector write verification") {
withTable("T") {
- val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
@@ -454,7 +452,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
| 'primary-key' = 'id',
| 'file.format' = 'parquet',
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion'
+ | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}'
|)
|""".stripMargin)
val table = loadTable("T")
@@ -516,13 +514,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val dvTbl = "deletion_vector_tbl"
val resultTbl = "result_tbl"
spark.sql(s"drop table if exists $dvTbl")
- val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE $dvTbl (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id, pt',
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion',
+ | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
| 'bucket' = '$bucket',
| 'changelog-producer' = '$changelogProducer',
| 'file.format' = '$format',
@@ -536,8 +533,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
|CREATE TABLE $resultTbl (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id, pt',
- | 'deletion-vectors.enabled' = 'false',
- | 'deletion-vectors.version' = '$dvVersion'
+ | 'deletion-vectors.enabled' = 'false'
|)
|PARTITIONED BY (pt)
|""".stripMargin)
@@ -602,13 +598,12 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
test("Paimon deletionVector: select with format filter push down") {
val format = Random.shuffle(Seq("parquet", "orc", "avro")).head
val blockSize = Random.nextInt(10240) + 1
- val dvVersion = Random.nextInt(2) + 1
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'primary-key' = 'id',
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion',
+ | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
| 'file.format' = '$format',
| 'file.block-size' = '${blockSize}b',
| 'bucket' = '1'
@@ -644,12 +639,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
}
test("Paimon deletionVector: get cardinality") {
- val dvVersion = Random.nextInt(2) + 1
sql(s"""
|CREATE TABLE T (id INT)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion',
+ | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
| 'bucket-key' = 'id',
| 'bucket' = '1'
|)
@@ -667,12 +661,11 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
}
test("Paimon deletionVector: delete from non-pk table with data file path") {
- val dvVersion = Random.nextInt(2) + 1
sql(s"""
|CREATE TABLE T (id INT)
|TBLPROPERTIES (
| 'deletion-vectors.enabled' = 'true',
- | 'deletion-vectors.version' = '$dvVersion',
+ | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
| 'bucket-key' = 'id',
| 'bucket' = '1',
| 'data-file.path-directory' = 'data'
@@ -684,6 +677,28 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
}
+ test("Paimon deletionVector: work v1 with v2") {
+ sql(s"""
+ |CREATE TABLE T (id INT)
+ |TBLPROPERTIES (
+ | 'deletion-vectors.enabled' = 'true',
+ | 'deletion-vectors.bitmap64' = 'false',
+ | 'file.format' = 'avro'
+ |)
+ |""".stripMargin)
+ // file 1
+ sql("INSERT INTO T VALUES (1), (2)")
+ // file 2
+ sql("INSERT INTO T VALUES (3), (4)")
+ // delete in file 1
+ sql("DELETE FROM T WHERE id = 1")
+ // alter to v2 deletion vectors
+    sql("ALTER TABLE T SET TBLPROPERTIES ('deletion-vectors.bitmap64' = 'true')")
+ // delete in file 2
+ sql("DELETE FROM T WHERE id = 3")
+ checkAnswer(sql("SELECT * FROM T"), Row(2) :: Row(4) :: Nil)
+ }
+
private def getPathName(path: String): String = {
new Path(path).getName
}