This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a99678369 Parquet: Support reading INT96 column in row group filter (#8988)
4a99678369 is described below

commit 4a996783697faa1c0bd6b4fc6ceb97260f6aeb7a
Author: Manu Zhang <[email protected]>
AuthorDate: Mon Jan 8 18:08:07 2024 +0800

    Parquet: Support reading INT96 column in row group filter (#8988)
---
 .../apache/iceberg/parquet/ParquetConversions.java |  5 ++++
 .../parquet/ParquetDictionaryRowGroupFilter.java   |  3 ++
 .../spark/source/TestIcebergSourceTablesBase.java  | 32 ++++++++++++++--------
 .../spark/source/TestIcebergSourceTablesBase.java  | 32 ++++++++++++++--------
 .../spark/source/TestIcebergSourceTablesBase.java  | 28 +++++++++++++------
 5 files changed, 69 insertions(+), 31 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index e1a342b632..0f9878d201 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.function.Function;
@@ -112,6 +113,10 @@ class ParquetConversions {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
         return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
+      case INT96:
+        return binary ->
+            ParquetUtil.extractTimestampInt96(
+                ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN));
       default:
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 33ec2f6817..1d24b7ccd7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -453,6 +453,9 @@ public class ParquetDictionaryRowGroupFilter {
           case DOUBLE:
             dictSet.add((T) conversion.apply(dict.decodeToDouble(i)));
             break;
+          case INT96:
+            dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
+            break;
           default:
             throw new IllegalArgumentException(
                 "Cannot decode dictionary of type: "
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 3c52652748..111da882fe 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2156,22 +2156,32 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             stagingLocation);
 
         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        Assertions.assertThat(actual)
-            .as("Rows must match")
-            .containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }
 
+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    Assertions.assertThat(actual)
+        .as("Rows must match")
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ebd0933a95..b5038f981b 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2154,22 +2154,32 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             stagingLocation);
 
         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        Assertions.assertThat(actual)
-            .as("Rows must match")
-            .containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }
 
+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    Assertions.assertThat(actual)
+        .as("Rows must match")
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 4f585eee51..d37d6a8616 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2181,20 +2181,30 @@ public abstract class TestIcebergSourceTablesBase extends TestBase {
             stagingLocation);
 
         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }
 
+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =

Reply via email to