kamcheungting-db commented on code in PR #13694:
URL: https://github.com/apache/iceberg/pull/13694#discussion_r2304519599


##########
api/src/main/java/org/apache/iceberg/expressions/InclusiveStatsEvaluator.java:
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.expressions;
+
+import static org.apache.iceberg.expressions.Expressions.rewriteNot;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.stats.ContentStats;
+import org.apache.iceberg.stats.FieldStats;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.NaNUtil;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantObject;
+
+/**
+ * Evaluates an {@link Expression} on a {@link DataFile} to test whether rows 
in the file may match.
+ *
+ * <p>This evaluation is inclusive: it returns true if a file may match and 
false if it cannot
+ * match.
+ *
+ * <p>Files are passed to {@link #eval(ContentFile)}, which returns true if 
the file may contain
+ * matching rows and false if the file cannot contain matching rows. Files may 
be skipped if and
+ * only if the return value of {@code eval} is false.
+ *
+ * <p>Due to the comparison implementation of ORC stats, for float/double 
columns in ORC files, if
+ * the first value in a file is NaN, metrics of this file will report NaN for 
both upper and lower
+ * bound even though the column could contain non-NaN data. Thus in some 
scenarios explicit
+ * checks for NaN are necessary in order to not skip files that may contain 
matching data.
+ */
+public class InclusiveStatsEvaluator {
+  private static final int IN_PREDICATE_LIMIT = 200;
+
+  private final Expression expr;
+
+  public InclusiveStatsEvaluator(Schema schema, Expression unbound) {
+    this(schema, unbound, true);
+  }
+
+  public InclusiveStatsEvaluator(Schema schema, Expression unbound, boolean 
caseSensitive) {
+    StructType struct = schema.asStruct();
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
+  }
+
+  /**
+   * Test whether the file may contain records that match the expression.
+   *
+   * @param file a data file
+   * @return false if the file cannot contain rows that match the expression, 
true otherwise.
+   */
+  public boolean eval(ContentFile<?> file) {
+    // TODO: detect the case where a column is missing from the file using 
file's max field id.
+    return new StatsEvalVisitor().eval(file);
+  }
+
+  private static final boolean ROWS_MIGHT_MATCH = true;
+  private static final boolean ROWS_CANNOT_MATCH = false;
+
+  private class StatsEvalVisitor extends 
ExpressionVisitors.BoundVisitor<Boolean> {
+    private ContentStats stats;
+
+    private boolean eval(ContentFile<?> file) {
+      if (file.recordCount() == 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      if (file.recordCount() < 0) {
+        // we haven't implemented parsing the record count from Avro files and thus 
set the record count to -1
+        // when importing Avro tables to Iceberg tables. This should be 
updated once we have implemented
+        // that parsing and set the correct record count.
+        return ROWS_MIGHT_MATCH;
+      }
+
+      this.stats = file.contentStats();
+
+      return ExpressionVisitors.visitEvaluator(expr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH; // all rows match
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH; // all rows fail
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(Bound<T> term) {
+      // no need to check whether the field is required because binding 
evaluates that case
+      // if the column has no null values, the expression cannot match
+      if (isNonNullPreserving(term)) {
+        // number of non-nulls is the same as for the ref
+        int id = term.ref().fieldId();
+        if (!mayContainNull(id)) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(Bound<T> term) {
+      // no need to check whether the field is required because binding 
evaluates that case
+      // if the column has no non-null values, the expression cannot match
+
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(Bound<T> term) {
+      // when there's no nanCounts information, but we already know the column 
only contains null,
+      // it's guaranteed that there's no NaN value
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      if (!(term instanceof BoundReference)) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      FieldStats stat = stats.statsFor(id);
+      if (null != stat && null != stat.nanValueCount() && stat.nanValueCount() 
== 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNaN(Bound<T> term) {
+      if (!(term instanceof BoundReference)) {
+        // identity transforms are already removed by this time
+        return ROWS_MIGHT_MATCH;
+      }
+
+      int id = term.ref().fieldId();
+
+      if (containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id) || containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T lower = lowerBound(term);
+      if (null == lower || NaNUtil.isNaN(lower)) {
+        // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator 
docs for more.
+        return ROWS_MIGHT_MATCH;
+      }
+
+      // this also works for transforms that are order preserving:
+      // if a transform f is order preserving, a < b means that f(a) <= f(b).
+      // because lower <= a for all values of a in the file, f(lower) <= f(a).
+      // when f(lower) >= X then f(a) >= f(lower) >= X, so there is no a such 
that f(a) < X
+      // f(lower) >= X means rows cannot match
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp >= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean ltEq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id) || containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T lower = lowerBound(term);
+      if (null == lower || NaNUtil.isNaN(lower)) {
+        // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator 
docs for more.
+        return ROWS_MIGHT_MATCH;
+      }
+
+      // this also works for transforms that are order preserving:
+      // if a transform f is order preserving, a < b means that f(a) <= f(b).
+      // because lower <= a for all values of a in the file, f(lower) <= f(a).
+      // when f(lower) > X then f(a) >= f(lower) > X, so there is no a such 
that f(a) <= X
+      // f(lower) > X means rows cannot match
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gt(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id) || containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp <= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gtEq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id) || containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean eq(Bound<T> term, Literal<T> lit) {
+      // all terms are null preserving. see #isNullPreserving(Bound)
+      int id = term.ref().fieldId();
+      if (containsNullsOnly(id) || containsNaNsOnly(id)) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T lower = lowerBound(term);
+      if (lower != null && !NaNUtil.isNaN(lower)) {
+        int cmp = lit.comparator().compare(lower, lit.value());
+        if (cmp > 0) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+
+      T upper = upperBound(term);
+      if (null == upper) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notEq(Bound<T> term, Literal<T> lit) {
+      // because the bounds are not necessarily a min or max value, this 
cannot be answered using
+      // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.

Review Comment:
   Could you provide more detail on the statement `the bounds are not necessarily a min 
or max value`?
   I think I'm missing some critical part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to