This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4a99678369 Parquet: Support reading INT96 column in row group filter (#8988)
4a99678369 is described below
commit 4a996783697faa1c0bd6b4fc6ceb97260f6aeb7a
Author: Manu Zhang <[email protected]>
AuthorDate: Mon Jan 8 18:08:07 2024 +0800
Parquet: Support reading INT96 column in row group filter (#8988)
---
.../apache/iceberg/parquet/ParquetConversions.java | 5 ++++
.../parquet/ParquetDictionaryRowGroupFilter.java | 3 ++
.../spark/source/TestIcebergSourceTablesBase.java | 32 ++++++++++++++--------
.../spark/source/TestIcebergSourceTablesBase.java | 32 ++++++++++++++--------
.../spark/source/TestIcebergSourceTablesBase.java | 28 +++++++++++++------
5 files changed, 69 insertions(+), 31 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index e1a342b632..0f9878d201 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.function.Function;
@@ -112,6 +113,10 @@ class ParquetConversions {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
+ case INT96:
+ return binary ->
+ ParquetUtil.extractTimestampInt96(
+ ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN));
default:
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 33ec2f6817..1d24b7ccd7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -453,6 +453,9 @@ public class ParquetDictionaryRowGroupFilter {
case DOUBLE:
dictSet.add((T) conversion.apply(dict.decodeToDouble(i)));
break;
+ case INT96:
+ dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
+ break;
default:
throw new IllegalArgumentException(
"Cannot decode dictionary of type: "
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 3c52652748..111da882fe 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2156,22 +2156,32 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
stagingLocation);
// validate we get the expected results back
- List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier))
- .select("tmp_col")
- .collectAsList();
- Assertions.assertThat(actual)
- .as("Rows must match")
- .containsExactlyInAnyOrderElementsOf(expected);
+ testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
dropTable(tableIdentifier);
}
}
}
+ private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+ List<Row> expected =
+ spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .select("tmp_col")
+ .filter(filterExpr)
+ .collectAsList();
+ Assertions.assertThat(actual)
+ .as("Rows must match")
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
private GenericData.Record manifestRecord(
Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
GenericRecordBuilder builder =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ebd0933a95..b5038f981b 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2154,22 +2154,32 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
stagingLocation);
// validate we get the expected results back
- List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier))
- .select("tmp_col")
- .collectAsList();
- Assertions.assertThat(actual)
- .as("Rows must match")
- .containsExactlyInAnyOrderElementsOf(expected);
+ testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
dropTable(tableIdentifier);
}
}
}
+ private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+ List<Row> expected =
+ spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .select("tmp_col")
+ .filter(filterExpr)
+ .collectAsList();
+ Assertions.assertThat(actual)
+ .as("Rows must match")
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
private GenericData.Record manifestRecord(
Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
GenericRecordBuilder builder =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 4f585eee51..d37d6a8616 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2181,20 +2181,30 @@ public abstract class TestIcebergSourceTablesBase extends TestBase {
stagingLocation);
// validate we get the expected results back
- List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier))
- .select("tmp_col")
- .collectAsList();
- assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+ testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+ testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
dropTable(tableIdentifier);
}
}
}
+ private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+ List<Row> expected =
+ spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .select("tmp_col")
+ .filter(filterExpr)
+ .collectAsList();
+ assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+ }
+
private GenericData.Record manifestRecord(
Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
GenericRecordBuilder builder =