This is an automated email from the ASF dual-hosted git repository. amoghj pushed a commit to branch zorder-bytearr-npe in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 5659d2955e605d6c9e74ae16269b2eb20f5d67d1 Author: Amogh Jahagirdar <[email protected]> AuthorDate: Fri Mar 22 10:52:27 2024 -0600 Core, Spark: Fix handling of null binary values when sorting with zorder --- .../org/apache/iceberg/util/ZOrderByteUtils.java | 5 +++ .../apache/iceberg/util/TestZOrderByteUtil.java | 10 ++++++ .../extensions/TestRewriteDataFilesProcedure.java | 38 +++++++++++++++++++++ .../extensions/TestRewriteDataFilesProcedure.java | 39 ++++++++++++++++++++++ .../extensions/TestRewriteDataFilesProcedure.java | 38 +++++++++++++++++++++ 5 files changed, 130 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java index 923f3dc2d5..c687fc4e03 100644 --- a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java +++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java @@ -145,6 +145,11 @@ public class ZOrderByteUtils { @SuppressWarnings("ByteBufferBackingArray") public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) { ByteBuffer bytes = ByteBuffers.reuse(reuse, length); + if (val == null) { + Arrays.fill(bytes.array(), 0, length, (byte) 0x00); + return bytes; + } + if (val.length < length) { bytes.put(val, 0, val.length); Arrays.fill(bytes.array(), val.length, length, (byte) 0x00); diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java index 13e7c843c7..d05843e208 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java @@ -391,4 +391,14 @@ public class TestZOrderByteUtil { .isEqualTo(stringCompare); } } + + @Test + public void testByteTruncatedOrFillNullIsZeroArray() { + ByteBuffer buffer = ByteBuffer.allocate(128); + byte[] actualBytes = ZOrderByteUtils.byteTruncateOrFill(null, 128, buffer).array(); + ByteBuffer expected = ByteBuffer.allocate(128); + Arrays.fill(expected.array(), 0, 128, (byte) 0x00); + + assertThat(actualBytes).isEqualTo(expected.array()); + } } diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 0cdde158bd..ca7f7df3f0 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -218,6 +218,44 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase { assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); } + @Test + public void testRewriteDataFilesWithZOrderNullBinaryColumn() { + sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName); + + for (int i = 0; i < 5; i++) { + sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName); + } + + List<Object[]> output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', " + + "strategy => 'sort', sort_order => 'zorder(c2,c3)')", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data files", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); + + ImmutableList<Object[]> expectedRows = + ImmutableList.of( + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null)); + assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); + } + @Test public void testRewriteDataFilesWithFilter() { createTable(); diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 477aa2a1d9..aefe1269c4 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -261,6 +261,45 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase { assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); } + @Test + public void testRewriteDataFilesWithZOrderNullBinaryColumn() { + sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName); + + for (int i = 0; i < 5; i++) { + sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName); + } + + List<Object[]> output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', " + + "strategy => 'sort', sort_order => 'zorder(c2,c3)')", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data files", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + assertThat(output.get(0)).hasSize(4); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); + + ImmutableList<Object[]> expectedRows = + ImmutableList.of( + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null)); + assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); + } + + @Test public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index b01438d39d..1482bc9664 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -259,6 +259,44 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase { assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); } + @TestTemplate + public void testRewriteDataFilesWithZOrderNullBinaryColumn() { + sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName); + + for (int i = 0; i < 5; i++) { + sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName); + } + + List<Object[]> output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', " + + "strategy => 'sort', sort_order => 'zorder(c2,c3)')", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data files", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + assertThat(output.get(0)).hasSize(4); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); + + ImmutableList<Object[]> expectedRows = + ImmutableList.of( + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(2, "bar", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null), + row(1, "foo", null)); + assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName)); + } + @TestTemplate public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() { createTable();
