This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 43b2f7d007 Core: Make PositionDeleteIndex serializable (#11463)
43b2f7d007 is described below
commit 43b2f7d007c26ad79ebdf60d37ccca144db1f08f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 08:35:42 2024 +0100
Core: Make PositionDeleteIndex serializable (#11463)
---
.../iceberg/deletes/BitmapPositionDeleteIndex.java | 124 +++++++++++++++++++++
.../iceberg/deletes/PositionDeleteIndex.java | 21 ++++
.../deletes/TestBitmapPositionDeleteIndex.java | 105 +++++++++++++++++
.../deletes/all-container-types-position-index.bin | Bin 0 -> 94 bytes
.../iceberg/deletes/empty-position-index.bin | Bin 0 -> 20 bytes
.../small-alternating-values-position-index.bin | Bin 0 -> 50 bytes
.../small-and-large-values-position-index.bin | Bin 0 -> 56 bytes
7 files changed, 250 insertions(+)
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index cfb163e837..376b391d9c 100644
---
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -18,13 +18,23 @@
*/
package org.apache.iceberg.deletes;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Collection;
import java.util.List;
import java.util.function.LongConsumer;
+import java.util.zip.CRC32;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
class BitmapPositionDeleteIndex implements PositionDeleteIndex {
+  // size of the big-endian length prefix that precedes the bitmap data
+  private static final int LENGTH_SIZE_BYTES = 4;
+  // size of the magic number at the start of the bitmap data
+  private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
+  // size of the big-endian CRC-32 checksum that follows the bitmap data
+  private static final int CRC_SIZE_BYTES = 4;
+  // the bitmap data (magic number + bitmap) starts immediately after the length prefix;
+  // derived from LENGTH_SIZE_BYTES so the two can never drift apart
+  private static final int BITMAP_DATA_OFFSET = LENGTH_SIZE_BYTES;
+  // magic number written (little-endian) at the start of the bitmap data
+  private static final int MAGIC_NUMBER = 1681511377;
+
private final RoaringPositionBitmap bitmap;
private final List<DeleteFile> deleteFiles;
@@ -43,6 +53,11 @@ class BitmapPositionDeleteIndex implements
PositionDeleteIndex {
this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) :
Lists.newArrayList();
}
+  /**
+   * Creates an index backed by an existing bitmap.
+   *
+   * @param bitmap the bitmap of deleted positions, stored as-is (not copied)
+   * @param deleteFile the delete file the positions came from, or null if there is none
+   */
+  BitmapPositionDeleteIndex(RoaringPositionBitmap bitmap, DeleteFile deleteFile) {
+    this.bitmap = bitmap;
+    List<DeleteFile> files = Lists.newArrayList();
+    if (deleteFile != null) {
+      files.add(deleteFile);
+    }
+    this.deleteFiles = files;
+  }
+
void merge(BitmapPositionDeleteIndex that) {
bitmap.setAll(that.bitmap);
deleteFiles.addAll(that.deleteFiles);
@@ -92,4 +107,113 @@ class BitmapPositionDeleteIndex implements
PositionDeleteIndex {
 public long cardinality() {
+    // delegates to the underlying RoaringPositionBitmap
 return bitmap.cardinality();
 }
+
+  /**
+   * Serializes the index using the following format:
+   *
+   * <ul>
+   *   <li>The length of the magic bytes and bitmap stored as 4 bytes (big-endian).
+   *   <li>A 4-byte magic number, {@code 1681511377} (little-endian).
+   *   <li>The bitmap serialized using the portable Roaring spec (little-endian).
+   *   <li>A CRC-32 checksum of the magic bytes and bitmap stored as 4 bytes (big-endian).
+   * </ul>
+   *
+   * <p>Note that the length and the checksum are computed for the bitmap data, which includes the
+   * magic bytes and bitmap for compatibility with Delta.
+   *
+   * <p>This method calls {@code runLengthEncode()} on the underlying bitmap before serializing it.
+   *
+   * @return a buffer, rewound to position 0, that wraps the serialized bytes
+   * @throws IllegalStateException if the serialized index would exceed 2GB
+   */
+  @Override
+  public ByteBuffer serialize() {
+    bitmap.runLengthEncode(); // run-length encode the bitmap before serializing
+    int bitmapDataLength = computeBitmapDataLength(bitmap); // magic bytes + bitmap
+    byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength + CRC_SIZE_BYTES];
+    ByteBuffer buffer = ByteBuffer.wrap(bytes); // big-endian by default
+    buffer.putInt(bitmapDataLength);
+    serializeBitmapData(bytes, bitmapDataLength, bitmap);
+    int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+    int crc = computeChecksum(bytes, bitmapDataLength);
+    buffer.putInt(crcOffset, crc); // absolute put, does not move the position
+    buffer.rewind();
+    return buffer;
+  }
+
+  /**
+   * Deserializes the index from bytes, assuming the format described in {@link #serialize()}.
+   *
+   * <p>The checksum is verified before the bitmap is parsed, so corrupted data is rejected with a
+   * clear error instead of being handed to the bitmap deserializer.
+   *
+   * @param bytes an array containing the serialized index
+   * @param deleteFile the DV file; its content size and record count are used for validation
+   * @return the deserialized index
+   * @throws IllegalArgumentException if the length prefix, array size, checksum, magic number, or
+   *     cardinality does not match the expected values
+   */
+  public static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    int bitmapDataLength = readBitmapDataLength(buffer, deleteFile);
+
+    // reject truncated input before slicing the array (computed as long to avoid overflow)
+    long expectedSize = (long) LENGTH_SIZE_BYTES + bitmapDataLength + CRC_SIZE_BYTES;
+    Preconditions.checkArgument(
+        bitmapDataLength >= MAGIC_NUMBER_SIZE_BYTES && bytes.length >= expectedSize,
+        "Invalid serialized index size: %s bytes for bitmap data length %s",
+        bytes.length,
+        bitmapDataLength);
+
+    // verify integrity before parsing the bitmap
+    int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength; // safe after the size check above
+    int crc = computeChecksum(bytes, bitmapDataLength);
+    int expectedCrc = buffer.getInt(crcOffset);
+    Preconditions.checkArgument(
+        crc == expectedCrc, "Invalid CRC: %s, expected %s", crc, expectedCrc);
+
+    RoaringPositionBitmap bitmap = deserializeBitmap(bytes, bitmapDataLength, deleteFile);
+    return new BitmapPositionDeleteIndex(bitmap, deleteFile);
+  }
+
+  // Returns the size of the bitmap data (magic bytes + bitmap), failing if the whole serialized
+  // index (length prefix + bitmap data + CRC) would not fit in a single array.
+  private static int computeBitmapDataLength(RoaringPositionBitmap bitmap) {
+    long dataLength = bitmap.serializedSizeInBytes() + MAGIC_NUMBER_SIZE_BYTES;
+    long totalSize = dataLength + LENGTH_SIZE_BYTES + CRC_SIZE_BYTES;
+    Preconditions.checkState(totalSize <= Integer.MAX_VALUE, "Can't serialize index > 2GB");
+    return (int) dataLength;
+  }
+
+  // Writes the magic number followed by the bitmap into the bitmap data region of the array,
+  // through a little-endian view of that region.
+  private static void serializeBitmapData(
+      byte[] bytes, int bitmapDataLength, RoaringPositionBitmap bitmap) {
+    ByteBuffer target = pointToBitmapData(bytes, bitmapDataLength);
+    target.putInt(MAGIC_NUMBER);
+    bitmap.serialize(target);
+  }
+
+  // Returns a little-endian view over the bitmap data region of the blob: it starts at
+  // BITMAP_DATA_OFFSET and spans bitmapDataLength bytes.
+  private static ByteBuffer pointToBitmapData(byte[] bytes, int bitmapDataLength) {
+    return ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, bitmapDataLength)
+        .order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  // Reads the length prefix and checks that it equals the DV content size minus the bytes taken
+  // by the length prefix and the CRC.
+  private static int readBitmapDataLength(ByteBuffer buffer, DeleteFile deleteFile) {
+    int actualLength = buffer.getInt();
+    long expectedLength = deleteFile.contentSizeInBytes() - (LENGTH_SIZE_BYTES + CRC_SIZE_BYTES);
+    Preconditions.checkArgument(
+        actualLength == expectedLength,
+        "Invalid bitmap data length: %s, expected %s",
+        actualLength,
+        expectedLength);
+    return actualLength;
+  }
+
+  // Checks the leading magic number, parses the bitmap that follows it, and verifies that the
+  // bitmap's cardinality matches the record count of the DV file.
+  private static RoaringPositionBitmap deserializeBitmap(
+      byte[] bytes, int bitmapDataLength, DeleteFile deleteFile) {
+    ByteBuffer source = pointToBitmapData(bytes, bitmapDataLength);
+
+    int actualMagic = source.getInt();
+    Preconditions.checkArgument(
+        actualMagic == MAGIC_NUMBER,
+        "Invalid magic number: %s, expected %s",
+        actualMagic,
+        MAGIC_NUMBER);
+
+    RoaringPositionBitmap result = RoaringPositionBitmap.deserialize(source);
+    long actualCardinality = result.cardinality();
+    long expectedCardinality = deleteFile.recordCount();
+    Preconditions.checkArgument(
+        actualCardinality == expectedCardinality,
+        "Invalid cardinality: %s, expected %s",
+        actualCardinality,
+        expectedCardinality);
+    return result;
+  }
+
+  // Computes the CRC-32 of the bitmap data region (magic bytes + serialized bitmap) and returns
+  // its 32 bits packed into an int.
+  private static int computeChecksum(byte[] bytes, int bitmapDataLength) {
+    CRC32 checksum = new CRC32();
+    checksum.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+    long unsignedValue = checksum.getValue();
+    return (int) unsignedValue;
+  }
}
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index 8ccfc03d1a..6f97b3a6ac 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.deletes;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.LongConsumer;
import org.apache.iceberg.DeleteFile;
@@ -92,6 +93,26 @@ public interface PositionDeleteIndex {
throw new UnsupportedOperationException(getClass().getName() + " does not
support cardinality");
}
+  /**
+   * Serializes this index.
+   *
+   * <p>The default implementation does not support serialization and always throws.
+   *
+   * @return a buffer containing the serialized index
+   * @throws UnsupportedOperationException if this index cannot be serialized
+   */
+  default ByteBuffer serialize() {
+    String indexClass = getClass().getName();
+    throw new UnsupportedOperationException(indexClass + " does not support serialize");
+  }
+
+  /**
+   * Deserializes a position delete index.
+   *
+   * <p>The bytes are always decoded with the bitmap-based index implementation.
+   *
+   * @param bytes an array containing the serialized index
+   * @param deleteFile the delete file that the index is created for
+   * @return the deserialized index
+   * @throws IllegalArgumentException if the serialized data fails validation
+   */
+  static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
+    PositionDeleteIndex index = BitmapPositionDeleteIndex.deserialize(bytes, deleteFile);
+    return index;
+  }
+
/** Returns an empty immutable position delete index. */
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
diff --git
a/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
b/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
index c8fc723deb..76b294f806 100644
---
a/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
+++
b/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
@@ -20,12 +20,21 @@ package org.apache.iceberg.deletes;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
public class TestBitmapPositionDeleteIndex {
+  // 2^32: the distance between consecutive bitmap indexes used by position()
+  private static final long BITMAP_OFFSET = 1L << 32;
+  // 2^16: the distance between consecutive container indexes used by position()
+  private static final long CONTAINER_OFFSET = 1L << 16;
+
@Test
public void testForEach() {
long pos1 = 10L; // Container 0 (high bits = 0)
@@ -105,6 +114,102 @@ public class TestBitmapPositionDeleteIndex {
assertThat(positions).containsExactly(pos1, pos2, pos3, pos4);
}
+  @Test
+  public void testEmptyIndexSerialization() throws Exception {
+    // an index without any deleted positions
+    validate(new BitmapPositionDeleteIndex(), "empty-position-index.bin");
+  }
+
+  @Test
+  public void testSmallAlternatingValuesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    // delete the odd positions 1, 3, 5, 7, 9 in ascending order
+    for (long pos = 1L; pos <= 9L; pos += 2) {
+      index.delete(pos);
+    }
+    validate(index, "small-alternating-values-position-index.bin");
+  }
+
+  @Test
+  public void testSmallAndLargeValuesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    // two small positions and two positions just above Integer.MAX_VALUE
+    long[] deletedPositions = {100L, 101L, Integer.MAX_VALUE + 100L, Integer.MAX_VALUE + 101L};
+    for (long pos : deletedPositions) {
+      index.delete(pos);
+    }
+    validate(index, "small-and-large-values-position-index.bin");
+  }
+
+  @Test
+  public void testAllContainerTypesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+
+    // bitmap 0, container 0 (array)
+    index.delete(position(0 /* bitmap */, 0 /* container */, 5L));
+    index.delete(position(0 /* bitmap */, 0 /* container */, 7L));
+
+    // bitmap 0, container 1 (array that can be compressed)
+    index.delete(
+        position(0 /* bitmap */, 1 /* container */, 1L),
+        position(0 /* bitmap */, 1 /* container */, 1000L));
+
+    // bitmap 0, container 2 (bitset)
+    index.delete(
+        position(0 /* bitmap */, 2 /* container */, 1L),
+        position(0 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1L));
+
+    // bitmap 1, container 0 (array)
+    index.delete(position(1 /* bitmap */, 0 /* container */, 10L));
+    index.delete(position(1 /* bitmap */, 0 /* container */, 20L));
+
+    // bitmap 1, container 1 (array that can be compressed)
+    index.delete(
+        position(1 /* bitmap */, 1 /* container */, 10L),
+        position(1 /* bitmap */, 1 /* container */, 500L));
+
+    // bitmap 1, container 2 (bitset)
+    index.delete(
+        position(1 /* bitmap */, 2 /* container */, 1L),
+        position(1 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1L));
+
+    validate(index, "all-container-types-position-index.bin");
+  }
+
+  // Round-trips the index through serialize/deserialize, compares the serialized bytes with the
+  // golden file, and checks that the golden file deserializes to an equivalent index.
+  private static void validate(PositionDeleteIndex index, String goldenFile) throws Exception {
+    ByteBuffer buffer = index.serialize();
+    // copy only the readable bytes: array() would return the whole backing array regardless of
+    // position/limit and throws for read-only or non-array-backed buffers
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    DeleteFile dv = mockDV(bytes.length, index.cardinality());
+    PositionDeleteIndex indexCopy = PositionDeleteIndex.deserialize(bytes, dv);
+    assertEqual(index, indexCopy);
+    byte[] goldenBytes = readTestResource(goldenFile);
+    assertThat(bytes).isEqualTo(goldenBytes);
+    PositionDeleteIndex goldenIndex = PositionDeleteIndex.deserialize(goldenBytes, dv);
+    assertEqual(index, goldenIndex);
+  }
+
+  // Creates a mock DV file whose record count and content size are stubbed with the given values.
+  private static DeleteFile mockDV(long contentSize, long cardinality) {
+    DeleteFile dv = Mockito.mock(DeleteFile.class);
+    Mockito.when(dv.recordCount()).thenReturn(cardinality);
+    Mockito.when(dv.contentSizeInBytes()).thenReturn(contentSize);
+    return dv;
+  }
+
+  // Asserts that both indexes have the same cardinality and delete exactly the same positions.
+  private static void assertEqual(PositionDeleteIndex index, PositionDeleteIndex thatIndex) {
+    assertThat(index.cardinality()).isEqualTo(thatIndex.cardinality());
+    assertAllDeleted(index, thatIndex);
+    assertAllDeleted(thatIndex, index);
+  }
+
+  // Asserts that every position deleted in source is also deleted in target.
+  private static void assertAllDeleted(PositionDeleteIndex source, PositionDeleteIndex target) {
+    source.forEach(pos -> assertThat(target.isDeleted(pos)).isTrue());
+  }
+
+  // Builds a position from a bitmap index, a container index, and a value offset, using the
+  // BITMAP_OFFSET and CONTAINER_OFFSET strides.
+  private static long position(int bitmapIndex, int containerIndex, long value) {
+    long bitmapBase = bitmapIndex * BITMAP_OFFSET;
+    long containerBase = containerIndex * CONTAINER_OFFSET;
+    return bitmapBase + containerBase + value;
+  }
+
+  // Reads a golden file from the classpath, resolved relative to this test class's package.
+  // Anchored on this class rather than TestRoaringPositionBitmap so the lookup does not depend
+  // on an unrelated test class living in the same package.
+  private static byte[] readTestResource(String resourceName) throws IOException {
+    URL resource = Resources.getResource(TestBitmapPositionDeleteIndex.class, resourceName);
+    return Resources.toByteArray(resource);
+  }
+
private List<Long> collect(PositionDeleteIndex index) {
List<Long> positions = Lists.newArrayList();
index.forEach(positions::add);
diff --git
a/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
b/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
new file mode 100644
index 0000000000..00d47303b1
Binary files /dev/null and
b/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
differ
diff --git
a/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin
b/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin
new file mode 100644
index 0000000000..8bbc1265dc
Binary files /dev/null and
b/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin
differ
diff --git
a/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
b/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
new file mode 100644
index 0000000000..80829fae22
Binary files /dev/null and
b/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
differ
diff --git
a/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
b/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
new file mode 100644
index 0000000000..989dabf6ad
Binary files /dev/null and
b/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
differ