This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 43b2f7d007 Core: Make PositionDeleteIndex serializable (#11463)
43b2f7d007 is described below

commit 43b2f7d007c26ad79ebdf60d37ccca144db1f08f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 08:35:42 2024 +0100

    Core: Make PositionDeleteIndex serializable (#11463)
---
 .../iceberg/deletes/BitmapPositionDeleteIndex.java | 124 +++++++++++++++++++++
 .../iceberg/deletes/PositionDeleteIndex.java       |  21 ++++
 .../deletes/TestBitmapPositionDeleteIndex.java     | 105 +++++++++++++++++
 .../deletes/all-container-types-position-index.bin | Bin 0 -> 94 bytes
 .../iceberg/deletes/empty-position-index.bin       | Bin 0 -> 20 bytes
 .../small-alternating-values-position-index.bin    | Bin 0 -> 50 bytes
 .../small-and-large-values-position-index.bin      | Bin 0 -> 56 bytes
 7 files changed, 250 insertions(+)

diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index cfb163e837..376b391d9c 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -18,13 +18,23 @@
  */
 package org.apache.iceberg.deletes;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Collection;
 import java.util.List;
 import java.util.function.LongConsumer;
+import java.util.zip.CRC32;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 class BitmapPositionDeleteIndex implements PositionDeleteIndex {
+  private static final int LENGTH_SIZE_BYTES = 4;
+  private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
+  private static final int CRC_SIZE_BYTES = 4;
+  private static final int BITMAP_DATA_OFFSET = 4;
+  private static final int MAGIC_NUMBER = 1681511377;
+
   private final RoaringPositionBitmap bitmap;
   private final List<DeleteFile> deleteFiles;
 
@@ -43,6 +53,11 @@ class BitmapPositionDeleteIndex implements 
PositionDeleteIndex {
     this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : 
Lists.newArrayList();
   }
 
+  BitmapPositionDeleteIndex(RoaringPositionBitmap bitmap, DeleteFile 
deleteFile) {
+    this.bitmap = bitmap;
+    this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : 
Lists.newArrayList();
+  }
+
   void merge(BitmapPositionDeleteIndex that) {
     bitmap.setAll(that.bitmap);
     deleteFiles.addAll(that.deleteFiles);
@@ -92,4 +107,113 @@ class BitmapPositionDeleteIndex implements 
PositionDeleteIndex {
   public long cardinality() {
     return bitmap.cardinality();
   }
+
+  /**
+   * Serializes the index using the following format:
+   *
+   * <ul>
+   *   <li>The length of the magic bytes and bitmap stored as 4 bytes 
(big-endian).
+   *   <li>A 4-byte {@link #MAGIC_NUMBER} (little-endian).
+   *   <li>The bitmap serialized using the portable Roaring spec 
(little-endian).
+   *   <li>A CRC-32 checksum of the magic bytes and bitmap stored as 4 bytes 
(big-endian).
+   * </ul>
+   *
+   * Note that the length and the checksum are computed for the bitmap data, 
which includes the
+   * magic bytes and bitmap for compatibility with Delta.
+   */
+  @Override
+  public ByteBuffer serialize() {
+    bitmap.runLengthEncode(); // run-length encode the bitmap before 
serializing
+    int bitmapDataLength = computeBitmapDataLength(bitmap); // magic bytes + 
bitmap
+    byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength + 
CRC_SIZE_BYTES];
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    buffer.putInt(bitmapDataLength);
+    serializeBitmapData(bytes, bitmapDataLength, bitmap);
+    int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+    int crc = computeChecksum(bytes, bitmapDataLength);
+    buffer.putInt(crcOffset, crc);
+    buffer.rewind();
+    return buffer;
+  }
+
+  /**
+   * Deserializes the index from bytes, assuming the format described in 
{@link #serialize()}.
+   *
+   * @param bytes an array containing the serialized index
+   * @param deleteFile the DV file
+   * @return the deserialized index
+   */
+  public static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile 
deleteFile) {
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    int bitmapDataLength = readBitmapDataLength(buffer, deleteFile);
+    RoaringPositionBitmap bitmap = deserializeBitmap(bytes, bitmapDataLength, 
deleteFile);
+    int crc = computeChecksum(bytes, bitmapDataLength);
+    int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+    int expectedCrc = buffer.getInt(crcOffset);
+    Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC");
+    return new BitmapPositionDeleteIndex(bitmap, deleteFile);
+  }
+
+  // computes and validates the length of the bitmap data (magic bytes + 
bitmap)
+  private static int computeBitmapDataLength(RoaringPositionBitmap bitmap) {
+    long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes();
+    long bufferSize = LENGTH_SIZE_BYTES + length + CRC_SIZE_BYTES;
+    Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Can't serialize 
index > 2GB");
+    return (int) length;
+  }
+
+  // serializes the bitmap data (magic bytes + bitmap) using the little-endian 
byte order
+  private static void serializeBitmapData(
+      byte[] bytes, int bitmapDataLength, RoaringPositionBitmap bitmap) {
+    ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+    bitmapData.putInt(MAGIC_NUMBER);
+    bitmap.serialize(bitmapData);
+  }
+
+  // points to the bitmap data in the blob
+  private static ByteBuffer pointToBitmapData(byte[] bytes, int 
bitmapDataLength) {
+    ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, 
bitmapDataLength);
+    bitmapData.order(ByteOrder.LITTLE_ENDIAN);
+    return bitmapData;
+  }
+
+  // checks the blob size is equal to the bitmap data length + extra bytes for 
length and CRC
+  private static int readBitmapDataLength(ByteBuffer buffer, DeleteFile 
deleteFile) {
+    int length = buffer.getInt();
+    long expectedLength = deleteFile.contentSizeInBytes() - LENGTH_SIZE_BYTES 
- CRC_SIZE_BYTES;
+    Preconditions.checkArgument(
+        length == expectedLength,
+        "Invalid bitmap data length: %s, expected %s",
+        length,
+        expectedLength);
+    return length;
+  }
+
+  // validates magic bytes and deserializes the bitmap
+  private static RoaringPositionBitmap deserializeBitmap(
+      byte[] bytes, int bitmapDataLength, DeleteFile deleteFile) {
+    ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+    int magicNumber = bitmapData.getInt();
+    Preconditions.checkArgument(
+        magicNumber == MAGIC_NUMBER,
+        "Invalid magic number: %s, expected %s",
+        magicNumber,
+        MAGIC_NUMBER);
+    RoaringPositionBitmap bitmap = 
RoaringPositionBitmap.deserialize(bitmapData);
+    long cardinality = bitmap.cardinality();
+    long expectedCardinality = deleteFile.recordCount();
+    Preconditions.checkArgument(
+        cardinality == expectedCardinality,
+        "Invalid cardinality: %s, expected %s",
+        cardinality,
+        expectedCardinality);
+    return bitmap;
+  }
+
+  // generates a 32-bit unsigned checksum for the magic bytes and serialized 
bitmap
+  private static int computeChecksum(byte[] bytes, int bitmapDataLength) {
+    CRC32 crc = new CRC32();
+    crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+    return (int) crc.getValue();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index 8ccfc03d1a..6f97b3a6ac 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.deletes;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.function.LongConsumer;
 import org.apache.iceberg.DeleteFile;
@@ -92,6 +93,26 @@ public interface PositionDeleteIndex {
     throw new UnsupportedOperationException(getClass().getName() + " does not 
support cardinality");
   }
 
+  /**
+   * Serializes this index.
+   *
+   * @return a buffer containing the serialized index
+   */
+  default ByteBuffer serialize() {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support serialize");
+  }
+
+  /**
+   * Deserializes a position delete index.
+   *
+   * @param bytes an array containing the serialized index
+   * @param deleteFile the delete file that the index is created for
+   * @return the deserialized index
+   */
+  static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
+    return BitmapPositionDeleteIndex.deserialize(bytes, deleteFile);
+  }
+
   /** Returns an empty immutable position delete index. */
   static PositionDeleteIndex empty() {
     return EmptyPositionDeleteIndex.get();
diff --git 
a/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
 
b/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
index c8fc723deb..76b294f806 100644
--- 
a/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
+++ 
b/core/src/test/java/org/apache/iceberg/deletes/TestBitmapPositionDeleteIndex.java
@@ -20,12 +20,21 @@ package org.apache.iceberg.deletes;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.io.Resources;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestBitmapPositionDeleteIndex {
 
+  private static final long BITMAP_OFFSET = 0xFFFFFFFFL + 1L;
+  private static final long CONTAINER_OFFSET = Character.MAX_VALUE + 1L;
+
   @Test
   public void testForEach() {
     long pos1 = 10L; // Container 0 (high bits = 0)
@@ -105,6 +114,102 @@ public class TestBitmapPositionDeleteIndex {
     assertThat(positions).containsExactly(pos1, pos2, pos3, pos4);
   }
 
+  @Test
+  public void testEmptyIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    validate(index, "empty-position-index.bin");
+  }
+
+  @Test
+  public void testSmallAlternatingValuesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    index.delete(1L);
+    index.delete(3L);
+    index.delete(5L);
+    index.delete(7L);
+    index.delete(9L);
+    validate(index, "small-alternating-values-position-index.bin");
+  }
+
+  @Test
+  public void testSmallAndLargeValuesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    index.delete(100L);
+    index.delete(101L);
+    index.delete(Integer.MAX_VALUE + 100L);
+    index.delete(Integer.MAX_VALUE + 101L);
+    validate(index, "small-and-large-values-position-index.bin");
+  }
+
+  @Test
+  public void testAllContainerTypesIndexSerialization() throws Exception {
+    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
+
+    // bitmap 0, container 0 (array)
+    index.delete(position(0 /* bitmap */, 0 /* container */, 5L));
+    index.delete(position(0 /* bitmap */, 0 /* container */, 7L));
+
+    // bitmap 0, container 1 (array that can be compressed)
+    index.delete(
+        position(0 /* bitmap */, 1 /* container */, 1L),
+        position(0 /* bitmap */, 1 /* container */, 1000L));
+
+    // bitmap 0, container 2 (bitset)
+    index.delete(
+        position(0 /* bitmap */, 2 /* container */, 1L),
+        position(0 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1L));
+
+    // bitmap 1, container 0 (array)
+    index.delete(position(1 /* bitmap */, 0 /* container */, 10L));
+    index.delete(position(1 /* bitmap */, 0 /* container */, 20L));
+
+    // bitmap 1, container 1 (array that can be compressed)
+    index.delete(
+        position(1 /* bitmap */, 1 /* container */, 10L),
+        position(1 /* bitmap */, 1 /* container */, 500L));
+
+    // bitmap 1, container 2 (bitset)
+    index.delete(
+        position(1 /* bitmap */, 2 /* container */, 1L),
+        position(1 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1));
+
+    validate(index, "all-container-types-position-index.bin");
+  }
+
+  private static void validate(PositionDeleteIndex index, String goldenFile) 
throws Exception {
+    ByteBuffer buffer = index.serialize();
+    byte[] bytes = buffer.array();
+    DeleteFile dv = mockDV(bytes.length, index.cardinality());
+    PositionDeleteIndex indexCopy = PositionDeleteIndex.deserialize(bytes, dv);
+    assertEqual(index, indexCopy);
+    byte[] goldenBytes = readTestResource(goldenFile);
+    assertThat(bytes).isEqualTo(goldenBytes);
+    PositionDeleteIndex goldenIndex = 
PositionDeleteIndex.deserialize(goldenBytes, dv);
+    assertEqual(index, goldenIndex);
+  }
+
+  private static DeleteFile mockDV(long contentSize, long cardinality) {
+    DeleteFile mock = Mockito.mock(DeleteFile.class);
+    Mockito.when(mock.contentSizeInBytes()).thenReturn(contentSize);
+    Mockito.when(mock.recordCount()).thenReturn(cardinality);
+    return mock;
+  }
+
+  private static void assertEqual(PositionDeleteIndex index, 
PositionDeleteIndex thatIndex) {
+    assertThat(index.cardinality()).isEqualTo(thatIndex.cardinality());
+    index.forEach(position -> 
assertThat(thatIndex.isDeleted(position)).isTrue());
+    thatIndex.forEach(position -> 
assertThat(index.isDeleted(position)).isTrue());
+  }
+
+  private static long position(int bitmapIndex, int containerIndex, long 
value) {
+    return bitmapIndex * BITMAP_OFFSET + containerIndex * CONTAINER_OFFSET + 
value;
+  }
+
+  private static byte[] readTestResource(String resourceName) throws 
IOException {
+    URL resource = Resources.getResource(TestRoaringPositionBitmap.class, 
resourceName);
+    return Resources.toByteArray(resource);
+  }
+
   private List<Long> collect(PositionDeleteIndex index) {
     List<Long> positions = Lists.newArrayList();
     index.forEach(positions::add);
diff --git 
a/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
 
b/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
new file mode 100644
index 0000000000..00d47303b1
Binary files /dev/null and 
b/core/src/test/resources/org/apache/iceberg/deletes/all-container-types-position-index.bin
 differ
diff --git 
a/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin 
b/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin
new file mode 100644
index 0000000000..8bbc1265dc
Binary files /dev/null and 
b/core/src/test/resources/org/apache/iceberg/deletes/empty-position-index.bin 
differ
diff --git 
a/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
 
b/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
new file mode 100644
index 0000000000..80829fae22
Binary files /dev/null and 
b/core/src/test/resources/org/apache/iceberg/deletes/small-alternating-values-position-index.bin
 differ
diff --git 
a/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
 
b/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
new file mode 100644
index 0000000000..989dabf6ad
Binary files /dev/null and 
b/core/src/test/resources/org/apache/iceberg/deletes/small-and-large-values-position-index.bin
 differ

Reply via email to