This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5218f43 Parquet: Fix row group filters with promoted types (#2232)
5218f43 is described below
commit 5218f43c245323e6fb4a5c48cb71848b178b4478
Author: Ryan Blue <[email protected]>
AuthorDate: Fri Feb 12 20:38:57 2021 -0800
Parquet: Fix row group filters with promoted types (#2232)
This fixes Parquet row group filters when types have been promoted from int
to long or from float to double.
The filters are passed the file schema after ids are added, which is used
to convert dictionary values or lower/upper bounds. That conversion currently
uses the file's types to deserialize, but the filter expression is bound to the
table types. If the types differ, then comparison in the evaluator fails.
This updates the conversion to first deserialize the Parquet value and then
promote it if the table's type has changed. Only int to long and float to
double are needed because those are the only type promotions that use a
different representation.
---
.../iceberg/data/TestMetricsRowGroupFilter.java | 9 +++++++++
.../apache/iceberg/parquet/ParquetConversions.java | 15 +++++++++++++++
.../parquet/ParquetDictionaryRowGroupFilter.java | 6 +++++-
.../parquet/ParquetMetricsRowGroupFilter.java | 20 ++++++++++++++++++--
.../parquet/TestDictionaryRowGroupFilter.java | 8 ++++++++
5 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index b82d6eb..f03544d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -812,6 +812,15 @@ public class TestMetricsRowGroupFilter {
Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
}
+ @Test
+ public void testParquetTypePromotion() {
+ Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);
+    Schema promotedSchema = new Schema(required(1, "id", Types.LongType.get()));
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+ .shouldRead(parquetSchema, rowGroupMetadata);
+ Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+ }
+
private boolean shouldRead(Expression expression) {
return shouldRead(expression, true);
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index 431c636..78b3a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -68,6 +68,21 @@ class ParquetConversions {
}
}
+  static Function<Object, Object> converterFromParquet(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
+
+
static Function<Object, Object> converterFromParquet(PrimitiveType type) {
if (type.getOriginalType() != null) {
switch (type.getOriginalType()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index d72cf49..37c7d6e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.NaNUtil;
import org.apache.parquet.column.ColumnDescriptor;
@@ -49,6 +50,7 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
public class ParquetDictionaryRowGroupFilter {
+ private final Schema schema;
private final Expression expr;
public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
@@ -56,6 +58,7 @@ public class ParquetDictionaryRowGroupFilter {
}
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
+    this.schema = schema;
     StructType struct = schema.asStruct();
     this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
   }
@@ -96,8 +99,9 @@ public class ParquetDictionaryRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(desc.getPath()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           cols.put(id, desc);
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index ee026cc..f83d701 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -77,7 +77,7 @@ public class ParquetMetricsRowGroupFilter {
private static final boolean ROWS_CANNOT_MATCH = false;
private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
- private Map<Integer, Statistics> stats = null;
+ private Map<Integer, Statistics<?>> stats = null;
private Map<Integer, Long> valueCounts = null;
private Map<Integer, Function<Object, Object>> conversions = null;
@@ -93,9 +93,10 @@ public class ParquetMetricsRowGroupFilter {
       PrimitiveType colType = fileSchema.getType(col.getPath().toArray()).asPrimitiveType();
       if (colType.getId() != null) {
         int id = colType.getId().intValue();
+        Type icebergType = schema.findType(id);
         stats.put(id, col.getStatistics());
         valueCounts.put(id, col.getValueCount());
-        conversions.put(id, ParquetConversions.converterFromParquet(colType));
+        conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
       }
     }
@@ -502,4 +503,19 @@ public class ParquetMetricsRowGroupFilter {
     return statistics.getNumNulls() < valueCount &&
         (statistics.getMaxBytes() == null || statistics.getMinBytes() == null);
   }
+
+  private static Function<Object, Object> converterFor(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = ParquetConversions.converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index a5e7a35..c02bf27 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -884,4 +884,12 @@ public class TestDictionaryRowGroupFilter {
         .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
     Assert.assertFalse("Should not read: notIn on no nulls column (empty string is within the set)", shouldRead);
   }
+
+ @Test
+ public void testTypePromotion() {
+ Schema promotedSchema = new Schema(required(1, "id", LongType.get()));
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
+ Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+ }
}