This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5218f43  Parquet: Fix row group filters with promoted types (#2232)
5218f43 is described below

commit 5218f43c245323e6fb4a5c48cb71848b178b4478
Author: Ryan Blue <[email protected]>
AuthorDate: Fri Feb 12 20:38:57 2021 -0800

    Parquet: Fix row group filters with promoted types (#2232)
    
    This fixes Parquet row group filters when types have been promoted from int
    to long or from float to double.
    
    The filters are passed the file schema after ids are added, which is used
    to convert dictionary values or lower/upper bounds. That conversion currently
    uses the file's types to deserialize, but the filter expression is bound to
    the table types. If the types differ, then the comparison in the evaluator
    fails.
    
    This updates the conversion to first deserialize the Parquet value and then
    promote it if the table's type has changed. Only int to long and float to
    double are needed because those are the only type promotions that use a
    different representation.
---
 .../iceberg/data/TestMetricsRowGroupFilter.java      |  9 +++++++++
 .../apache/iceberg/parquet/ParquetConversions.java   | 15 +++++++++++++++
 .../parquet/ParquetDictionaryRowGroupFilter.java     |  6 +++++-
 .../parquet/ParquetMetricsRowGroupFilter.java        | 20 ++++++++++++++++++--
 .../parquet/TestDictionaryRowGroupFilter.java        |  8 ++++++++
 5 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index b82d6eb..f03544d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -812,6 +812,15 @@ public class TestMetricsRowGroupFilter {
     Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
   }
 
+  @Test
+  public void testParquetTypePromotion() {
+    Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);
+    Schema promotedSchema = new Schema(required(1, "id", 
Types.LongType.get()));
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(promotedSchema, 
equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
+
   private boolean shouldRead(Expression expression) {
     return shouldRead(expression, true);
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index 431c636..78b3a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -68,6 +68,21 @@ class ParquetConversions {
     }
   }
 
+  static Function<Object, Object> converterFromParquet(PrimitiveType 
parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
+
   static Function<Object, Object> converterFromParquet(PrimitiveType type) {
     if (type.getOriginalType() != null) {
       switch (type.getOriginalType()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index d72cf49..37c7d6e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -37,6 +37,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.NaNUtil;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -49,6 +50,7 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
 public class ParquetDictionaryRowGroupFilter {
+  private final Schema schema;
   private final Expression expr;
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
@@ -56,6 +58,7 @@ public class ParquetDictionaryRowGroupFilter {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, 
boolean caseSensitive) {
+    this.schema = schema;
     StructType struct = schema.asStruct();
     this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), 
caseSensitive);
   }
@@ -96,8 +99,9 @@ public class ParquetDictionaryRowGroupFilter {
         PrimitiveType colType = 
fileSchema.getType(desc.getPath()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           cols.put(id, desc);
-          conversions.put(id, 
ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, 
icebergType));
         }
       }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index ee026cc..f83d701 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -77,7 +77,7 @@ public class ParquetMetricsRowGroupFilter {
   private static final boolean ROWS_CANNOT_MATCH = false;
 
   private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
-    private Map<Integer, Statistics> stats = null;
+    private Map<Integer, Statistics<?>> stats = null;
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Function<Object, Object>> conversions = null;
 
@@ -93,9 +93,10 @@ public class ParquetMetricsRowGroupFilter {
         PrimitiveType colType = 
fileSchema.getType(col.getPath().toArray()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           stats.put(id, col.getStatistics());
           valueCounts.put(id, col.getValueCount());
-          conversions.put(id, 
ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, 
icebergType));
         }
       }
 
@@ -502,4 +503,19 @@ public class ParquetMetricsRowGroupFilter {
     return statistics.getNumNulls() < valueCount &&
         (statistics.getMaxBytes() == null || statistics.getMinBytes() == null);
   }
+
+  private static Function<Object, Object> converterFor(PrimitiveType 
parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = 
ParquetConversions.converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index a5e7a35..c02bf27 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -884,4 +884,12 @@ public class TestDictionaryRowGroupFilter {
         .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
     Assert.assertFalse("Should not read: notIn on no nulls column (empty 
string is within the set)", shouldRead);
   }
+
+  @Test
+  public void testTypePromotion() {
+    Schema promotedSchema = new Schema(required(1, "id", LongType.get()));
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(promotedSchema, 
equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
 }

Reply via email to