This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 602186bedc Core, Spark: Fix handling of null binary values when sorting with zorder (#10026)
602186bedc is described below
commit 602186bedc7f5be270f9a0cd5c2c15da7bddcdb9
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Mar 25 15:03:33 2024 -0700
Core, Spark: Fix handling of null binary values when sorting with zorder (#10026)
---
.../org/apache/iceberg/util/ZOrderByteUtils.java | 5 ++++
.../apache/iceberg/util/TestZOrderByteUtil.java | 10 +++++++
.../extensions/TestRewriteDataFilesProcedure.java | 35 ++++++++++++++++++++++
.../extensions/TestRewriteDataFilesProcedure.java | 35 ++++++++++++++++++++++
.../extensions/TestRewriteDataFilesProcedure.java | 35 ++++++++++++++++++++++
5 files changed, 120 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
index 923f3dc2d5..c687fc4e03 100644
--- a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
+++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java
@@ -145,6 +145,11 @@ public class ZOrderByteUtils {
@SuppressWarnings("ByteBufferBackingArray")
public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) {
ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
+ if (val == null) {
+ Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
+ return bytes;
+ }
+
if (val.length < length) {
bytes.put(val, 0, val.length);
Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
index 13e7c843c7..d05843e208 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
@@ -391,4 +391,14 @@ public class TestZOrderByteUtil {
.isEqualTo(stringCompare);
}
}
+
+ @Test
+ public void testByteTruncatedOrFillNullIsZeroArray() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+    byte[] actualBytes = ZOrderByteUtils.byteTruncateOrFill(null, 128, buffer).array();
+ ByteBuffer expected = ByteBuffer.allocate(128);
+ Arrays.fill(expected.array(), 0, 128, (byte) 0x00);
+
+ assertThat(actualBytes).isEqualTo(expected.array());
+ }
}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 0cdde158bd..80cacbf376 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -218,6 +218,41 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}
+ @Test
+ public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName);
+
+ for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName);
+ }
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', "
+ + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, String.valueOf(output.get(0)[2]));
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactly(
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null));
+ }
+
@Test
public void testRewriteDataFilesWithFilter() {
createTable();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 477aa2a1d9..009ab41228 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -261,6 +261,41 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}
+ @Test
+ public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName);
+
+ for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName);
+ }
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', "
+ + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ assertThat(output.get(0)).hasSize(4);
+ assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, String.valueOf(output.get(0)[2]));
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactly(
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null));
+ }
+
@Test
public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
createTable();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index b01438d39d..9ba886db45 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -259,6 +259,41 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}
+ @TestTemplate
+ public void testRewriteDataFilesWithZOrderNullBinaryColumn() {
+    sql("CREATE TABLE %s (c1 int, c2 string, c3 binary) USING iceberg", tableName);
+
+ for (int i = 0; i < 5; i++) {
+      sql("INSERT INTO %s values (1, 'foo', null), (2, 'bar', null)", tableName);
+ }
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', "
+ + "strategy => 'sort', sort_order => 'zorder(c2,c3)')",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ assertThat(output.get(0)).hasSize(4);
+ assertThat(snapshotSummary())
+        .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, String.valueOf(output.get(0)[2]));
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .containsExactly(
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null));
+ }
+
@TestTemplate
public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
createTable();