This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 602186bedc Core, Spark: Fix handling of null binary values when 
sorting with zorder (#10026)
602186bedc is described below

commit 602186bedc7f5be270f9a0cd5c2c15da7bddcdb9
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Mar 25 15:03:33 2024 -0700

    Core, Spark: Fix handling of null binary values when sorting with zorder 
(#10026)
---
 .../org/apache/iceberg/util/ZOrderByteUtils.java   |  5 ++++
 .../apache/iceberg/util/TestZOrderByteUtil.java    | 10 +++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 35 ++++++++++++++++++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 35 ++++++++++++++++++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 35 ++++++++++++++++++++++
 5 files changed, 120 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java 
b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
index 923f3dc2d5..c687fc4e03 100644
--- a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
+++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
@@ -145,6 +145,11 @@ public class ZOrderByteUtils {
   @SuppressWarnings("ByteBufferBackingArray")
   public static ByteBuffer byteTruncateOrFill(byte[] val, int length, 
ByteBuffer reuse) {
     ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
+    if (val == null) {
+      Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
+      return bytes;
+    }
+
     if (val.length < length) {
       bytes.put(val, 0, val.length);
       Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java 
b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
index 13e7c843c7..d05843e208 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
@@ -391,4 +391,14 @@ public class TestZOrderByteUtil {
           .isEqualTo(stringCompare);
     }
   }
+
+  @Test
+  public void testByteTruncatedOrFillNullIsZeroArray() {
+    ByteBuffer buffer = ByteBuffer.allocate(128);
+    byte[] actualBytes = ZOrderByteUtils.byteTruncateOrFill(null, 128, 
buffer).array();
+    ByteBuffer expected = ByteBuffer.allocate(128);
+    Arrays.fill(expected.array(), 0, 128, (byte) 0x00);
+
+    assertThat(actualBytes).isEqualTo(expected.array());
+  }
 }
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 0cdde158bd..80cacbf376 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -218,6 +218,41 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', "
+                + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, 
String.valueOf(output.get(0)[2]));
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactly(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+  }
+
   @Test
   public void testRewriteDataFilesWithFilter() {
     createTable();
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 477aa2a1d9..009ab41228 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -261,6 +261,41 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', "
+                + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, 
String.valueOf(output.get(0)[2]));
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactly(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+  }
+
   @Test
   public void 
testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
     createTable();
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index b01438d39d..9ba886db45 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -259,6 +259,41 @@ public class TestRewriteDataFilesProcedure extends 
ExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM 
%s", tableName));
   }
 
+  @TestTemplate
+  public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", 
tableName);
+
+    for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", 
tableName);
+    }
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s', "
+                + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, 
String.valueOf(output.get(0)[2]));
+    assertThat(sql("SELECT * FROM %s", tableName))
+        .containsExactly(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+  }
+
   @TestTemplate
   public void 
testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
     createTable();

Reply via email to