This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch zorder-bytearr-npe
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 5659d2955e605d6c9e74ae16269b2eb20f5d67d1
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Mar 22 10:52:27 2024 -0600

    Core, Spark: Fix handling of null binary values when sorting with zorder
---
 .../org/apache/iceberg/util/ZOrderByteUtils.java   |  5 +++
 .../apache/iceberg/util/TestZOrderByteUtil.java    | 10 ++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 38 +++++++++++++++++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 39 ++++++++++++++++++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 38 +++++++++++++++++++++
 5 files changed, 130 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java 
b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
index 923f3dc2d5..c687fc4e03 100644
--- a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
+++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
@@ -145,6 +145,11 @@ public class ZOrderByteUtils {
   @SuppressWarnings("ByteBufferBackingArray")
   public static ByteBuffer byteTruncateOrFill(byte[] val, int length, 
ByteBuffer reuse) {
     ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
+    if (val == null) {
+      Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
+      return bytes;
+    }
+
     if (val.length < length) {
       bytes.put(val, 0, val.length);
       Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java 
b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
index 13e7c843c7..d05843e208 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
@@ -391,4 +391,14 @@ public class TestZOrderByteUtil {
           .isEqualTo(stringCompare);
     }
   }
+
+  @Test
+  public void testByteTruncatedOrFillNullIsZeroArray() {
+    ByteBuffer buffer = ByteBuffer.allocate(128);
+    byte[] actualBytes = ZOrderByteUtils.byteTruncateOrFill(null, 128, 
buffer).array();
+    ByteBuffer expected = ByteBuffer.allocate(128);
+    Arrays.fill(expected.array(), 0, 128, (byte) 0x00);
+
+    assertThat(actualBytes).isEqualTo(expected.array());
+  }
 }
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 0cdde158bd..ca7f7df3f0 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -218,6 +218,44 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', "
+                + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
+  }
+
   @Test
   public void testRewriteDataFilesWithFilter() {
     createTable();
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 477aa2a1d9..aefe1269c4 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -261,6 +261,45 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+            sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', "
+                            + "strategy => 'sort', sort_order => 
'zorder(c2,c3)')",
+                    catalogName, tableIdent);
+
+    assertEquals(
+            "Action should rewrite 10 data files and add 1 data files",
+            row(10, 1),
+            Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+            .isInstanceOf(Long.class)
+            
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    ImmutableList<Object[]> expectedRows =
+            ImmutableList.of(
+                    row(2, "bar", null),
+                    row(2, "bar", null),
+                    row(2, "bar", null),
+                    row(2, "bar", null),
+                    row(2, "bar", null),
+                    row(1, "foo", null),
+                    row(1, "foo", null),
+                    row(1, "foo", null),
+                    row(1, "foo", null),
+                    row(1, "foo", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
+  }
+
+
   @Test
   public void 
testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
     createTable();
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index b01438d39d..1482bc9664 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -259,6 +259,44 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @TestTemplate
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', "
+                + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
+  }
+
   @TestTemplate
   public void 
testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
     createTable();

Reply via email to