This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e3e1a  PARQUET-1968: FilterApi support In predicate (#923)
98e3e1a is described below

commit 98e3e1a770993903fbe37c8ac61321c8832d833f
Author: Huaxin Gao <[email protected]>
AuthorDate: Thu Sep 30 00:32:06 2021 -0700

    PARQUET-1968: FilterApi support In predicate (#923)
---
 .../java/org/apache/parquet/column/MinMax.java     |  56 +++++++
 .../parquet/filter2/predicate/FilterApi.java       |  53 ++++++
 .../parquet/filter2/predicate/FilterPredicate.java |   8 +
 .../filter2/predicate/LogicalInverseRewriter.java  |  12 ++
 .../parquet/filter2/predicate/LogicalInverter.java |  12 ++
 .../parquet/filter2/predicate/Operators.java       |  81 ++++++++-
 .../predicate/SchemaCompatibilityValidator.java    |  19 +++
 .../column/columnindex/ColumnIndexBuilder.java     |  70 ++++++++
 .../filter2/columnindex/ColumnIndexFilter.java     |  13 ++
 .../column/columnindex/TestColumnIndexBuilder.java | 182 +++++++++++++++++++++
 .../filter2/columnindex/TestColumnIndexFilter.java |  55 +++++++
 ...crementallyUpdatedFilterPredicateGenerator.java |  65 ++++++++
 .../filter2/bloomfilterlevel/BloomFilterImpl.java  |  37 +++++
 .../filter2/dictionarylevel/DictionaryFilter.java  | 102 ++++++++++++
 .../filter2/statisticslevel/StatisticsFilter.java  |  69 ++++++++
 .../filter2/TestFiltersWithMissingColumns.java     |  10 ++
 .../parquet/filter2/compat/TestRowGroupFilter.java |  28 +++-
 .../dictionarylevel/DictionaryFilterTest.java      | 116 +++++++++++++
 .../recordlevel/TestRecordLevelFilters.java        |  36 +++-
 .../statisticslevel/TestStatisticsFilter.java      |  91 +++++++++++
 .../apache/parquet/hadoop/TestBloomFiltering.java  |  26 +++
 .../parquet/hadoop/TestColumnIndexFiltering.java   |  39 +++++
 22 files changed, 1177 insertions(+), 3 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java 
b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java
new file mode 100644
index 0000000..c97b681
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.schema.PrimitiveComparator;
+
+/**
+ * Finds the smallest and largest non-null elements of an iterable collection.
+ */
+public final class MinMax<T> {
+  private T min = null; // stays null when no non-null element is seen
+  private T max = null; // stays null when no non-null element is seen
+
+  public MinMax(PrimitiveComparator<T> comparator, Iterable<T> iterable) {
+    getMinAndMax(comparator, iterable); // one pass; the getters return the stored results
+  }
+
+  public T getMin() {
+    return min; // smallest element per the constructor's comparator, or null
+  }
+
+  public T getMax() {
+    return max; // largest element per the constructor's comparator, or null
+  }
+
+  private void getMinAndMax(PrimitiveComparator<T> comparator, Iterable<T> 
iterable) {
+    iterable.forEach(element -> {
+      if (max == null) { // nothing non-null recorded yet: take the element as-is (it may be null)
+        max = element;
+      } else if (element != null && comparator.compare(max, element) < 0) { // element sorts after max
+          max = element;
+      }
+      if (min == null) { // same bootstrap as for max
+        min = element;
+      } else if (element != null && comparator.compare(min, element) > 0) { // element sorts before min
+          min = element;
+      }
+    });
+  }
+}
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
index b209fc7..a0490d9 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.filter2.predicate;
 
 import java.io.Serializable;
+import java.util.Set;
 
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.filter2.predicate.Operators.And;
@@ -30,12 +31,14 @@ import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq;
 import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt;
@@ -205,6 +208,56 @@ public final class FilterApi {
   }
 
   /**
+   * Keeps records if their value is in the provided values.
+   * The provided values set must not be null or empty, but may contain a null 
value.
+   * <p>
+   * For example:
+   * <pre>
+   *   {@code
+   *   Set<Integer> set = new HashSet<>();
+   *   set.add(9);
+   *   set.add(null);
+   *   set.add(50);
+   *   in(column, set);}
+   * </pre>
+   * will keep all records whose values are 9, null, or 50.
+   *
+   * @param column a column reference created by FilterApi
+   * @param values a set of values that match the column's type
+   * @param <T> the Java type of values in the column
+   * @param <C> the column type that corresponds to values of type T
+   * @return an in predicate for the given column and values
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & 
SupportsEqNotEq> In<T> in(C column, Set<T> values) {
+    return new In<>(column, values);
+  }
+
+  /**
+   * Keeps records if their value is not in the provided values.
+   * The provided values set must not be null or empty, but may contain a null 
value.
+   * <p>
+   * For example:
+   * <pre>
+   *   {@code
+   *   Set<Integer> set = new HashSet<>();
+   *   set.add(9);
+   *   set.add(null);
+   *   set.add(50);
+   *   notIn(column, set);}
+   * </pre>
+   * will keep all records whose values are not 9, null, or 50.
+   *
+   * @param column a column reference created by FilterApi
+   * @param values a set of values that match the column's type
+   * @param <T> the Java type of values in the column
+   * @param <C> the column type that corresponds to values of type T
+   * @return a notIn predicate for the given column and values
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & 
SupportsEqNotEq> NotIn<T> notIn(C column, Set<T> values) {
+    return new NotIn<>(column, values);
+  }
+
+  /**
    * Keeps records that pass the provided {@link UserDefinedPredicate}
    * <p>
    * The provided class must have a default constructor. To use an instance
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
index 211c71e..d9156c2 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
@@ -22,11 +22,13 @@ import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
@@ -66,6 +68,12 @@ public interface FilterPredicate {
     <T extends Comparable<T>> R visit(LtEq<T> ltEq);
     <T extends Comparable<T>> R visit(Gt<T> gt);
     <T extends Comparable<T>> R visit(GtEq<T> gtEq);
+    default <T extends Comparable<T>> R visit(In<T> in) {
+      throw new UnsupportedOperationException("visit in is not supported."); // visitors that do not override this reject In predicates at runtime
+    }
+    default <T extends Comparable<T>> R visit(NotIn<T> notIn) {
+      throw new UnsupportedOperationException("visit NotIn is not supported."); // visitors that do not override this reject NotIn predicates at runtime
+    }
     R visit(And and);
     R visit(Or or);
     R visit(Not not);
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
index 88cb836..49b862f 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
@@ -23,11 +23,13 @@ import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
@@ -88,6 +90,16 @@ public final class LogicalInverseRewriter implements 
Visitor<FilterPredicate> {
   }
 
   @Override
+  public <T extends Comparable<T>> FilterPredicate visit(In<T> in) {
+    return in; // an In predicate is returned unchanged by this rewriter
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotIn<T> notIn) {
+    return notIn; // a NotIn predicate is returned unchanged by this rewriter
+  }
+
+  @Override
   public FilterPredicate visit(And and) {
     return and(and.getLeft().accept(this), and.getRight().accept(this));
   }
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
index cc0186b..b95d473 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
@@ -23,11 +23,13 @@ import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 
@@ -82,6 +84,16 @@ public final class LogicalInverter implements 
Visitor<FilterPredicate> {
   }
 
   @Override
+  public <T extends Comparable<T>> FilterPredicate visit(In<T> in) {
+    return new NotIn<>(in.getColumn(), in.getValues()); // logical inverse: NotIn over the same column and the same Set instance
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotIn<T> notIn) {
+    return new In<>(notIn.getColumn(), notIn.getValues()); // logical inverse: In over the same column and the same Set instance
+  }
+
+  @Override
   public FilterPredicate visit(And and) {
     return new Or(and.getLeft().accept(this), and.getRight().accept(this));
   }
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index 9a1696c..d52aa92 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -21,10 +21,13 @@ package org.apache.parquet.filter2.predicate;
 import java.io.Serializable;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.api.Binary;
 
+import static org.apache.parquet.Preconditions.checkArgument;
+
 /**
  * These are the operators in a filter predicate expression tree.
  * They are constructed by using the methods in {@link FilterApi}
@@ -169,7 +172,7 @@ public final class Operators {
   public static final class Eq<T extends Comparable<T>> extends 
ColumnFilterPredicate<T> {
 
     // value can be null
-    Eq(Column<T> column, T value) {
+    public Eq(Column<T> column, T value) {
       super(column, value);
     }
 
@@ -247,6 +250,82 @@ public final class Operators {
     }
   }
 
+  /**
+   * Base class for {@link In} and {@link NotIn}. {@link In} is used to filter 
data based on a set of values.
+   * {@link NotIn} is used to filter data that are not in the set of values.
+   */
+  public static abstract class SetColumnFilterPredicate<T extends 
Comparable<T>> implements FilterPredicate, Serializable {
+    private final Column<T> column;
+    private final Set<T> values; // kept by reference; the caller's set is not copied
+
+    protected SetColumnFilterPredicate(Column<T> column, Set<T> values) {
+      this.column = Objects.requireNonNull(column, "column cannot be null");
+      this.values = Objects.requireNonNull(values, "values cannot be null");
+      checkArgument(!values.isEmpty(), "values in SetColumnFilterPredicate 
shouldn't be empty!");
+    }
+
+    public Column<T> getColumn() {
+      return column;
+    }
+
+    public Set<T> getValues() {
+      return values; // the stored set itself, not a defensive copy
+    }
+
+    @Override
+    public String toString() {
+      String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); // e.g. "in" / "notin" for the In and NotIn subclasses
+      StringBuilder str = new StringBuilder();
+      
str.append(name).append("(").append(column.getColumnPath().toDotString()).append(",
 ");
+      int iter = 0;
+      for (T value : values) {
+        if (iter >= 100) break; // print at most 100 values
+        str.append(value).append(", ");
+        iter++;
+      }
+      int length = str.length(); // used below to strip the trailing ", " when every value was printed
+      str = values.size() <= 100 ? str.delete(length - 2, length) : 
str.append("...");
+      return str.append(")").toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false; // exact class match: an In never equals a NotIn
+      SetColumnFilterPredicate<?> that = (SetColumnFilterPredicate<?>) o;
+      return column.equals(that.column) && values.equals(that.values);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(column, values); // same fields as equals()
+    }
+  }
+
+  public static final class In<T extends Comparable<T>> extends 
SetColumnFilterPredicate<T> {
+
+    public In(Column<T> column, Set<T> values) { // null column, null values or an empty set are rejected by the superclass constructor
+      super(column, values);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this); // selects the Visitor.visit(In) overload
+    }
+  }
+
+  public static final class NotIn<T extends Comparable<T>> extends 
SetColumnFilterPredicate<T> {
+
+    NotIn(Column<T> column, Set<T> values) { // NOTE(review): package-private while In's constructor is public — confirm the asymmetry is intended
+      super(column, values);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this); // selects the Visitor.visit(NotIn) overload
+    }
+  }
+
   // base class for And, Or
   private static abstract class BinaryLogicalFilterPredicate implements 
FilterPredicate, Serializable {
     private final FilterPredicate left;
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
index c75036b..49fd10c 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -29,12 +29,15 @@ import 
org.apache.parquet.filter2.predicate.Operators.ColumnFilterPredicate;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.MessageType;
@@ -115,6 +118,18 @@ public class SchemaCompatibilityValidator implements 
FilterPredicate.Visitor<Voi
   }
 
   @Override
+  public <T extends Comparable<T>> Void visit(In<T> pred) {
+    validateColumnFilterPredicate(pred); // validates the predicate's column
+    return null; // Void visitor: nothing to return
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(NotIn<T> pred) {
+    validateColumnFilterPredicate(pred); // validates the predicate's column
+    return null; // Void visitor: nothing to return
+  }
+
+  @Override
   public Void visit(And and) {
     and.getLeft().accept(this);
     and.getRight().accept(this);
@@ -149,6 +164,10 @@ public class SchemaCompatibilityValidator implements 
FilterPredicate.Visitor<Voi
     validateColumn(pred.getColumn());
   }
 
+  private <T extends Comparable<T>> void 
validateColumnFilterPredicate(SetColumnFilterPredicate<T> pred) {
+    validateColumn(pred.getColumn()); // only the column is passed on; the value set is not examined here
+  }
+
   private <T extends Comparable<T>> void validateColumn(Column<T> column) {
     ColumnPath path = column.getColumnPath();
 
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
 
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index 15be50e..fc3859b 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -23,21 +23,28 @@ import static java.util.Objects.requireNonNull;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Formatter;
+import java.util.Iterator;
 import java.util.List;
 import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.IntConsumer;
 import java.util.function.IntPredicate;
 
+import org.apache.parquet.column.MinMax;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.io.api.Binary;
@@ -288,6 +295,69 @@ public abstract class ColumnIndexBuilder {
     }
 
     @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(In<T> in) {
+      Set<T> values = in.getValues();
+      IntSet matchingIndexesForNull = new IntOpenHashSet();  // pages with a null count > 0; filled only when the set lists null
+      Iterator<T> it = values.iterator();
+      while(it.hasNext()) { // this loop only reacts to a null entry; non-null values are handled through min/max below
+        T value = it.next();
+        if (value == null) {
+          if (nullCounts == null) {
+            // Searching for nulls so if we don't have null related statistics 
we have to return all pages
+            return IndexIterator.all(getPageCount());
+          } else {
+            for (int i = 0; i < nullCounts.length; i++) {
+              if (nullCounts[i] > 0) {
+                matchingIndexesForNull.add(i);
+              }
+            }
+            if (values.size() == 1) { // the set holds nothing but null: only pages with nulls can match
+              return IndexIterator.filter(getPageCount(), pageIndex -> 
matchingIndexesForNull.contains(pageIndex));
+            }
+          }
+        }
+      }
+
+      IntSet matchingIndexesLessThanMax = new IntOpenHashSet();
+      IntSet matchingIndexesGreaterThanMin = new IntOpenHashSet();
+
+      MinMax<T> minMax = new MinMax(comparator, values); // NOTE(review): raw MinMax type (unchecked) — confirm the comparator's element type matches T
+      T min = minMax.getMin();
+      T max = minMax.getMax();
+
+      // We don't want to iterate through each of the values in the IN set to 
compare,
+      // because the size of the IN set might be very large. Instead, we want 
to only
+      // compare the max and min value of the IN set to see if the page might 
contain the
+      // values in the IN set.
+      // If there might be values in a page that are <= the max value in the 
IN set,
+      // and >= the min value in the IN set, then the page might contain
+      // the values in the IN set.
+      getBoundaryOrder().ltEq(createValueComparator(max)) // pages selected by the ltEq check against the set's max
+        .forEachRemaining((int index) -> 
matchingIndexesLessThanMax.add(index));
+      getBoundaryOrder().gtEq(createValueComparator(min)) // pages selected by the gtEq check against the set's min
+        .forEachRemaining((int index) -> 
matchingIndexesGreaterThanMin.add(index));
+      matchingIndexesLessThanMax.retainAll(matchingIndexesGreaterThanMin); // intersect the two page sets in place
+      IntSet matchingIndex = matchingIndexesLessThanMax; // alias of the intersected set, not a copy
+      matchingIndex.addAll(matchingIndexesForNull);  // add the matching null 
pages
+      return IndexIterator.filter(getPageCount(), pageIndex -> 
matchingIndex.contains(pageIndex));
+    }
+
+    @Override // NOTE(review): returns the complement of the Eq-matching pages, so a page mixing listed and unlisted values is skipped — confirm this is intended
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(NotIn<T> 
notIn) {
+      IntSet indexes = getMatchingIndexes(notIn); // pages selected by an Eq on at least one listed value
+      return IndexIterator.filter(getPageCount(), pageIndex -> 
!indexes.contains(pageIndex));
+    }
+
+    private <T extends Comparable<T>> IntSet 
getMatchingIndexes(SetColumnFilterPredicate<T> in) {
+      IntSet matchingIndexes = new IntOpenHashSet(); // union of the pages matched per value
+      for (T value : in.getValues()) {
+        Eq<T> eq = new Eq<>(in.getColumn(), value); // one Eq per listed value, a null entry included
+        visit(eq).forEachRemaining((IntConsumer) matchingIndexes::add);
+      }
+      return matchingIndexes;
+    }
+
+    @Override
     public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> 
PrimitiveIterator.OfInt visit(
         UserDefined<T, U> udp) {
       final UserDefinedPredicate<T> predicate = udp.getUserDefinedPredicate();
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
 
b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index 6dec774..6c27f98 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -27,6 +27,7 @@ import 
org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
 import 
org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Column;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
@@ -147,6 +148,18 @@ public class ColumnIndexFilter implements 
Visitor<RowRanges> {
   }
 
   @Override
+  public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
+    boolean isNull = in.getValues().contains(null); // true when the set lists null; picks the third applyPredicate argument (presumably the result when the column has no index data — confirm)
+    return applyPredicate(in.getColumn(), ci -> ci.visit(in), isNull ? 
allRows() : RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn) {
+    boolean isNull = notIn.getValues().contains(null); // true when the set lists null; the third applyPredicate argument is the mirror image of the In case
+    return applyPredicate(notIn.getColumn(), ci -> ci.visit(notIn), isNull ? 
RowRanges.EMPTY : allRows());
+  }
+
+  @Override
   public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> 
RowRanges visit(UserDefined<T, U> udp) {
     return applyPredicate(udp.getColumn(), ci -> ci.visit(udp),
         udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() : 
RowRanges.EMPTY);
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
 
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 5a3947c..9c1d4dc 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -26,11 +26,13 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.gt;
 import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.lt;
 import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
 import static org.apache.parquet.schema.OriginalType.DECIMAL;
@@ -55,7 +57,9 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.statistics.Statistics;
@@ -274,6 +278,13 @@ public class TestColumnIndexBuilder {
         decimalBinary("87656273"));
     assertCorrectFiltering(columnIndex, eq(col, decimalBinary("0.0")), 1, 4);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6);
+    Set<Binary> set1 = new HashSet<>();
+    set1.add(Binary.fromString("0.0"));
+    assertCorrectFiltering(columnIndex, in(col, set1), 1, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 5, 6, 7);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 7);
     assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 
0, 1, 2, 3, 4, 5, 6);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 7);
     assertCorrectFiltering(columnIndex, gt(col, decimalBinary("2348978.45")), 
1);
@@ -319,6 +330,13 @@ public class TestColumnIndexBuilder {
         null);
     assertCorrectFiltering(columnIndex, eq(col, decimalBinary("87656273")), 2, 
4);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 3, 5, 6, 7);
+    Set<Binary> set2 = new HashSet<>();
+    set2.add(decimalBinary("87656273"));
+    assertCorrectFiltering(columnIndex, in(col, set2), 2, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 5, 6, 7);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 1);
     assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 
0, 1, 2, 3, 5, 6, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 6);
     assertCorrectFiltering(columnIndex, gt(col, decimalBinary("87656273")), 6);
@@ -364,6 +382,13 @@ public class TestColumnIndexBuilder {
         decimalBinary("-9999293.23"));
     assertCorrectFiltering(columnIndex, eq(col, 
decimalBinary("1234567890.12")), 2, 4);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 6);
+    Set<Binary> set3 = new HashSet<>();
+    set3.add(decimalBinary("1234567890.12"));
+    assertCorrectFiltering(columnIndex, in(col, set3), 2, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 3, 5, 6, 7);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 1, 2, 3, 4, 6);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 5, 7);
     assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("0.0")), 0, 
1, 2, 3, 4, 5, 6, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 2, 4, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 
decimalBinary("1234567890.12")));
@@ -417,6 +442,13 @@ public class TestColumnIndexBuilder {
         null);
     assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 1, 4, 
5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 5, 7);
+    Set<Binary> set1 = new HashSet<>();
+    set1.add(stringBinary("Marvin"));
+    assertCorrectFiltering(columnIndex, in(col, set1), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 6, 7);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 6);
     assertCorrectFiltering(columnIndex, notEq(col, 
stringBinary("Beeblebrox")), 0, 1, 2, 3, 4, 5, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5, 6);
     assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1, 
5);
@@ -462,6 +494,13 @@ public class TestColumnIndexBuilder {
         null);
     assertCorrectFiltering(columnIndex, eq(col, stringBinary("Jeltz")), 3, 4);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 4, 5, 7);
+    Set<Binary> set2 = new HashSet<>();
+    set2.add( stringBinary("Jeltz"));
+    assertCorrectFiltering(columnIndex, in(col, set2), 3, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 5, 6, 7);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 6);
     assertCorrectFiltering(columnIndex, notEq(col, 
stringBinary("Slartibartfast")), 0, 1, 2, 3, 4, 5, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 3, 4, 6);
     assertCorrectFiltering(columnIndex, gt(col, stringBinary("Marvin")), 4, 6);
@@ -507,6 +546,13 @@ public class TestColumnIndexBuilder {
         stringBinary("Beeblebrox"));
     assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 3);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6, 7);
+    Set<Binary> set3 = new HashSet<>();
+    set3.add(stringBinary("Marvin"));
+    assertCorrectFiltering(columnIndex, in(col, set3), 3);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 5, 6, 7);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 4);
     assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 
1, 2, 3, 5, 6, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 4, 7);
     assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1);
@@ -615,6 +661,13 @@ public class TestColumnIndexBuilder {
     BinaryColumn col = binaryColumn("test_col");
     assertCorrectFiltering(columnIndex, eq(col, stringBinary("Dent")), 2, 3);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 5, 6, 7);
+    Set<Binary> set = new HashSet<>();
+    set.add(stringBinary("Dent"));
+    assertCorrectFiltering(columnIndex, in(col, set), 2, 3);
+    assertCorrectFiltering(columnIndex, notIn(col, set), 0, 1, 4, 5, 6, 7);
+    set.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set), 0, 1, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set), new int[0]);
     assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 
1, 2, 3, 4, 5, 6, 7);
     assertCorrectFiltering(columnIndex, notEq(col, null), 2, 3, 5, 7);
     assertCorrectFiltering(columnIndex, userDefined(col, 
BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6, 7);
@@ -646,6 +699,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, 
false);
     assertCorrectFiltering(columnIndex, eq(col, true), 0, 1, 2);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Boolean> set1 = new HashSet<>();
+    set1.add(true);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 3, 4);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 4);
     assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4);
     assertCorrectFiltering(columnIndex, userDefined(col, 
BooleanIsTrueOrNull.class), 0, 1, 2, 3);
@@ -670,6 +730,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, 
false, false, null);
     assertCorrectFiltering(columnIndex, eq(col, true), 4, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+    Set<Boolean> set2 = new HashSet<>();
+    set2.add(true);
+    assertCorrectFiltering(columnIndex, in(col, set2), 4, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 6);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 1);
     assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4, 5, 6);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
     assertCorrectFiltering(columnIndex, userDefined(col, 
BooleanIsTrueOrNull.class), 0, 2, 3, 4, 5, 6);
@@ -694,6 +761,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, 
false, false, null);
     assertCorrectFiltering(columnIndex, eq(col, true), 1, 4);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+    Set<Boolean> set3 = new HashSet<>();
+    set3.add(true);
+    assertCorrectFiltering(columnIndex, in(col, set3), 1, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 2, 3, 5, 6);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), new int[0]);
     assertCorrectFiltering(columnIndex, notEq(col, true), 0, 2, 3, 4, 5, 6);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
     assertCorrectFiltering(columnIndex, userDefined(col, 
BooleanIsTrueOrNull.class), 0, 1, 2, 3, 4, 5, 6);
@@ -741,6 +815,14 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 
1.9, -21.0);
     assertCorrectFiltering(columnIndex, eq(col, 0.0), 1, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Double> set1 = new HashSet<>();
+    set1.add(0.0);
+    set1.add(-4.2);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 2, 3, 4);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 4);
     assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 4, 5);
@@ -771,6 +853,15 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, 
null, null, -234.6, null, 3.0, null);
     assertCorrectFiltering(columnIndex, eq(col, 0.0), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    Set<Double> set2 = new HashSet<>();
+    set2.add(0.0);
+    set2.add(3.5);
+    set2.add(-346.0);
+    assertCorrectFiltering(columnIndex, in(col, set2), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 2, 3, 4, 6, 8);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 7, 
8);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), new int[0]);
     assertCorrectFiltering(columnIndex, notEq(col, 0.0), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 2.99999), 7);
@@ -801,6 +892,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 345.2, null, 234.6, 
null, -2.99999, null, null, -42.83);
     assertCorrectFiltering(columnIndex, eq(col, 234.6), 3, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    Set<Double> set3 = new HashSet<>();
+    set3.add(234.6);
+    assertCorrectFiltering(columnIndex, in(col, set3), 3, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 6, 7, 8);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8);
     assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
     assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 3, 5);
@@ -871,6 +969,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 
1.9f, -21.0f);
     assertCorrectFiltering(columnIndex, eq(col, 0.0f), 1, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Float> set1 = new HashSet<>();
+    set1.add(0.0f);
+    assertCorrectFiltering(columnIndex, in(col, set1), 1, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 4);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 1, 2, 3, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 4);
     assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 4, 5);
@@ -901,6 +1006,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, 
null, null, -234.6f, null, 3.0f, null);
     assertCorrectFiltering(columnIndex, eq(col, 0.0f), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    Set<Float> set2 = new HashSet<>();
+    set2.add(0.0f);
+    assertCorrectFiltering(columnIndex, in(col, set2), 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 4, 6, 7, 
8);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 7);
     assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 2.2f), 5, 7);
@@ -931,6 +1043,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 345.2f, null, 
234.6f, null, -2.99999f, null, null, -42.83f);
     assertCorrectFiltering(columnIndex, eq(col, 234.65f), 3);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    Set<Float> set3 = new HashSet<>();
+    set3.add(234.65f);
+    assertCorrectFiltering(columnIndex, in(col, set3), 3);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 5, 6, 7, 
8);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 5, 8);
     assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
     assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 3, 5);
@@ -1001,6 +1120,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
     assertCorrectFiltering(columnIndex, eq(col, 2), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Integer> set1 = new HashSet<>();
+    set1.add(2);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 3);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), new int[0]);
     assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 5);
@@ -1031,6 +1157,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, 
null, -42, null, 3, null);
     assertCorrectFiltering(columnIndex, eq(col, 2), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    Set<Integer> set2 = new HashSet<>();
+    set2.add(2);
+    assertCorrectFiltering(columnIndex, in(col, set2), 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 4, 6, 7, 
8);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 7);
     assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 
8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 2), 7);
@@ -1062,6 +1195,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 345, null, 42, null, 
-2, null, null, -42);
     assertCorrectFiltering(columnIndex, eq(col, 2), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    Set<Integer> set3 = new HashSet<>();
+    set3.add(2);
+    assertCorrectFiltering(columnIndex, in(col, set3), 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 
8);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8);
     assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 
8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
     assertCorrectFiltering(columnIndex, gt(col, 2), 1, 3, 5);
@@ -1114,6 +1254,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
     assertCorrectFiltering(columnIndex, eq(col, 2), 2, 4);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Integer> set1 = new HashSet<>();
+    set1.add(2);
+    assertCorrectFiltering(columnIndex, in(col, set1), 2, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 1, 3, 5);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 1, 2, 3, 4);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 5);
     assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 4, 5);
@@ -1144,6 +1291,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 
42, null, 0xEF, null);
     assertCorrectFiltering(columnIndex, eq(col, 2), 2);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    Set<Integer> set2 = new HashSet<>();
+    set2.add(2);
+    assertCorrectFiltering(columnIndex, in(col, set2), 2);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 4, 5, 6, 7, 
8);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 5, 7);
     assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 
8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 0xEE), 7);
@@ -1175,6 +1329,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 0xFF, null, 0xEA, 
null, 42, null, null, 0);
     assertCorrectFiltering(columnIndex, eq(col, 0xAB), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    Set<Integer> set3 = new HashSet<>();
+    set3.add(0xAB);
+    assertCorrectFiltering(columnIndex, in(col, set3), 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 
8);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8);
     assertCorrectFiltering(columnIndex, notEq(col, 0xFF), 0, 2, 3, 4, 5, 6, 7, 
8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
     assertCorrectFiltering(columnIndex, gt(col, 0xFF));
@@ -1211,6 +1372,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, 
-21l);
     assertCorrectFiltering(columnIndex, eq(col, 0l), 0, 1, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    Set<Long> set1 = new HashSet<>();
+    set1.add(0l);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 2, 3, 4);
+    set1.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set1), 4);
     assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5);
     assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
     assertCorrectFiltering(columnIndex, gt(col, 2l), 0, 1, 5);
@@ -1241,6 +1409,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, 
null, -42l, null, -3l, null);
     assertCorrectFiltering(columnIndex, eq(col, -42l), 2, 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    Set<Long> set2 = new HashSet<>();
+    set2.add(-42l);
+    assertCorrectFiltering(columnIndex, in(col, set2), 2, 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 4, 6, 7, 8);
+    set2.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8);
+    assertCorrectFiltering(columnIndex, notIn(col, set2), 7);
     assertCorrectFiltering(columnIndex, notEq(col, -42l), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
     assertCorrectFiltering(columnIndex, gt(col, 2l), 7);
@@ -1272,6 +1447,13 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMinValues(), null, 345l, null, 42l, 
null, -2l, null, null, -42l);
     assertCorrectFiltering(columnIndex, eq(col, 0l), 5);
     assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    Set<Long> set3 = new HashSet<>();
+    set3.add(0l);
+    assertCorrectFiltering(columnIndex, in(col, set3), 5);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 
8);
+    set3.add(null);
+    assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8);
     assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5, 6, 
7, 8);
     assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
     assertCorrectFiltering(columnIndex, gt(col, 2l), 1, 3, 5);
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
index f37a343..47ea5fc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
@@ -26,11 +26,13 @@ import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.gt;
 import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.lt;
 import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
@@ -68,6 +70,7 @@ import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import 
org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.BinaryUtf8StartsWithB;
 import 
org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.DoubleIsInteger;
 import 
org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.IntegerIsDivisableWith3;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 import org.junit.Test;
 
@@ -360,6 +363,58 @@ public class TestColumnIndexFilter {
         calculateRowRanges(FilterCompat.get(
             userDefined(intColumn("column1"), AnyInt.class)), STORE, paths, 
TOTAL_ROW_COUNT),
         TOTAL_ROW_COUNT);
+
+    Set<Integer> set1 = new HashSet<>();
+    set1.add(7);
+    assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column1"), 
set1)), STORE, paths, TOTAL_ROW_COUNT),
+      7, 8, 9, 10, 11, 12, 13);
+    set1.add(1);
+    assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column1"), 
set1)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);
+    assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column1"), 
set1)), STORE, paths, TOTAL_ROW_COUNT),
+      1, 2, 3, 4, 5, 6, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
27, 28, 29);
+
+    Set<Binary> set2 = new HashSet<>();
+    set2.add(fromString("Zulu"));
+    set2.add(fromString("Alfa"));
+    assertRows(calculateRowRanges(FilterCompat.get(in(binaryColumn("column2"), 
set2)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 
23, 24, 25, 26, 27, 28, 29);
+    
assertRows(calculateRowRanges(FilterCompat.get(notIn(binaryColumn("column2"), 
set2)), STORE, paths, TOTAL_ROW_COUNT),
+      1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
21, 22, 23, 24, 25, 26, 27, 28);
+
+    Set<Double> set3 = new HashSet<>();
+    set3.add(2.03);
+    assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 16, 17, 18, 19, 20, 21, 22);
+    
assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 23, 24, 25, 26, 27, 28, 29);
+    set3.add(9.98);
+    assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25);
+    
assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      6, 7, 8, 9, 23, 24, 25, 26, 27, 28, 29);
+    set3.add(null);
+    assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+    
assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), 
set3)), STORE, paths, TOTAL_ROW_COUNT),
+      23, 24, 25);
+
+    Set<Boolean> set4 = new HashSet<>();
+    set4.add(null);
+    
assertRows(calculateRowRanges(FilterCompat.get(in(booleanColumn("column4"), 
set4)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+    // no column index, can't filter this row
+    
assertRows(calculateRowRanges(FilterCompat.get(notIn(booleanColumn("column4"), 
set4)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+
+    Set<Integer> set5 = new HashSet<>();
+    set5.add(7);
+    set5.add(20);
+    assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column5"), 
set5)), STORE, paths, TOTAL_ROW_COUNT),
+      new long[0]);
+    assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column5"), 
set5)), STORE, paths, TOTAL_ROW_COUNT),
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 
20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+
     assertRows(calculateRowRanges(FilterCompat.get(
         and(
             and(
diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
index 3c1cf48..2827167 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -67,15 +67,18 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
     add("package org.apache.parquet.filter2.recordlevel;\n" +
         "\n" +
         "import java.util.List;\n" +
+        "import java.util.Set;\n" +
         "\n" +
         "import org.apache.parquet.hadoop.metadata.ColumnPath;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Eq;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Gt;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.GtEq;\n" +
+        "import org.apache.parquet.filter2.predicate.Operators.In;\n" +
         "import 
org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Lt;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.LtEq;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.NotEq;\n" +
+        "import org.apache.parquet.filter2.predicate.Operators.NotIn;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.UserDefined;\n" 
+
         "import org.apache.parquet.filter2.predicate.UserDefinedPredicate;\n" +
         "import 
org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n"
 +
@@ -106,6 +109,18 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
     }
     addVisitEnd();
 
+    addVisitBegin("In");
+    for (TypeInfo info : TYPES) {
+      addInNotInCase(info, true);
+    }
+    addVisitEnd();
+
+    addVisitBegin("NotIn");
+    for (TypeInfo info : TYPES) {
+      addInNotInCase(info, false);
+    }
+    addVisitEnd();
+
     addVisitBegin("Lt");
     for (TypeInfo info : TYPES) {
       addInequalityCase(info, "<");
@@ -233,6 +248,56 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
         "    }\n\n");
   }
 
+  private void addInNotInCase(TypeInfo info, boolean isEq) throws IOException { // Emits the generated branch for one column type; callers pass isEq=true for In and isEq=false for NotIn.
+    add("    if (clazz.equals(" + info.className + ".class)) {\n" +
+      "      if (pred.getValues().contains(null)) {\n" + // generated branch 1: the value set contains null
+      "        valueInspector = new ValueInspector() {\n" +
+      "          @Override\n" +
+      "          public void updateNull() {\n" +
+      "            setResult(" + isEq + ");\n" + // null row: true for In, false for NotIn
+      "          }\n" +
+      "\n" +
+      "          @Override\n" +
+      "          public void update(" + info.primitiveName + " value) {\n" +
+      "            setResult(" + !isEq + ");\n" + // NOTE(review): every non-null row gets !isEq here, even if the set also holds that value - confirm this is intended
+      "          }\n" +
+      "        };\n" +
+      "      } else {\n" + // generated branch 2: the value set has no null
+      "        final Set<" + info.className + "> target = (Set<" + 
info.className + ">) pred.getValues();\n" +
+      "        final PrimitiveComparator<" + info.className + "> comparator = 
getComparator(columnPath);\n" +
+      "\n" +
+      "        valueInspector = new ValueInspector() {\n" +
+      "          @Override\n" +
+      "          public void updateNull() {\n" +
+      "            setResult(" + !isEq +");\n" + // null row: false for In, true for NotIn
+      "          }\n" +
+      "\n" +
+      "          @Override\n" +
+      "          public void update(" + info.primitiveName + " value) {\n" +
+      "            boolean set = false;\n"); // generated flag: records whether the loop below already called setResult
+
+    add("            for (" + info.primitiveName + " i : target) {\n"); // generated loop over every element of the value set
+
+    add("              if(" + compareEquality("value", "i", isEq) + ") {\n"); // NOTE(review): compareEquality is defined outside this hunk; for NotIn (isEq=false) it presumably emits an inequality test, so a single differing element would yield true - verify with multi-element sets
+
+    add("                 setResult(true);\n");
+
+    add("                 set = true;\n");
+
+    add("                 break;\n");
+
+    add("               }\n");
+
+    add("             }\n");
+    add("             if (!set) setResult(false);\n"); // no element satisfied the emitted test
+    add("           }\n");
+
+    add("         };\n" +
+      "       }\n" +
+      "    }\n\n");
+  }
+
+
   private void addUdpBegin() throws IOException {
     add("    ColumnPath columnPath = pred.getColumn().getColumnPath();\n" +
         "    Class<T> clazz = pred.getColumn().getColumnType();\n" +
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
index d984164..d069836 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
@@ -22,6 +22,8 @@ package org.apache.parquet.filter2.bloomfilterlevel;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,6 +121,41 @@ public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean>{
   }
 
   @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.In<T> in) { // Decides from the column chunk bloom filter whether any value of the In set may be present.
+    Set<T> values = in.getValues();
+
+    if (values.contains(null)) {
+      // the bloom filter bitset contains only non-null values so isn't 
helpful. this
+      // could check the column stats, but the StatisticsFilter is responsible
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Operators.Column<T> filterColumn = in.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the set
+      // holds only non-null values because of the above check.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta); // may be null; that case falls through to the final BLOCK_MIGHT_MATCH
+    if (bloomFilter != null) {
+      for (T value : values) {
+        if (bloomFilter.findHash(bloomFilter.hash(value))) { // one hit is enough: stop at the first value whose hash is found
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+      return BLOCK_CANNOT_MATCH; // no value of the set had its hash in the bloom filter
+    }
+    return BLOCK_MIGHT_MATCH; // no bloom filter object was returned, so nothing can be concluded here
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.NotIn<T> notIn) { // NotIn is never evaluated against the bloom filter.
+    return BLOCK_MIGHT_MATCH; // unconditional; presumably because a bloom filter cannot prove that every row holds one of the listed values - confirm
+  }
+
+  @Override
   public Boolean visit(Operators.And and) {
     return and.getLeft().accept(this) || and.getRight().accept(this); // logical OR of both child results; NOTE(review): presumably true means the block can be dropped, so And drops when either side drops - confirm against the BLOCK_* constants
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 2f69fa6..c21212a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -365,6 +365,108 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   @Override
+  public <T extends Comparable<T>> Boolean visit(In<T> in) {
+    Set<T> values = in.getValues();
+
+    if (values.contains(null)) {
+      // the dictionary contains only non-null values so isn't helpful. this
+      // could check the column stats, but the StatisticsFilter is responsible
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Column<T> filterColumn = in.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the value
+      // must be non-null because of the above check.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet != null) {
+        return drop(dictSet, values);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+    return BLOCK_MIGHT_MATCH; // cannot drop the row group based on this 
dictionary
+  }
+
+  private <T extends Comparable<T>> Boolean drop(Set<T> dictSet, Set<T> 
values) {
+    // we need to find out the smaller set to iterate through
+    Set<T> smallerSet;
+    Set<T> biggerSet;
+
+    if (values.size() < dictSet.size()) {
+      smallerSet = values;
+      biggerSet = dictSet;
+    } else {
+      smallerSet = dictSet;
+      biggerSet = values;
+    }
+
+    for (T e : smallerSet) {
+      if (biggerSet.contains(e)) {
+        // value sets intersect so rows match
+        return BLOCK_MIGHT_MATCH;
+      }
+    }
+    return BLOCK_CANNOT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(NotIn<T> notIn) {
+    Set<T> values = notIn.getValues();
+
+    Column<T> filterColumn = notIn.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (values.size() == 1 && values.contains(null) && meta == null) {
+      // the predicate value is null and all rows have a null value, so the
+      // predicate is always false (null != null)
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    if (values.contains(null)) {
+      // the dictionary contains only non-null values so isn't helpful. this
+      // could check the column stats, but the StatisticsFilter is responsible
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the value
+      // must be non-null because of the above check.
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet != null) {
+        if (dictSet.size() > values.size()) return BLOCK_MIGHT_MATCH;
+        // ROWS_CANNOT_MATCH if no values in the dictionary that are not also 
in the set
+        return values.containsAll(dictSet) ? BLOCK_CANNOT_MATCH : 
BLOCK_MIGHT_MATCH;
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
   public Boolean visit(And and) {
     return and.getLeft().accept(this) || and.getRight().accept(this);
   }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
index 4db2eb9..23609a9 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -31,15 +32,18 @@ import 
org.apache.parquet.filter2.predicate.Operators.Column;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.In;
 import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
 import org.apache.parquet.filter2.predicate.Operators.Lt;
 import org.apache.parquet.filter2.predicate.Operators.LtEq;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.NotIn;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.column.MinMax;
 
 /**
  * Applies a {@link org.apache.parquet.filter2.predicate.FilterPredicate} to 
statistics about a group of
@@ -146,6 +150,71 @@ public class StatisticsFilter implements 
FilterPredicate.Visitor<Boolean> {
 
   @Override
   @SuppressWarnings("unchecked")
+  public <T extends Comparable<T>> Boolean visit(In<T> in) {
+    // Decides from the column chunk statistics (null count and min/max)
+    // whether any row can satisfy the in predicate. Returns
+    // BLOCK_CANNOT_MATCH only when the statistics rule out every value of
+    // the set, BLOCK_MIGHT_MATCH otherwise.
+    Column<T> filterColumn = in.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    Set<T> values = in.getValues();
+
+    if (meta == null) {
+      // the column isn't in this file so all values are null.
+      if (!values.contains(null)) {
+        // non-null is never null
+        return BLOCK_CANNOT_MATCH;
+      }
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    if (isAllNulls(meta)) {
+      // we are looking for records where v in(someNonNull)
+      // and this is a column of all nulls, so drop it unless the set
+      // contains null.
+      if (values.contains(null)) {
+        return BLOCK_MIGHT_MATCH;
+      } else {
+        return BLOCK_CANNOT_MATCH;
+      }
+    }
+
+    if (!stats.hasNonNullValue()) {
+      // stats does not contain min/max values, we cannot drop any chunks
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    if (stats.isNumNullsSet()) {
+      if (stats.getNumNulls() == 0) {
+        // no nulls in the chunk, so a set holding only null cannot match
+        if (values.contains(null) && values.size() == 1) {
+          return BLOCK_CANNOT_MATCH;
+        }
+      } else {
+        // the chunk has nulls and the set asks for null
+        if (values.contains(null)) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+    } else if (values.contains(null)) {
+      // the null count is unknown, so the chunk may hold nulls that match
+      // the null in the set; min/max cannot rule them out
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    MinMax<T> minMax =
+      new MinMax(meta.getPrimitiveType().comparator(), values);
+    T min = minMax.getMin();
+    T max = minMax.getMax();
+
+    // drop if all the elements in values < min || all the elements in
+    // values > max
+    if (stats.compareMinToValue(max) <= 0 &&
+      stats.compareMaxToValue(min) >= 0) {
+      return BLOCK_MIGHT_MATCH;
+    } else {
+      return BLOCK_CANNOT_MATCH;
+    }
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(NotIn<T> notIn) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
     Column<T> filterColumn = notEq.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
index 3282f27..3d18e1c 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
@@ -37,6 +37,8 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.parquet.filter2.predicate.FilterApi.and;
 import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
@@ -44,10 +46,12 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.gt;
 import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.lt;
 import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
 import static org.apache.parquet.io.api.Binary.fromString;
 import static org.apache.parquet.schema.OriginalType.UTF8;
@@ -98,6 +102,12 @@ public class TestFiltersWithMissingColumns {
   @Test
   public void testSimpleMissingColumnFilter() throws Exception {
     assertEquals(0, countFilteredRecords(path, lt(longColumn("missing"), 
500L)));
+    Set<Long> values = new HashSet<>();
+    values.add(1L);
+    values.add(2L);
+    values.add(5L);
+    assertEquals(0, countFilteredRecords(path, in(longColumn("missing"), 
values)));
+    assertEquals(1000, countFilteredRecords(path, notIn(longColumn("missing"), 
values)));
   }
 
   @Test
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
index 14877ab..4052777 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java
@@ -21,6 +21,8 @@ package org.apache.parquet.filter2.compat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
 
 import org.junit.Test;
 
@@ -32,8 +34,10 @@ import org.apache.parquet.schema.MessageTypeParser;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.hadoop.TestInputFormat.makeBlockFromStats;
 
 public class TestRowGroupFilter {
@@ -83,7 +87,29 @@ public class TestRowGroupFilter {
     MessageType schema = MessageTypeParser.parseMessageType("message Document 
{ optional int32 foo; }");
     IntColumn foo = intColumn("foo");
 
-    List<BlockMetaData> filtered = 
RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
+    Set<Integer> set1 = new HashSet<>();
+    set1.add(9);
+    set1.add(10);
+    set1.add(50);
+    List<BlockMetaData> filtered = 
RowGroupFilter.filterRowGroups(FilterCompat.get(in(foo, set1)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b5), filtered);
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notIn(foo, 
set1)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+    Set<Integer> set2 = new HashSet<>();
+    set2.add(null);
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(in(foo, set2)), 
blocks, schema);
+    assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notIn(foo, 
set2)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+    set2.add(8);
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(in(foo, set2)), 
blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notIn(foo, 
set2)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), 
blocks, schema);
     assertEquals(Arrays.asList(b1, b2, b5), filtered);
 
     filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 
50)), blocks, schema);
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 1e243f8..65cefe4 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -465,6 +465,122 @@ public class DictionaryFilterTest {
   }
 
   @Test
+  public void testInBinary() throws Exception {
+    // Exercises in/notIn against the dictionary of binary_field; every
+    // failure message states the outcome that its assertion expects.
+    BinaryColumn b = binaryColumn("binary_field");
+
+    Set<Binary> set1 = new HashSet<>();
+    set1.add(Binary.fromString("F"));
+    set1.add(Binary.fromString("C"));
+    set1.add(Binary.fromString("h"));
+    set1.add(Binary.fromString("E"));
+    FilterPredicate predIn1 = in(b, set1);
+    FilterPredicate predNotIn1 = notIn(b, set1);
+    assertFalse("Should not drop block",
+      canDrop(predIn1, ccmd, dictionaries));
+    assertFalse("Should not drop block",
+      canDrop(predNotIn1, ccmd, dictionaries));
+
+    Set<Binary> set2 = new HashSet<>();
+    // 97 is 'a': adds the 26 lowercase letters
+    for (int i = 0; i < 26; i++) {
+      set2.add(Binary.fromString(Character.toString((char) (i + 97))));
+    }
+    set2.add(Binary.fromString("A"));
+    FilterPredicate predIn2 = in(b, set2);
+    FilterPredicate predNotIn2 = notIn(b, set2);
+    assertFalse("Should not drop block",
+      canDrop(predIn2, ccmd, dictionaries));
+    // this notIn is expected to be dropped
+    assertTrue("Should drop block",
+      canDrop(predNotIn2, ccmd, dictionaries));
+
+    Set<Binary> set3 = new HashSet<>();
+    set3.add(Binary.fromString("F"));
+    set3.add(Binary.fromString("C"));
+    set3.add(Binary.fromString("A"));
+    set3.add(Binary.fromString("E"));
+    FilterPredicate predIn3 = in(b, set3);
+    FilterPredicate predNotIn3 = notIn(b, set3);
+    assertTrue("Should drop block",
+      canDrop(predIn3, ccmd, dictionaries));
+    assertFalse("Should not drop block",
+      canDrop(predNotIn3, ccmd, dictionaries));
+
+    Set<Binary> set4 = new HashSet<>();
+    set4.add(null);
+    FilterPredicate predIn4 = in(b, set4);
+    FilterPredicate predNotIn4 = notIn(b, set4);
+    assertFalse("Should not drop block for null",
+      canDrop(predIn4, ccmd, dictionaries));
+    assertFalse("Should not drop block for null",
+      canDrop(predNotIn4, ccmd, dictionaries));
+  }
+
+  @Test
+  public void testInFixed() throws Exception {
+    // Exercises in/notIn against fixed_field; each failure message names
+    // the predicate that its assertion actually evaluates.
+    BinaryColumn b = binaryColumn("fixed_field");
+
+    // Only V2 supports dictionary encoding for FIXED_LEN_BYTE_ARRAY values
+    if (version == PARQUET_2_0) {
+      Set<Binary> set1 = new HashSet<>();
+      set1.add(toBinary("-2", 17));
+      set1.add(toBinary("-22", 17));
+      set1.add(toBinary("12345", 17));
+      FilterPredicate predIn1 = in(b, set1);
+      FilterPredicate predNotIn1 = notIn(b, set1);
+      assertTrue("Should drop block for in (-2, -22, 12345)",
+        canDrop(predIn1, ccmd, dictionaries));
+      assertFalse("Should not drop block for notIn (-2, -22, 12345)",
+        canDrop(predNotIn1, ccmd, dictionaries));
+
+      Set<Binary> set2 = new HashSet<>();
+      set2.add(toBinary("-1", 17));
+      set2.add(toBinary("0", 17));
+      set2.add(toBinary("12345", 17));
+      assertFalse("Should not drop block for in (-1, 0, 12345)",
+        canDrop(in(b, set2), ccmd, dictionaries));
+      assertFalse("Should not drop block for notIn (-1, 0, 12345)",
+        canDrop(notIn(b, set2), ccmd, dictionaries));
+    }
+
+    Set<Binary> set3 = new HashSet<>();
+    set3.add(null);
+    FilterPredicate predIn3 = in(b, set3);
+    FilterPredicate predNotIn3 = notIn(b, set3);
+    assertFalse("Should not drop block for null",
+      canDrop(predIn3, ccmd, dictionaries));
+    assertFalse("Should not drop block for null",
+      canDrop(predNotIn3, ccmd, dictionaries));
+  }
+
+  @Test
+  public void testInInt96() throws Exception {
+    // INT96 ordering is undefined => no filtering shall be done
+    BinaryColumn b = binaryColumn("int96_field");
+
+    Set<Binary> set1 = new HashSet<>();
+    set1.add(toBinary("-2", 12));
+    set1.add(toBinary("-0", 12));
+    set1.add(toBinary("12345", 12));
+    FilterPredicate predIn1 = in(b, set1);
+    FilterPredicate predNotIn1 = notIn(b, set1);
+    assertFalse("Should not drop block for in (-2, -0, 12345)",
+      canDrop(predIn1, ccmd, dictionaries));
+    assertFalse("Should not drop block for notIn (-2, -0, 12345)",
+      canDrop(predNotIn1, ccmd, dictionaries));
+
+    Set<Binary> set2 = new HashSet<>();
+    set2.add(toBinary("-2", 17));
+    set2.add(toBinary("12345", 17));
+    set2.add(toBinary("-789", 17));
+    FilterPredicate predIn2 = in(b, set2);
+    FilterPredicate predNotIn2 = notIn(b, set2);
+    assertFalse("Should not drop block for in (-2, 12345, -789)",
+      canDrop(predIn2, ccmd, dictionaries));
+    assertFalse("Should not drop block for notIn (-2, 12345, -789)",
+      canDrop(predNotIn2, ccmd, dictionaries));
+
+    Set<Binary> set3 = new HashSet<>();
+    set3.add(null);
+    FilterPredicate predIn3 = in(b, set3);
+    FilterPredicate predNotIn3 = notIn(b, set3);
+    assertFalse("Should not drop block for null",
+      canDrop(predIn3, ccmd, dictionaries));
+    assertFalse("Should not drop block for null",
+      canDrop(predNotIn3, ccmd, dictionaries));
+  }
+
+  @Test
   public void testAnd() throws Exception {
     BinaryColumn col = binaryColumn("binary_field");
 
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
index 5a7d02f..4c3538c 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -50,6 +50,7 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
@@ -147,6 +148,39 @@ public class TestRecordLevelFilters {
   }
 
   @Test
+  public void testInFilter() throws Exception {
+    // Reads the phone book through an in predicate on "name" and checks that
+    // exactly the requested names come back, in the expected order.
+    BinaryColumn name = binaryColumn("name");
+
+    HashSet<Binary> nameSet = new HashSet<>();
+    nameSet.add(Binary.fromString("thing2"));
+    nameSet.add(Binary.fromString("thing1"));
+    for (int i = 100; i < 200; i++) {
+      nameSet.add(Binary.fromString("p" + i));
+    }
+    FilterPredicate pred = in(name, nameSet);
+    List<Group> found =
+      PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    List<String> expectedNames = new ArrayList<>();
+    expectedNames.add("thing1");
+    expectedNames.add("thing2");
+    for (int i = 100; i < 200; i++) {
+      expectedNames.add("p" + i);
+    }
+
+    // validate that all the values returned by the reader fulfill the filter
+    // and there are no values left out, i.e. "thing1", "thing2" and from
+    // "p100" to "p199" and nothing else. The size is checked first, with a
+    // JUnit assertion rather than the assert keyword, so that a wrong count
+    // is reported even when JVM assertions are disabled.
+    assertEquals(expectedNames.size(), found.size());
+    for (int i = 0; i < expectedNames.size(); i++) {
+      assertEquals(expectedNames.get(i),
+        found.get(i).getString("name", 0));
+    }
+  }
+
+  @Test
   public void testNameNotNull() throws Exception {
     BinaryColumn name = binaryColumn("name");
 
@@ -232,7 +266,7 @@ public class TestRecordLevelFilters {
     LongColumn name = longColumn("id");
 
     final HashSet<Long> h = new HashSet<Long>();
-    h.add(20L); 
+    h.add(20L);
     h.add(27L);
     h.add(28L);
     
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
index 97dd169..e9682e6 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.filter2.statisticslevel;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.junit.Test;
 
@@ -37,6 +38,7 @@ import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Types;
 
@@ -51,11 +53,13 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.gt;
 import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.lt;
 import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
 import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
 import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
 import static 
org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
@@ -279,6 +283,93 @@ public class TestStatisticsFilter {
   }
 
   @Test
+  public void testInNotIn() {
+    Set<Integer> values1 = new HashSet<>();
+    values1.add(10);
+    values1.add(12);
+    values1.add(15);
+    values1.add(17);
+    values1.add(19);
+    assertFalse(canDrop(in(intColumn, values1), columnMetas));
+    assertFalse(canDrop(notIn(intColumn, values1), columnMetas));
+
+    Set<Integer> values2 = new HashSet<>();
+    values2.add(109);
+    values2.add(2);
+    values2.add(5);
+    values2.add(117);
+    values2.add(101);
+    assertFalse(canDrop(in(intColumn, values2), columnMetas));
+    assertFalse(canDrop(notIn(intColumn, values2), columnMetas));
+
+    Set<Integer> values3 = new HashSet<>();
+    values3.add(1);
+    values3.add(2);
+    values3.add(5);
+    values3.add(7);
+    values3.add(10);
+    assertFalse(canDrop(in(intColumn, values3), columnMetas));
+    assertFalse(canDrop(notIn(intColumn, values3), columnMetas));
+
+    Set<Integer> values4 = new HashSet<>();
+    values4.add(50);
+    values4.add(60);
+    assertFalse(canDrop(in(intColumn, values4), missingMinMaxColumnMetas));
+    assertFalse(canDrop(notIn(intColumn, values4), missingMinMaxColumnMetas));
+
+    Set<Double> values5 = new HashSet<>();
+    values5.add(1.0);
+    values5.add(2.0);
+    values5.add(95.0);
+    values5.add(107.0);
+    values5.add(99.0);
+    assertFalse(canDrop(in(doubleColumn, values5), columnMetas));
+    assertFalse(canDrop(notIn(doubleColumn, values5), columnMetas));
+
+    Set<Binary> values6 = new HashSet<>();
+    values6.add(Binary.fromString("test1"));
+    values6.add(Binary.fromString("test2"));
+    assertTrue(canDrop(in(missingColumn, values6), columnMetas));
+    assertFalse(canDrop(notIn(missingColumn, values6), columnMetas));
+
+    Set<Integer> values7 = new HashSet<>();
+    values7.add(null);
+    assertFalse(canDrop(in(intColumn, values7), nullColumnMetas));
+    assertFalse(canDrop(notIn(intColumn, values7), nullColumnMetas));
+
+    Set<Binary> values8 = new HashSet<>();
+    values8.add(null);
+    assertFalse(canDrop(in(missingColumn, values8), columnMetas));
+    assertFalse(canDrop(notIn(missingColumn, values8), columnMetas));
+
+    IntStatistics statsNoNulls = new IntStatistics();
+    statsNoNulls.setMinMax(10, 100);
+    statsNoNulls.setNumNulls(0);
+
+    IntStatistics statsSomeNulls = new IntStatistics();
+    statsSomeNulls.setMinMax(10, 100);
+    statsSomeNulls.setNumNulls(3);
+
+    Set<Integer> values9 = new HashSet<>();
+    values9.add(null);
+    assertTrue(canDrop(in(intColumn, values9), Arrays.asList(
+      getIntColumnMeta(statsNoNulls, 177L),
+      getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(notIn(intColumn, values9), Arrays.asList(
+      getIntColumnMeta(statsNoNulls, 177L),
+      getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(in(intColumn, values9), Arrays.asList(
+      getIntColumnMeta(statsSomeNulls, 177L),
+      getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(notIn(intColumn, values9), Arrays.asList(
+      getIntColumnMeta(statsSomeNulls, 177L),
+      getDoubleColumnMeta(doubleStats, 177L))));
+  }
+
+  @Test
   public void testAnd() {
     FilterPredicate yes = eq(intColumn, 9);
     FilterPredicate no = eq(doubleColumn, 50D);
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 4ebe15a..b07fccd 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -48,10 +48,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -311,6 +313,30 @@ public class TestBloomFiltering {
     assertCorrectFiltering(
       record -> "miller".equals(record.getName()),
       eq(binaryColumn("name"), Binary.fromString("miller")));
+
+    Set<Binary> values1 = new HashSet<>();
+    values1.add(Binary.fromString("miller"));
+    values1.add(Binary.fromString("anderson"));
+
+    assertCorrectFiltering(
+      record -> "miller".equals(record.getName()) || 
"anderson".equals(record.getName()),
+      in(binaryColumn("name"), values1));
+
+    Set<Binary> values2 = new HashSet<>();
+    values2.add(Binary.fromString("miller"));
+    values2.add(Binary.fromString("alien"));
+
+    assertCorrectFiltering(
+      record -> "miller".equals(record.getName()),
+      in(binaryColumn("name"), values2));
+
+    Set<Binary> values3 = new HashSet<>();
+    values3.add(Binary.fromString("alien"));
+    values3.add(Binary.fromString("predator"));
+
+    assertCorrectFiltering(
+      record -> "dummy".equals(record.getName()),
+      in(binaryColumn("name"), values3));
   }
 
   @Test
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index a66533d..5e18105 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -25,6 +25,7 @@ import static 
org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
 import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
 import static org.apache.parquet.filter2.predicate.FilterApi.lt;
 import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
@@ -53,10 +54,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -377,12 +380,48 @@ public class TestColumnIndexFiltering {
     assertCorrectFiltering(
         record -> record.getId() == 1234,
         eq(longColumn("id"), 1234l));
+
+    Set<Long> idSet = new HashSet<>();
+    idSet.add(1234l);
+    idSet.add(5678l);
+    idSet.add(1357l);
+    idSet.add(111l);
+    idSet.add(6666l);
+    idSet.add(2l);
+    idSet.add(2468l);
+
+    assertCorrectFiltering(
+      record -> (record.getId() == 1234 || record.getId() == 5678 || 
record.getId() == 1357 ||
+        record.getId() == 111 || record.getId() == 6666 || record.getId() == 2 
|| record.getId() == 2468),
+      in(longColumn("id"), idSet)
+    );
+
     assertCorrectFiltering(
         record -> "miller".equals(record.getName()),
         eq(binaryColumn("name"), Binary.fromString("miller")));
+
+    Set<Binary> nameSet = new HashSet<>();
+    nameSet.add(Binary.fromString("anderson"));
+    nameSet.add(Binary.fromString("miller"));
+    nameSet.add(Binary.fromString("thomas"));
+    nameSet.add(Binary.fromString("williams"));
+
+    assertCorrectFiltering(
+      record -> ("anderson".equals(record.getName()) || 
"miller".equals(record.getName()) ||
+        "thomas".equals(record.getName()) || 
"williams".equals(record.getName())),
+      in(binaryColumn("name"), nameSet)
+    );
+
     assertCorrectFiltering(
         record -> record.getName() == null,
         eq(binaryColumn("name"), null));
+
+    Set<Binary> nullSet = new HashSet<>();
+    nullSet.add(null);
+
+    assertCorrectFiltering(
+      record -> record.getName() == null,
+      in(binaryColumn("name"), nullSet));
   }
 
   @Test

Reply via email to