This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f29e2  Support filtering based on nested struct fields (#123)
81f29e2 is described below

commit 81f29e2f80cc2e6b3b6f715573893aff696f579a
Author: Gautam <[email protected]>
AuthorDate: Tue May 28 12:40:04 2019 -0700

    Support filtering based on nested struct fields (#123)
---
 api/src/main/java/org/apache/iceberg/Accessor.java |  29 ++
 .../main/java/org/apache/iceberg/Accessors.java    | 228 +++++++++++++
 api/src/main/java/org/apache/iceberg/Schema.java   |  45 ++-
 .../apache/iceberg/expressions/BoundReference.java |  38 +--
 .../expressions/InclusiveManifestEvaluator.java    |  25 +-
 .../expressions/InclusiveMetricsEvaluator.java     |  28 +-
 .../iceberg/expressions/UnboundPredicate.java      |  22 +-
 .../apache/iceberg/expressions/TestEvaluatior.java | 174 ----------
 .../apache/iceberg/expressions/TestEvaluator.java  | 365 +++++++++++++++++++++
 .../parquet/ParquetDictionaryRowGroupFilter.java   |  12 -
 .../parquet/ParquetMetricsRowGroupFilter.java      |  14 -
 .../parquet/TestDictionaryRowGroupFilter.java      | 171 +++++++++-
 .../iceberg/parquet/TestMetricsRowGroupFilter.java | 166 +++++++++-
 13 files changed, 1038 insertions(+), 279 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Accessor.java 
b/api/src/main/java/org/apache/iceberg/Accessor.java
new file mode 100644
index 0000000..37f9f14
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/Accessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import org.apache.iceberg.types.Type;
+
+public interface Accessor<T> extends Serializable {
+  Object get(T container);
+
+  Type type();
+}
diff --git a/api/src/main/java/org/apache/iceberg/Accessors.java 
b/api/src/main/java/org/apache/iceberg/Accessors.java
new file mode 100644
index 0000000..0e0d275
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/Accessors.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+public class Accessors {
+  private Accessors() {
+  }
+
+  public static Integer toPosition(Accessor<StructLike> accessor) {
+    if (accessor instanceof PositionAccessor) {
+      return ((PositionAccessor) accessor).position();
+    }
+    throw new IllegalArgumentException("Cannot convert nested accessor to 
position");
+  }
+
+  static Map<Integer, Accessor<StructLike>> forSchema(Schema schema) {
+    return TypeUtil.visit(schema, new BuildPositionAccessors());
+  }
+
+  private static class PositionAccessor implements Accessor<StructLike> {
+    private int position;
+    private final Type type;
+    private final Class<?> javaClass;
+
+    PositionAccessor(int pos, Type type) {
+      this.position = pos;
+      this.type = type;
+      this.javaClass = type.typeId().javaClass();
+    }
+
+    @Override
+    public Object get(StructLike row) {
+      return row.get(position, javaClass);
+    }
+
+    @Override
+    public Type type() {
+      return type;
+    }
+
+    public int position() {
+      return position;
+    }
+
+    public Class<?> javaClass() {
+      return javaClass;
+    }
+
+    @Override
+    public String toString() {
+      return "Accessor(positions=[" + position + "], type=" + type + ")";
+    }
+  }
+
+  private static class Position2Accessor implements Accessor<StructLike> {
+    private final int p0;
+    private final int p1;
+    private final Type type;
+    private final Class<?> javaClass;
+
+    Position2Accessor(int pos, PositionAccessor wrapped) {
+      this.p0 = pos;
+      this.p1 = wrapped.position();
+      this.type = wrapped.type();
+      this.javaClass = wrapped.javaClass();
+    }
+
+    @Override
+    public Object get(StructLike row) {
+      return row.get(p0, StructLike.class).get(p1, javaClass);
+    }
+
+    @Override
+    public Type type() {
+      return type;
+    }
+
+    public Class<?> javaClass() {
+      return javaClass;
+    }
+
+    @Override
+    public String toString() {
+      return "Accessor(positions=[" + p0 + ", " + p1 + "], type=" + type + ")";
+    }
+  }
+
+  private static class Position3Accessor implements Accessor<StructLike> {
+    private final int p0;
+    private final int p1;
+    private final int p2;
+    private final Type type;
+    private final Class<?> javaClass;
+
+    Position3Accessor(int pos, Position2Accessor wrapped) {
+      this.p0 = pos;
+      this.p1 = wrapped.p0;
+      this.p2 = wrapped.p1;
+      this.type = wrapped.type();
+      this.javaClass = wrapped.javaClass();
+    }
+
+    @Override
+    public Object get(StructLike row) {
+      return row.get(p0, StructLike.class).get(p1, StructLike.class).get(p2, 
javaClass);
+    }
+
+    @Override
+    public Type type() {
+      return type;
+    }
+
+    @Override
+    public String toString() {
+      return "Accessor(positions=[" + p0 + ", " + p1 + ", " + p2 + "], type=" 
+ type + ")";
+    }
+  }
+
+  private static class WrappedPositionAccessor implements Accessor<StructLike> 
{
+    private final int position;
+    private final Accessor<StructLike> accessor;
+
+    WrappedPositionAccessor(int pos, Accessor<StructLike> accessor) {
+      this.position = pos;
+      this.accessor = accessor;
+    }
+
+    @Override
+    public Object get(StructLike row) {
+      StructLike inner = row.get(position, StructLike.class);
+      if (inner != null) {
+        return accessor.get(inner);
+      }
+      return null;
+    }
+
+    @Override
+    public Type type() {
+      return accessor.type();
+    }
+
+    @Override
+    public String toString() {
+      return "WrappedAccessor(position=" + position + ", wrapped=" + accessor 
+ ")";
+    }
+  }
+
+  private static Accessor<StructLike> newAccessor(int pos, Type type) {
+    return new PositionAccessor(pos, type);
+  }
+
+  private static Accessor<StructLike> newAccessor(int pos, boolean isOptional,
+                                                  Accessor<StructLike> 
accessor) {
+    if (isOptional) {
+      // the wrapped position handles null layers
+      return new WrappedPositionAccessor(pos, accessor);
+    } else if (accessor instanceof PositionAccessor) {
+      return new Position2Accessor(pos, (PositionAccessor) accessor);
+    } else if (accessor instanceof Position2Accessor) {
+      return new Position3Accessor(pos, (Position2Accessor) accessor);
+    } else {
+      return new WrappedPositionAccessor(pos, accessor);
+    }
+  }
+
+  private static class BuildPositionAccessors extends 
TypeUtil.SchemaVisitor<Map<Integer, Accessor<StructLike>>> {
+
+    @Override
+    public Map<Integer, Accessor<StructLike>> schema(
+        Schema schema, Map<Integer, Accessor<StructLike>> structResult) {
+      return structResult;
+    }
+
+    @Override
+    public Map<Integer, Accessor<StructLike>> struct(
+        Types.StructType struct, List<Map<Integer, Accessor<StructLike>>> 
fieldResults) {
+      Map<Integer, Accessor<StructLike>> accessors = Maps.newHashMap();
+      List<Types.NestedField> fields = struct.fields();
+      for (int i = 0; i < fieldResults.size(); i += 1) {
+        Types.NestedField field = fields.get(i);
+        Map<Integer, Accessor<StructLike>> result = fieldResults.get(i);
+        if (result != null) {
+          for (Map.Entry<Integer, Accessor<StructLike>> entry : 
result.entrySet()) {
+            accessors.put(entry.getKey(), newAccessor(i, field.isOptional(), 
entry.getValue()));
+          }
+        } else {
+          accessors.put(field.fieldId(), newAccessor(i, field.type()));
+        }
+      }
+
+      if (accessors.isEmpty()) {
+        return null;
+      }
+
+      return accessors;
+    }
+
+    @Override
+    public Map<Integer, Accessor<StructLike>> field(
+        Types.NestedField field, Map<Integer, Accessor<StructLike>> 
fieldResult) {
+      return fieldResult;
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java 
b/api/src/main/java/org/apache/iceberg/Schema.java
index 1915728..dbfbe28 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -48,6 +48,7 @@ public class Schema implements Serializable {
   private transient Map<Integer, Types.NestedField> idToField = null;
   private transient BiMap<String, Integer> nameToId = null;
   private transient BiMap<String, Integer> lowerCaseNameToId = null;
+  private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
 
   public Schema(List<Types.NestedField> columns, Map<String, Integer> aliases) 
{
     this.struct = Types.StructType.of(columns);
@@ -58,6 +59,10 @@ public class Schema implements Serializable {
     this.struct = Types.StructType.of(columns);
   }
 
+  public Schema(Types.NestedField... columns) {
+    this(Arrays.asList(columns));
+  }
+
   private Map<Integer, Types.NestedField> lazyIdToField() {
     if (idToField == null) {
       this.idToField = TypeUtil.indexById(struct);
@@ -79,8 +84,11 @@ public class Schema implements Serializable {
     return lowerCaseNameToId;
   }
 
-  public Schema(Types.NestedField... columns) {
-    this(Arrays.asList(columns));
+  private Map<Integer, Accessor<StructLike>> lazyIdToAccessor() {
+    if (idToAccessor == null) {
+      idToAccessor = Accessors.forSchema(this);
+    }
+    return idToAccessor;
   }
 
   /**
@@ -141,9 +149,9 @@ public class Schema implements Serializable {
   }
 
   /**
-   * Returns a sub-field field by name as a {@link Types.NestedField}.
+   * Returns a sub-field by name as a {@link Types.NestedField}.
    * <p>
-   * The result may be a nested field.
+   * The result may be a top-level or a nested field.
    *
    * @param name a String name
    * @return a Type for the sub-field or null if it is not found
@@ -158,6 +166,23 @@ public class Schema implements Serializable {
   }
 
   /**
+   * Returns a sub-field by name as a {@link Types.NestedField}.
+   * <p>
+   * The result may be a top-level or a nested field.
+   *
+   * @param name a String name
+   * @return the sub-field or null if it is not found
+   */
+  public Types.NestedField caseInsensitiveFindField(String name) {
+    Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: 
(empty)");
+    Integer id = lazyLowerCaseNameToId().get(name.toLowerCase(Locale.ROOT));
+    if (id != null) {
+      return lazyIdToField().get(id);
+    }
+    return null;
+  }
+
+  /**
    * Returns the full column name for the given id.
    *
    * @param id a field id
@@ -196,6 +221,18 @@ public class Schema implements Serializable {
   }
 
   /**
+   * Return an accessor for retrieving the data from {@link StructLike}.
+   * <p>
+   * Accessors do not retrieve data contained in lists or maps.
+   *
+   * @param id a column id in this schema
+   * @return an {@link Accessor} to retrieve values from a {@link StructLike} 
row
+   */
+  public Accessor<StructLike> accessorForField(int id) {
+    return lazyIdToAccessor().get(id);
+  }
+
+  /**
    * Creates a projection schema for a subset of columns, selected by name.
    * <p>
    * Names that identify nested fields will select part or all of the field's 
top-level column.
diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java 
b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
index 192d365..f917a8f 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
@@ -19,57 +19,39 @@
 
 package org.apache.iceberg.expressions;
 
-import java.util.List;
+import org.apache.iceberg.Accessor;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
 
 public class BoundReference<T> implements Reference {
   private final int fieldId;
-  private final Type type;
-  private final int pos;
+  private final Accessor<StructLike> accessor;
 
-  BoundReference(Types.StructType struct, int fieldId) {
+  BoundReference(int fieldId, Accessor<StructLike> accessor) {
     this.fieldId = fieldId;
-    this.pos = find(fieldId, struct);
-    this.type = struct.fields().get(pos).type();
-  }
-
-  private int find(int id, Types.StructType struct) {
-    List<Types.NestedField> fields = struct.fields();
-    for (int i = 0; i < fields.size(); i += 1) {
-      if (fields.get(i).fieldId() == id) {
-        return i;
-      }
-    }
-    throw new ValidationException(
-        "Cannot find top-level field id %d in struct: %s", id, struct);
+    this.accessor = accessor;
   }
 
   public Type type() {
-    return type;
+    return accessor.type();
   }
 
   public int fieldId() {
     return fieldId;
   }
 
-  public int pos() {
-    return pos;
+  public Accessor<StructLike> accessor() {
+    return accessor;
   }
 
+  @SuppressWarnings("unchecked")
   public T get(StructLike struct) {
-    return struct.get(pos, javaType());
+    return (T) accessor.get(struct);
   }
 
   @Override
   public String toString() {
-    return String.format("ref(id=%d, pos=%d, type=%s)", fieldId, pos, type);
+    return String.format("ref(id=%d, accessor-type=%s)", fieldId, 
accessor.type());
   }
 
-  @SuppressWarnings("unchecked")
-  private Class<T> javaType() {
-    return (Class<T>) type.asPrimitiveType().typeId().javaClass();
-  }
 }
diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
index 3d8fc68..7550ea5 100644
--- 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
+++ 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.expressions;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.iceberg.Accessors;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import 
org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types.StructType;
@@ -42,6 +44,7 @@ import static 
org.apache.iceberg.expressions.Expressions.rewriteNot;
  */
 public class InclusiveManifestEvaluator {
   private final StructType struct;
+  private final Schema schema;
   private final Expression expr;
   private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
 
@@ -62,6 +65,7 @@ public class InclusiveManifestEvaluator {
       struct,
       rewriteNot(Projections.inclusive(spec, 
caseSensitive).project(rowFilter)),
       caseSensitive);
+    this.schema = new Schema(struct.fields());
   }
 
   /**
@@ -116,9 +120,10 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean isNull(BoundReference<T> ref) {
+      int pos = Accessors.toPosition(ref.accessor());
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no null values, the expression cannot match
-      if (!stats.get(ref.pos()).containsNull()) {
+      if (!stats.get(pos).containsNull()) {
         return ROWS_CANNOT_MATCH;
       }
 
@@ -127,9 +132,10 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean notNull(BoundReference<T> ref) {
+      int pos = Accessors.toPosition(ref.accessor());
       // containsNull encodes whether at least one partition value is null, 
lowerBound is null if
       // all partition values are null.
-      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      ByteBuffer lowerBound = stats.get(pos).lowerBound();
       if (lowerBound == null) {
         return ROWS_CANNOT_MATCH; // all values are null
       }
@@ -139,7 +145,8 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
-      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      int pos = Accessors.toPosition(ref.accessor());
+      ByteBuffer lowerBound = stats.get(pos).lowerBound();
       if (lowerBound == null) {
         return ROWS_CANNOT_MATCH; // values are all null
       }
@@ -156,7 +163,8 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
-      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      int pos = Accessors.toPosition(ref.accessor());
+      ByteBuffer lowerBound = stats.get(pos).lowerBound();
       if (lowerBound == null) {
         return ROWS_CANNOT_MATCH; // values are all null
       }
@@ -173,7 +181,8 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
-      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      int pos = Accessors.toPosition(ref.accessor());
+      ByteBuffer upperBound = stats.get(pos).upperBound();
       if (upperBound == null) {
         return ROWS_CANNOT_MATCH; // values are all null
       }
@@ -190,7 +199,8 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
-      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      int pos = Accessors.toPosition(ref.accessor());
+      ByteBuffer upperBound = stats.get(pos).upperBound();
       if (upperBound == null) {
         return ROWS_CANNOT_MATCH; // values are all null
       }
@@ -207,7 +217,8 @@ public class InclusiveManifestEvaluator {
 
     @Override
     public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
-      PartitionFieldSummary fieldStats = stats.get(ref.pos());
+      int pos = Accessors.toPosition(ref.accessor());
+      PartitionFieldSummary fieldStats = stats.get(pos);
       if (fieldStats.lowerBound() == null) {
         return ROWS_CANNOT_MATCH; // values are all null and literal cannot 
contain null
       }
diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 81e7756..64d34e9 100644
--- 
a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ 
b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -19,14 +19,12 @@
 
 package org.apache.iceberg.expressions;
 
-import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Schema;
 import 
org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 
 import static org.apache.iceberg.expressions.Expressions.rewriteNot;
@@ -128,8 +126,6 @@ public class InclusiveMetricsEvaluator {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no null values, the expression cannot match
       Integer id = ref.fieldId();
-      Preconditions.checkNotNull(struct.field(id),
-          "Cannot filter by nested column: %s", schema.findField(id));
 
       if (nullCounts != null && nullCounts.containsKey(id) && 
nullCounts.get(id) == 0) {
         return ROWS_CANNOT_MATCH;
@@ -143,8 +139,6 @@ public class InclusiveMetricsEvaluator {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no non-null values, the expression cannot match
       Integer id = ref.fieldId();
-      Preconditions.checkNotNull(struct.field(id),
-          "Cannot filter by nested column: %s", schema.findField(id));
 
       if (valueCounts != null && valueCounts.containsKey(id) &&
           nullCounts != null && nullCounts.containsKey(id) &&
@@ -158,11 +152,9 @@ public class InclusiveMetricsEvaluator {
     @Override
     public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(field.type(), 
lowerBounds.get(id));
+        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
 
         int cmp = lit.comparator().compare(lower, lit.value());
         if (cmp >= 0) {
@@ -176,11 +168,9 @@ public class InclusiveMetricsEvaluator {
     @Override
     public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(field.type(), 
lowerBounds.get(id));
+        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
 
         int cmp = lit.comparator().compare(lower, lit.value());
         if (cmp > 0) {
@@ -194,11 +184,9 @@ public class InclusiveMetricsEvaluator {
     @Override
     public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(field.type(), 
upperBounds.get(id));
+        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
 
         int cmp = lit.comparator().compare(upper, lit.value());
         if (cmp <= 0) {
@@ -212,11 +200,9 @@ public class InclusiveMetricsEvaluator {
     @Override
     public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(field.type(), 
upperBounds.get(id));
+        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
 
         int cmp = lit.comparator().compare(upper, lit.value());
         if (cmp < 0) {
@@ -230,11 +216,9 @@ public class InclusiveMetricsEvaluator {
     @Override
     public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       if (lowerBounds != null && lowerBounds.containsKey(id)) {
-        T lower = Conversions.fromByteBuffer(struct.field(id).type(), 
lowerBounds.get(id));
+        T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
 
         int cmp = lit.comparator().compare(lower, lit.value());
         if (cmp > 0) {
@@ -243,7 +227,7 @@ public class InclusiveMetricsEvaluator {
       }
 
       if (upperBounds != null && upperBounds.containsKey(id)) {
-        T upper = Conversions.fromByteBuffer(field.type(), 
upperBounds.get(id));
+        T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
 
         int cmp = lit.comparator().compare(upper, lit.value());
         if (cmp < 0) {
diff --git 
a/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java 
b/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
index e5dd33c..13fdffb 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.expressions;
 
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.types.Types;
 
@@ -66,15 +67,13 @@ public class UnboundPredicate<T> extends Predicate<T, 
NamedReference> {
    * @throws ValidationException if literals do not match bound references, or 
if comparison on expression is invalid
    */
   public Expression bind(Types.StructType struct, boolean caseSensitive) {
-    Types.NestedField field;
-    if (caseSensitive) {
-      field = struct.field(ref().name());
-    } else {
-      field = struct.caseInsensitiveField(ref().name());
-    }
+    Schema schema = new Schema(struct.fields());
+    Types.NestedField field = caseSensitive ?
+        schema.findField(ref().name()) :
+        schema.caseInsensitiveFindField(ref().name());
 
     ValidationException.check(field != null,
-        "Cannot find field '%s' in struct: %s", ref().name(), struct);
+        "Cannot find field '%s' in struct: %s", ref().name(), 
schema.asStruct());
 
     if (literal() == null) {
       switch (op()) {
@@ -82,12 +81,14 @@ public class UnboundPredicate<T> extends Predicate<T, 
NamedReference> {
           if (field.isRequired()) {
             return Expressions.alwaysFalse();
           }
-          return new BoundPredicate<>(IS_NULL, new BoundReference<>(struct, 
field.fieldId()));
+          return new BoundPredicate<>(IS_NULL, new 
BoundReference<>(field.fieldId(),
+              schema.accessorForField(field.fieldId())));
         case NOT_NULL:
           if (field.isRequired()) {
             return Expressions.alwaysTrue();
           }
-          return new BoundPredicate<>(NOT_NULL, new BoundReference<>(struct, 
field.fieldId()));
+          return new BoundPredicate<>(NOT_NULL, new 
BoundReference<>(field.fieldId(),
+              schema.accessorForField(field.fieldId())));
         default:
           throw new ValidationException("Operation must be IS_NULL or 
NOT_NULL");
       }
@@ -130,6 +131,7 @@ public class UnboundPredicate<T> extends Predicate<T, 
NamedReference> {
 //          break;
       }
     }
-    return new BoundPredicate<>(op(), new BoundReference<>(struct, 
field.fieldId()), lit);
+    return new BoundPredicate<>(op(), new BoundReference<>(field.fieldId(),
+        schema.accessorForField(field.fieldId())), lit);
   }
 }
diff --git 
a/api/src/test/java/org/apache/iceberg/expressions/TestEvaluatior.java 
b/api/src/test/java/org/apache/iceberg/expressions/TestEvaluatior.java
deleted file mode 100644
index a0aacef..0000000
--- a/api/src/test/java/org/apache/iceberg/expressions/TestEvaluatior.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.expressions;
-
-import org.apache.avro.util.Utf8;
-import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.StructType;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
-import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
-import static org.apache.iceberg.expressions.Expressions.and;
-import static org.apache.iceberg.expressions.Expressions.equal;
-import static org.apache.iceberg.expressions.Expressions.greaterThan;
-import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
-import static org.apache.iceberg.expressions.Expressions.isNull;
-import static org.apache.iceberg.expressions.Expressions.lessThan;
-import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
-import static org.apache.iceberg.expressions.Expressions.not;
-import static org.apache.iceberg.expressions.Expressions.notEqual;
-import static org.apache.iceberg.expressions.Expressions.notNull;
-import static org.apache.iceberg.expressions.Expressions.or;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-public class TestEvaluatior {
-  private static final StructType STRUCT = StructType.of(
-      required(13, "x", Types.IntegerType.get()),
-      required(14, "y", Types.IntegerType.get()),
-      optional(15, "z", Types.IntegerType.get())
-  );
-
-  @Test
-  public void testLessThan() {
-    Evaluator evaluator = new Evaluator(STRUCT, lessThan("x", 7));
-    Assert.assertFalse("7 < 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertTrue("6 < 7 => true", evaluator.eval(TestHelpers.Row.of(6, 8, 
null)));
-  }
-
-  @Test
-  public void testLessThanOrEqual() {
-    Evaluator evaluator = new Evaluator(STRUCT, lessThanOrEqual("x", 7));
-    Assert.assertTrue("7 <= 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertTrue("6 <= 7 => true", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
-    Assert.assertFalse("8 <= 7 => false", evaluator.eval(TestHelpers.Row.of(8, 
8, null)));
-  }
-
-  @Test
-  public void testGreaterThan() {
-    Evaluator evaluator = new Evaluator(STRUCT, greaterThan("x", 7));
-    Assert.assertFalse("7 > 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertFalse("6 > 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
-    Assert.assertTrue("8 > 7 => true", evaluator.eval(TestHelpers.Row.of(8, 8, 
null)));
-  }
-
-  @Test
-  public void testGreaterThanOrEqual() {
-    Evaluator evaluator = new Evaluator(STRUCT, greaterThanOrEqual("x", 7));
-    Assert.assertTrue("7 >= 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertFalse("6 >= 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
-    Assert.assertTrue("8 >= 7 => true", evaluator.eval(TestHelpers.Row.of(8, 
8, null)));
-  }
-
-  @Test
-  public void testEqual() {
-    Evaluator evaluator = new Evaluator(STRUCT, equal("x", 7));
-    Assert.assertTrue("7 == 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertFalse("6 == 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
-  }
-
-  @Test
-  public void testNotEqual() {
-    Evaluator evaluator = new Evaluator(STRUCT, notEqual("x", 7));
-    Assert.assertFalse("7 != 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
-    Assert.assertTrue("6 != 7 => true", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
-  }
-
-  @Test
-  public void testAlwaysTrue() {
-    Evaluator evaluator = new Evaluator(STRUCT, alwaysTrue());
-    Assert.assertTrue("always true", evaluator.eval(TestHelpers.Row.of()));
-  }
-
-  @Test
-  public void testAlwaysFalse() {
-    Evaluator evaluator = new Evaluator(STRUCT, alwaysFalse());
-    Assert.assertFalse("always false", evaluator.eval(TestHelpers.Row.of()));
-  }
-
-  @Test
-  public void testIsNull() {
-    Evaluator evaluator = new Evaluator(STRUCT, isNull("z"));
-    Assert.assertTrue("null is null", evaluator.eval(TestHelpers.Row.of(1, 2, 
null)));
-    Assert.assertFalse("3 is not null", evaluator.eval(TestHelpers.Row.of(1, 
2, 3)));
-  }
-
-  @Test
-  public void testNotNull() {
-    Evaluator evaluator = new Evaluator(STRUCT, notNull("z"));
-    Assert.assertFalse("null is null", evaluator.eval(TestHelpers.Row.of(1, 2, 
null)));
-    Assert.assertTrue("3 is not null", evaluator.eval(TestHelpers.Row.of(1, 2, 
3)));
-  }
-
-  @Test
-  public void testAnd() {
-    Evaluator evaluator = new Evaluator(STRUCT, and(equal("x", 7), 
notNull("z")));
-    Assert.assertTrue("7, 3 => true", evaluator.eval(TestHelpers.Row.of(7, 0, 
3)));
-    Assert.assertFalse("8, 3 => false", evaluator.eval(TestHelpers.Row.of(8, 
0, 3)));
-    Assert.assertFalse("7, null => false", 
evaluator.eval(TestHelpers.Row.of(7, 0, null)));
-    Assert.assertFalse("8, null => false", 
evaluator.eval(TestHelpers.Row.of(8, 0, null)));
-  }
-
-  @Test
-  public void testOr() {
-    Evaluator evaluator = new Evaluator(STRUCT, or(equal("x", 7), 
notNull("z")));
-    Assert.assertTrue("7, 3 => true", evaluator.eval(TestHelpers.Row.of(7, 0, 
3)));
-    Assert.assertTrue("8, 3 => true", evaluator.eval(TestHelpers.Row.of(8, 0, 
3)));
-    Assert.assertTrue("7, null => true", evaluator.eval(TestHelpers.Row.of(7, 
0, null)));
-    Assert.assertFalse("8, null => false", 
evaluator.eval(TestHelpers.Row.of(8, 0, null)));
-  }
-
-  @Test
-  public void testNot() {
-    Evaluator evaluator = new Evaluator(STRUCT, not(equal("x", 7)));
-    Assert.assertFalse("not(7 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(7)));
-    Assert.assertTrue("not(8 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(8)));
-  }
-
-  @Test
-  public void testCaseInsensitiveNot() {
-    Evaluator evaluator = new Evaluator(STRUCT, not(equal("X", 7)), false);
-    Assert.assertFalse("not(7 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(7)));
-    Assert.assertTrue("not(8 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(8)));
-  }
-
-  @Test
-  public void testCaseSensitiveNot() {
-    TestHelpers.assertThrows(
-        "X != x when case sensitivity is on",
-        ValidationException.class,
-        "Cannot find field 'X' in struct",
-        () -> new Evaluator(STRUCT, not(equal("X", 7)), true));
-  }
-
-  @Test
-  public void testCharSeqValue() {
-    StructType struct = StructType.of(required(34, "s", 
Types.StringType.get()));
-    Evaluator evaluator = new Evaluator(struct, equal("s", "abc"));
-    Assert.assertTrue("string(abc) == utf8(abc) => true",
-        evaluator.eval(TestHelpers.Row.of(new Utf8("abc"))));
-    Assert.assertFalse("string(abc) == utf8(abcd) => false",
-        evaluator.eval(TestHelpers.Row.of(new Utf8("abcd"))));
-  }
-}
diff --git 
a/api/src/test/java/org/apache/iceberg/expressions/TestEvaluator.java 
b/api/src/test/java/org/apache/iceberg/expressions/TestEvaluator.java
new file mode 100644
index 0000000..42adb9a
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestEvaluator.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.avro.util.Utf8;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.StructType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
+import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThan;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notEqual;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestEvaluator {
+  private static final StructType STRUCT = StructType.of(
+      required(13, "x", Types.IntegerType.get()),
+      required(14, "y", Types.IntegerType.get()),
+      optional(15, "z", Types.IntegerType.get()),
+      optional(16, "s1", Types.StructType.of(
+          Types.NestedField.required(17, "s2", Types.StructType.of(
+              Types.NestedField.required(18, "s3", Types.StructType.of(
+                  Types.NestedField.required(19, "s4", Types.StructType.of(
+                      Types.NestedField.required(20, "i", 
Types.IntegerType.get()))))))))));
+
+  @Test
+  public void testLessThan() {
+    Evaluator evaluator = new Evaluator(STRUCT, lessThan("x", 7));
+    Assert.assertFalse("7 < 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null, null)));
+    Assert.assertTrue("6 < 7 => true", evaluator.eval(TestHelpers.Row.of(6, 8, 
null, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
lessThan("s1.s2.s3.s4.i", 7));
+    Assert.assertFalse("7 < 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+    Assert.assertTrue("6 < 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(6, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+  }
+
+  @Test
+  public void testLessThanOrEqual() {
+    Evaluator evaluator = new Evaluator(STRUCT, lessThanOrEqual("x", 7));
+    Assert.assertTrue("7 <= 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
+    Assert.assertTrue("6 <= 7 => true", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
+    Assert.assertFalse("8 <= 7 => false", evaluator.eval(TestHelpers.Row.of(8, 
8, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
lessThanOrEqual("s1.s2.s3.s4.i", 7));
+    Assert.assertTrue("7 <= 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+
+    Assert.assertTrue("6 <= 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(6, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+
+    Assert.assertFalse("8 <= 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(6, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testGreaterThan() {
+    Evaluator evaluator = new Evaluator(STRUCT, greaterThan("x", 7));
+    Assert.assertFalse("7 > 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
+    Assert.assertFalse("6 > 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
+    Assert.assertTrue("8 > 7 => true", evaluator.eval(TestHelpers.Row.of(8, 8, 
null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
greaterThan("s1.s2.s3.s4.i", 7));
+    Assert.assertFalse("7 > 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+    Assert.assertFalse("6 > 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+    Assert.assertTrue("8 > 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testGreaterThanOrEqual() {
+    Evaluator evaluator = new Evaluator(STRUCT, greaterThanOrEqual("x", 7));
+    Assert.assertTrue("7 >= 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
+    Assert.assertFalse("6 >= 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
+    Assert.assertTrue("8 >= 7 => true", evaluator.eval(TestHelpers.Row.of(8, 
8, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
greaterThanOrEqual("s1.s2.s3.s4.i", 7));
+    Assert.assertTrue("7 >= 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+    Assert.assertFalse("6 >= 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+    Assert.assertTrue("8 >= 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testEqual() {
+    Evaluator evaluator = new Evaluator(STRUCT, equal("x", 7));
+    Assert.assertTrue("7 == 7 => true", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
+    Assert.assertFalse("6 == 7 => false", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, equal("s1.s2.s3.s4.i", 
7));
+    Assert.assertTrue("7 == 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+    Assert.assertFalse("6 == 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(6, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+  }
+
+  @Test
+  public void testNotEqual() {
+    Evaluator evaluator = new Evaluator(STRUCT, notEqual("x", 7));
+    Assert.assertFalse("7 != 7 => false", evaluator.eval(TestHelpers.Row.of(7, 
8, null)));
+    Assert.assertTrue("6 != 7 => true", evaluator.eval(TestHelpers.Row.of(6, 
8, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
notEqual("s1.s2.s3.s4.i", 7));
+    Assert.assertFalse("7 != 7 => false",
+        structEvaluator.eval(TestHelpers.Row.of(7, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(7)))))));
+    Assert.assertTrue("6 != 7 => true",
+        structEvaluator.eval(TestHelpers.Row.of(6, 8, null,
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(
+                        TestHelpers.Row.of(6)))))));
+
+  }
+
+  @Test
+  public void testAlwaysTrue() {
+    Evaluator evaluator = new Evaluator(STRUCT, alwaysTrue());
+    Assert.assertTrue("always true", evaluator.eval(TestHelpers.Row.of()));
+  }
+
+  @Test
+  public void testAlwaysFalse() {
+    Evaluator evaluator = new Evaluator(STRUCT, alwaysFalse());
+    Assert.assertFalse("always false", evaluator.eval(TestHelpers.Row.of()));
+  }
+
+  @Test
+  public void testIsNull() {
+    Evaluator evaluator = new Evaluator(STRUCT, isNull("z"));
+    Assert.assertTrue("null is null", evaluator.eval(TestHelpers.Row.of(1, 2, 
null)));
+    Assert.assertFalse("3 is not null", evaluator.eval(TestHelpers.Row.of(1, 
2, 3)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, isNull("s1.s2.s3.s4.i"));
+    Assert.assertFalse("3 is not null", 
structEvaluator.eval(TestHelpers.Row.of(1, 2, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(3)))))));
+  }
+
+  @Test
+  public void testNotNull() {
+    Evaluator evaluator = new Evaluator(STRUCT, notNull("z"));
+    Assert.assertFalse("null is null", evaluator.eval(TestHelpers.Row.of(1, 2, 
null)));
+    Assert.assertTrue("3 is not null", evaluator.eval(TestHelpers.Row.of(1, 2, 
3)));
+
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
notNull("s1.s2.s3.s4.i"));
+    Assert.assertTrue("3 is not null", 
structEvaluator.eval(TestHelpers.Row.of(1, 2, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(3)))))));
+  }
+
+  @Test
+  public void testAnd() {
+    Evaluator evaluator = new Evaluator(STRUCT, and(equal("x", 7), 
notNull("z")));
+    Assert.assertTrue("7, 3 => true", evaluator.eval(TestHelpers.Row.of(7, 0, 
3)));
+    Assert.assertFalse("8, 3 => false", evaluator.eval(TestHelpers.Row.of(8, 
0, 3)));
+    Assert.assertFalse("7, null => false", 
evaluator.eval(TestHelpers.Row.of(7, 0, null)));
+    Assert.assertFalse("8, null => false", 
evaluator.eval(TestHelpers.Row.of(8, 0, null)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
and(equal("s1.s2.s3.s4.i", 7),
+        notNull("s1.s2.s3.s4.i")));
+
+    Assert.assertTrue("7, 7 => true", 
structEvaluator.eval(TestHelpers.Row.of(7, 0, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(7)))))));
+    Assert.assertFalse("8, 8 => false", 
structEvaluator.eval(TestHelpers.Row.of(8, 0, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(8)))))));
+
+    Assert.assertFalse("7, null => false", 
structEvaluator.eval(TestHelpers.Row.of(7, 0, null, null)));
+
+    Assert.assertFalse("8, notnull => false", 
structEvaluator.eval(TestHelpers.Row.of(8, 0, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testOr() {
+    Evaluator evaluator = new Evaluator(STRUCT, or(equal("x", 7), 
notNull("z")));
+    Assert.assertTrue("7, 3 => true", evaluator.eval(TestHelpers.Row.of(7, 0, 
3)));
+    Assert.assertTrue("8, 3 => true", evaluator.eval(TestHelpers.Row.of(8, 0, 
3)));
+    Assert.assertTrue("7, null => true", evaluator.eval(TestHelpers.Row.of(7, 
0, null)));
+    Assert.assertFalse("8, null => false", 
evaluator.eval(TestHelpers.Row.of(8, 0, null)));
+
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
or(equal("s1.s2.s3.s4.i", 7),
+        notNull("s1.s2.s3.s4.i")));
+
+    Assert.assertTrue("7, 7 => true", 
structEvaluator.eval(TestHelpers.Row.of(7, 0, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(7)))))));
+    Assert.assertTrue("8, 8 => true", 
structEvaluator.eval(TestHelpers.Row.of(8, 0, 3,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(8)))))));
+
+    Assert.assertTrue("7, notnull => true", 
structEvaluator.eval(TestHelpers.Row.of(7, 0, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(7)))))));
+  }
+
+  @Test
+  public void testNot() {
+    Evaluator evaluator = new Evaluator(STRUCT, not(equal("x", 7)));
+    Assert.assertFalse("not(7 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(7)));
+    Assert.assertTrue("not(8 == 7) => true", 
evaluator.eval(TestHelpers.Row.of(8)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
not(equal("s1.s2.s3.s4.i", 7)));
+    Assert.assertFalse("not(7 == 7) => false", 
structEvaluator.eval(TestHelpers.Row.of(7, null, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(7)))))));
+    Assert.assertTrue("not(8 == 7) => true", 
structEvaluator.eval(TestHelpers.Row.of(8, null, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testCaseInsensitiveNot() {
+    Evaluator evaluator = new Evaluator(STRUCT, not(equal("X", 7)), false);
+    Assert.assertFalse("not(7 == 7) => false", 
evaluator.eval(TestHelpers.Row.of(7)));
+    Assert.assertTrue("not(8 == 7) => true", 
evaluator.eval(TestHelpers.Row.of(8)));
+
+    Evaluator structEvaluator = new Evaluator(STRUCT, 
not(equal("s1.s2.s3.s4.i", 7)), false);
+    Assert.assertFalse("not(7 == 7) => false", 
structEvaluator.eval(TestHelpers.Row.of(7, null, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(7)))))));
+    Assert.assertTrue("not(8 == 7) => true", 
structEvaluator.eval(TestHelpers.Row.of(8, null, null,
+        TestHelpers.Row.of(
+            TestHelpers.Row.of(
+                TestHelpers.Row.of(
+                    TestHelpers.Row.of(8)))))));
+  }
+
+  @Test
+  public void testCaseSensitiveNot() {
+    TestHelpers.assertThrows(
+        "X != x when case sensitivity is on",
+        ValidationException.class,
+        "Cannot find field 'X' in struct",
+        () -> new Evaluator(STRUCT, not(equal("X", 7)), true));
+  }
+
+  @Test
+  public void testCharSeqValue() {
+    StructType struct = StructType.of(required(34, "s", 
Types.StringType.get()));
+    Evaluator evaluator = new Evaluator(struct, equal("s", "abc"));
+    Assert.assertTrue("string(abc) == utf8(abc) => true",
+        evaluator.eval(TestHelpers.Row.of(new Utf8("abc"))));
+    Assert.assertFalse("string(abc) == utf8(abcd) => false",
+        evaluator.eval(TestHelpers.Row.of(new Utf8("abcd"))));
+  }
+}
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 58fe902..ae84cb2 100644
--- 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -168,8 +168,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
@@ -192,8 +190,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
@@ -216,8 +212,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
@@ -240,8 +234,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
@@ -264,8 +256,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
@@ -280,8 +270,6 @@ public class ParquetDictionaryRowGroupFilter {
     @Override
     public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Boolean hasNonDictPage = isFallback.get(id);
       if (hasNonDictPage == null || hasNonDictPage) {
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 4fe93dd..884ac16 100644
--- 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -135,8 +135,6 @@ public class ParquetMetricsRowGroupFilter {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no null values, the expression cannot match
       Integer id = ref.fieldId();
-      Preconditions.checkNotNull(struct.field(id),
-          "Cannot filter by nested column: %s", schema.findField(id));
 
       Long valueCount = valueCounts.get(id);
       if (valueCount == null) {
@@ -158,8 +156,6 @@ public class ParquetMetricsRowGroupFilter {
       // no need to check whether the field is required because binding 
evaluates that case
       // if the column has no non-null values, the expression cannot match
       Integer id = ref.fieldId();
-      Preconditions.checkNotNull(struct.field(id),
-          "Cannot filter by nested column: %s", schema.findField(id));
 
       // When filtering nested types notNull() is implicit filter passed even 
though complex
       // filters aren't pushed down in Parquet. Leave all nested column type 
filters to be
@@ -186,8 +182,6 @@ public class ParquetMetricsRowGroupFilter {
     @Override
     public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Long valueCount = valueCounts.get(id);
       if (valueCount == null) {
@@ -214,8 +208,6 @@ public class ParquetMetricsRowGroupFilter {
     @Override
     public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Long valueCount = valueCounts.get(id);
       if (valueCount == null) {
@@ -242,8 +234,6 @@ public class ParquetMetricsRowGroupFilter {
     @Override
     public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Long valueCount = valueCounts.get(id);
       if (valueCount == null) {
@@ -270,8 +260,6 @@ public class ParquetMetricsRowGroupFilter {
     @Override
     public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       Long valueCount = valueCounts.get(id);
       if (valueCount == null) {
@@ -298,8 +286,6 @@ public class ParquetMetricsRowGroupFilter {
     @Override
     public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
       Integer id = ref.fieldId();
-      Types.NestedField field = struct.field(id);
-      Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", 
schema.findField(id));
 
       // When filtering nested types notNull() is implicit filter passed even 
though complex
       // filters aren't pushed down in Parquet. Leave all nested column type 
filters to be
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index 20c4553..bf1fb6f 100644
--- 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -27,11 +27,13 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.FloatType;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.LongType;
@@ -60,6 +62,10 @@ import static 
org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestDictionaryRowGroupFilter {
+
+  private static final Types.StructType structFieldType =
+          Types.StructType.of(Types.NestedField.required(9, "int_field", 
IntegerType.get()));
+
   private static final Schema SCHEMA = new Schema(
       required(1, "id", IntegerType.get()),
       optional(2, "no_stats", StringType.get()),
@@ -68,9 +74,13 @@ public class TestDictionaryRowGroupFilter {
       optional(5, "some_nulls", StringType.get()),
       optional(6, "no_nulls", StringType.get()),
       optional(7, "non_dict", StringType.get()),
-      optional(8, "not_in_file", FloatType.get())
+      optional(8, "struct_not_null", structFieldType),
+      optional(10, "not_in_file", FloatType.get())
   );
 
+  private static final Types.StructType _structFieldType =
+          Types.StructType.of(Types.NestedField.required(9, "_int_field", 
IntegerType.get()));
+
   private static final Schema FILE_SCHEMA = new Schema(
       required(1, "_id", IntegerType.get()),
       optional(2, "_no_stats", StringType.get()),
@@ -78,7 +88,8 @@ public class TestDictionaryRowGroupFilter {
       optional(4, "_all_nulls", LongType.get()),
       optional(5, "_some_nulls", StringType.get()),
       optional(6, "_no_nulls", StringType.get()),
-      optional(7, "_non_dict", StringType.get())
+      optional(7, "_non_dict", StringType.get()),
+      optional(8, "_struct_not_null", _structFieldType)
   );
 
   private static final String TOO_LONG_FOR_STATS;
@@ -101,6 +112,9 @@ public class TestDictionaryRowGroupFilter {
       Assert.assertTrue(PARQUET_FILE.delete());
     }
 
+    // build struct field schema
+    org.apache.avro.Schema structSchema = 
AvroSchemaUtil.convert(_structFieldType);
+
     OutputFile outFile = Files.localOutput(PARQUET_FILE);
     try (FileAppender<Record> appender = Parquet.write(outFile)
         .schema(FILE_SCHEMA)
@@ -117,6 +131,11 @@ public class TestDictionaryRowGroupFilter {
           builder.set("_some_nulls", (i % 10 == 0) ? null : "some"); // 
includes some null values
           builder.set("_no_nulls", ""); // optional, but always non-null
           builder.set("_non_dict", UUID.randomUUID().toString()); // not 
dictionary-encoded
+
+          Record struct_not_null = new Record(structSchema);
+          struct_not_null.put("_int_field", 30 + i);
+          builder.set("_struct_not_null", struct_not_null); // struct with int
+
           appender.add(builder.build());
         }
       }
@@ -176,6 +195,10 @@ public class TestDictionaryRowGroupFilter {
     shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notNull("no_nulls"))
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
     Assert.assertTrue("Should read: dictionary filter doesn't help", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notNull("struct_not_null"))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: dictionary filter doesn't help", 
shouldRead);
   }
 
   @Test
@@ -191,6 +214,10 @@ public class TestDictionaryRowGroupFilter {
     shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
isNull("no_nulls"))
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
     Assert.assertTrue("Should read: dictionary filter doesn't help", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
isNull("struct_not_null"))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: dictionary filter doesn't help", 
shouldRead);
   }
 
   @Test
@@ -471,6 +498,146 @@ public class TestDictionaryRowGroupFilter {
   }
 
   @Test
+  public void testStructFieldLt() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range below lower bound (30 is not 
< 30)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 31))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: many possible ids", 
+  }
+
+  @Test
+  public void testStructFieldLtEq() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 29))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range below lower bound (29 < 
30)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+
+  }
+
+  @Test
+  public void testStructFieldGt() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 
79)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range above upper bound (79 is not 
> 79)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 78))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldGtEq() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA,
+        greaterThanOrEqual("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 
79)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 80))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id range above upper bound (80 > 
79)", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldEq() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 29))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 80))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldNotEq() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 5))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 29))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 30))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 75))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", 
shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 79))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("id", 
80))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 85))
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+
+  @Test
   public void testCaseInsensitive() {
     boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, 
notEqual("no_Nulls", ""), false)
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
index 41a091c..90adcbb 100644
--- 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
+++ 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
@@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.FileAppender;
@@ -60,6 +61,10 @@ import static 
org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestMetricsRowGroupFilter {
+
+  private static final Types.StructType structFieldType =
+          Types.StructType.of(Types.NestedField.required(8, "int_field", 
IntegerType.get()));
+
   private static final Schema SCHEMA = new Schema(
       required(1, "id", IntegerType.get()),
       optional(2, "no_stats", StringType.get()),
@@ -67,20 +72,23 @@ public class TestMetricsRowGroupFilter {
       optional(4, "all_nulls", LongType.get()),
       optional(5, "some_nulls", StringType.get()),
       optional(6, "no_nulls", StringType.get()),
-      optional(7, "not_in_file", FloatType.get()),
-      optional(8, "map_not_null",
-              Types.MapType.ofRequired(9, 10, StringType.get(), 
IntegerType.get())),
-      optional(9, "struct_not_null",
-              Types.StructType.of(Types.NestedField.required(10, 
"struct_subfield", StringType.get())))
+      optional(7, "struct_not_null", structFieldType),
+      optional(9, "not_in_file", FloatType.get()),
+      optional(10, "map_not_null",
+          Types.MapType.ofRequired(11, 12, StringType.get(), 
IntegerType.get()))
   );
 
+  private static final Types.StructType _structFieldType =
+          Types.StructType.of(Types.NestedField.required(8, "_int_field", 
IntegerType.get()));
+
   private static final Schema FILE_SCHEMA = new Schema(
       required(1, "_id", IntegerType.get()),
       optional(2, "_no_stats", StringType.get()),
       required(3, "_required", StringType.get()),
       optional(4, "_all_nulls", LongType.get()),
       optional(5, "_some_nulls", StringType.get()),
-      optional(6, "_no_nulls", StringType.get())
+      optional(6, "_no_nulls", StringType.get()),
+      optional(7, "_struct_not_null", _structFieldType)
   );
 
   private static final String TOO_LONG_FOR_STATS;
@@ -102,6 +110,9 @@ public class TestMetricsRowGroupFilter {
       Assert.assertTrue(PARQUET_FILE.delete());
     }
 
+    // build struct field schema
+    org.apache.avro.Schema structSchema = 
AvroSchemaUtil.convert(_structFieldType);
+
     OutputFile outFile = Files.localOutput(PARQUET_FILE);
     try (FileAppender<Record> appender = Parquet.write(outFile)
         .schema(FILE_SCHEMA)
@@ -115,6 +126,11 @@ public class TestMetricsRowGroupFilter {
         builder.set("_all_nulls", null); // never non-null
         builder.set("_some_nulls", (i % 10 == 0) ? null : "some"); // includes 
some null values
         builder.set("_no_nulls", ""); // optional, but always non-null
+
+        Record struct_not_null = new Record(structSchema);
+        struct_not_null.put("_int_field", 30 + i);
+        builder.set("_struct_not_null", struct_not_null ); // struct with int
+
         appender.add(builder.build());
       }
     }
@@ -462,6 +478,144 @@ public class TestMetricsRowGroupFilter {
   }
 
   @Test
+  public void testStructFieldLt() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", 
shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range below lower bound (30 is not 
< 30)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 31))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThan("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldLtEq() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", 
shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 29))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range below lower bound (29 < 
30)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
lessThanOrEqual("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldGt() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 
79)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range above upper bound (79 is not 
> 79)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 78))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThan("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldGtEq() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 
79)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 80))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id range above upper bound (80 > 
79)", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
greaterThanOrEqual("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldEq() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 29))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id between lower and upper bounds", 
shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 80))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
equal("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testStructFieldNotEq() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 5))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 29))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 30))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 75))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id between lower and upper bounds", 
shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 79))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, notEqual("id", 80))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, 
notEqual("struct_not_null.int_field", 85))
+            .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+
+  @Test
   public void testCaseInsensitive() {
     boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 
5), false)
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);

Reply via email to