This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new dab5aae88 PARQUET-34: Add #contains FilterPredicate for Array columns
(#1328)
dab5aae88 is described below
commit dab5aae880ff27734351c9bb576fe503a6d6bf4f
Author: Claire McGinty <[email protected]>
AuthorDate: Tue Jun 4 10:34:35 2024 -0400
PARQUET-34: Add #contains FilterPredicate for Array columns (#1328)
---
.../parquet/filter2/compat/FilterCompat.java | 8 +-
...lInverseRewriter.java => ContainsRewriter.java} | 95 ++++++--
.../parquet/filter2/predicate/FilterApi.java | 5 +
.../parquet/filter2/predicate/FilterPredicate.java | 5 +
.../filter2/predicate/LogicalInverseRewriter.java | 6 +
.../parquet/filter2/predicate/LogicalInverter.java | 6 +
.../parquet/filter2/predicate/Operators.java | 139 ++++++++++-
.../predicate/SchemaCompatibilityValidator.java | 21 +-
.../IncrementallyUpdatedFilterPredicate.java | 2 +-
.../column/columnindex/ColumnIndexBuilder.java | 6 +
.../internal/column/columnindex/IndexIterator.java | 113 +++++++++
.../filter2/columnindex/ColumnIndexFilter.java | 6 +
.../filter2/predicate/TestContainsRewriter.java | 89 ++++++++
.../filter2/predicate/TestFilterApiMethods.java | 28 +++
.../TestSchemaCompatibilityValidator.java | 26 ++-
.../column/columnindex/TestColumnIndexBuilder.java | 57 +++++
.../column/columnindex/TestIndexIterator.java | 91 ++++++++
.../filter2/columnindex/TestColumnIndexFilter.java | 70 +++++-
...crementallyUpdatedFilterPredicateGenerator.java | 254 +++++++++++++++++++--
.../filter2/bloomfilterlevel/BloomFilterImpl.java | 5 +
.../filter2/dictionarylevel/DictionaryFilter.java | 6 +
.../filter2/statisticslevel/StatisticsFilter.java | 6 +
.../dictionarylevel/DictionaryFilterTest.java | 56 ++++-
.../filter2/recordlevel/PhoneBookWriter.java | 56 ++++-
.../recordlevel/TestRecordLevelFilters.java | 107 ++++++++-
.../statisticslevel/TestStatisticsFilter.java | 34 +++
.../apache/parquet/hadoop/TestBloomFiltering.java | 57 ++++-
.../parquet/hadoop/TestParquetWriterError.java | 9 +-
28 files changed, 1303 insertions(+), 60 deletions(-)
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
index 76d51af89..7e265bbb0 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.Preconditions.checkArgument;
import java.util.Objects;
import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.ContainsRewriter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.slf4j.Logger;
@@ -82,7 +83,12 @@ public class FilterCompat {
LOG.info("Predicate has been collapsed to: {}", collapsedPredicate);
}
- return new FilterPredicateCompat(collapsedPredicate);
+ FilterPredicate rewrittenContainsPredicate =
ContainsRewriter.rewrite(collapsedPredicate);
+ if (!collapsedPredicate.equals(rewrittenContainsPredicate)) {
+ LOG.info("Contains() Predicate has been rewritten to: {}",
rewrittenContainsPredicate);
+ }
+
+ return new FilterPredicateCompat(rewrittenContainsPredicate);
}
/**
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
similarity index 54%
copy from
parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
copy to
parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
index 175f9b4b7..ea2d70e8e 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java
@@ -18,12 +18,10 @@
*/
package org.apache.parquet.filter2.predicate;
-import static org.apache.parquet.filter2.predicate.FilterApi.and;
-import static org.apache.parquet.filter2.predicate.FilterApi.or;
-
import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -38,25 +36,21 @@ import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
/**
- * Recursively removes all use of the not() operator in a predicate
- * by replacing all instances of not(x) with the inverse(x),
- * eg: not(and(eq(), not(eq(y))) -> or(notEq(), eq(y))
- * <p>
- * The returned predicate should have the same meaning as the original, but
- * without the use of the not() operator.
- * <p>
- * See also {@link LogicalInverter}, which is used
- * to do the inversion.
+ * Recursively rewrites Contains predicates composed using And or Or into a
single Contains predicate
+ * containing all predicate assertions.
+ *
+ * This is a performance optimization, as all composed Contains sub-predicates
must share the same column, and
+ * can therefore be applied efficiently as a single predicate pass.
*/
-public final class LogicalInverseRewriter implements Visitor<FilterPredicate> {
- private static final LogicalInverseRewriter INSTANCE = new
LogicalInverseRewriter();
+public final class ContainsRewriter implements Visitor<FilterPredicate> {
+ private static final ContainsRewriter INSTANCE = new ContainsRewriter();
public static FilterPredicate rewrite(FilterPredicate pred) {
Objects.requireNonNull(pred, "pred cannot be null");
return pred.accept(INSTANCE);
}
- private LogicalInverseRewriter() {}
+ private ContainsRewriter() {}
@Override
public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
@@ -98,19 +92,84 @@ public final class LogicalInverseRewriter implements
Visitor<FilterPredicate> {
return notIn;
}
+ @Override
+ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains)
{
+ return contains;
+ }
+
@Override
public FilterPredicate visit(And and) {
- return and(and.getLeft().accept(this), and.getRight().accept(this));
+ final FilterPredicate left;
+ if (and.getLeft() instanceof And) {
+ left = visit((And) and.getLeft());
+ } else if (and.getLeft() instanceof Or) {
+ left = visit((Or) and.getLeft());
+ } else if (and.getLeft() instanceof Contains) {
+ left = and.getLeft();
+ } else {
+ return and;
+ }
+
+ final FilterPredicate right;
+ if (and.getRight() instanceof And) {
+ right = visit((And) and.getRight());
+ } else if (and.getRight() instanceof Or) {
+ right = visit((Or) and.getRight());
+ } else if (and.getRight() instanceof Contains) {
+ right = and.getRight();
+ } else {
+ return and;
+ }
+
+ if (left instanceof Contains) {
+ if (!(right instanceof Contains)) {
+ throw new UnsupportedOperationException(
+ "Contains predicates cannot be composed with non-Contains
predicates");
+ }
+ return ((Contains) left).and(right);
+ } else {
+ return and;
+ }
}
@Override
public FilterPredicate visit(Or or) {
- return or(or.getLeft().accept(this), or.getRight().accept(this));
+ final FilterPredicate left;
+ if (or.getLeft() instanceof And) {
+ left = visit((And) or.getLeft());
+ } else if (or.getLeft() instanceof Or) {
+ left = visit((Or) or.getLeft());
+ } else if (or.getLeft() instanceof Contains) {
+ left = or.getLeft();
+ } else {
+ return or;
+ }
+
+ final FilterPredicate right;
+ if (or.getRight() instanceof And) {
+ right = visit((And) or.getRight());
+ } else if (or.getRight() instanceof Or) {
+ right = visit((Or) or.getRight());
+ } else if (or.getRight() instanceof Contains) {
+ right = or.getRight();
+ } else {
+ return or;
+ }
+
+ if (left instanceof Contains) {
+ if (!(right instanceof Contains)) {
+ throw new UnsupportedOperationException(
+ "Contains predicates cannot be composed with non-Contains
predicates");
+ }
+ return ((Contains) left).or(right);
+ } else {
+ return or;
+ }
}
@Override
public FilterPredicate visit(Not not) {
- return LogicalInverter.invert(not.getPredicate().accept(this));
+ throw new IllegalStateException("Not predicate should be rewritten before
being evaluated by ContainsRewriter");
}
@Override
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
index 841d68f2c..4126b73e5 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
@@ -24,6 +24,7 @@ import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
@@ -257,6 +258,10 @@ public final class FilterApi {
return new NotIn<>(column, values);
}
+ public static <T extends Comparable<T>> Contains<T> contains(Eq<T> pred) {
+ return Contains.of(pred);
+ }
+
/**
* Keeps records that pass the provided {@link UserDefinedPredicate}
* <p>
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
index 2f4c534d9..a662bb0b1 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
@@ -19,6 +19,7 @@
package org.apache.parquet.filter2.predicate;
import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -84,6 +85,10 @@ public interface FilterPredicate {
throw new UnsupportedOperationException("visit NotIn is not supported.");
}
+ default <T extends Comparable<T>> R visit(Contains<T> contains) {
+ throw new UnsupportedOperationException("visit Contains is not
supported.");
+ }
+
R visit(And and);
R visit(Or or);
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
index 175f9b4b7..d1d7f07e8 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
@@ -24,6 +24,7 @@ import static
org.apache.parquet.filter2.predicate.FilterApi.or;
import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -98,6 +99,11 @@ public final class LogicalInverseRewriter implements
Visitor<FilterPredicate> {
return notIn;
}
+ @Override
+ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains)
{
+ return contains;
+ }
+
@Override
public FilterPredicate visit(And and) {
return and(and.getLeft().accept(this), and.getRight().accept(this));
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
index 93ebb34b7..d1d006ccf 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.filter2.predicate;
import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -92,6 +93,11 @@ public final class LogicalInverter implements
Visitor<FilterPredicate> {
return new In<>(notIn.getColumn(), notIn.getValues());
}
+ @Override
+ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains)
{
+ throw new UnsupportedOperationException("Contains not supported yet");
+ }
+
@Override
public FilterPredicate visit(And and) {
return new Or(and.getLeft().accept(this), and.getRight().accept(this));
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
index 1a9ea984f..b86a5ef09 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
+import java.util.function.BiFunction;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.io.api.Binary;
@@ -84,6 +85,8 @@ public final class Operators {
public static interface SupportsLtGt
extends SupportsEqNotEq {} // marker for columns that can be used with
lt(), ltEq(), gt(), gtEq()
+ public static interface SupportsContains {}
+
public static final class IntColumn extends Column<Integer> implements
SupportsLtGt {
IntColumn(ColumnPath columnPath) {
super(columnPath, Integer.class);
@@ -169,7 +172,7 @@ public final class Operators {
}
}
- public static final class Eq<T extends Comparable<T>> extends
ColumnFilterPredicate<T> {
+ public static final class Eq<T extends Comparable<T>> extends
ColumnFilterPredicate<T> implements SupportsContains {
// value can be null
public Eq(Column<T> column, T value) {
@@ -315,6 +318,140 @@ public final class Operators {
}
}
+ public abstract static class Contains<T extends Comparable<T>> implements
FilterPredicate, Serializable {
+ private final Column<T> column;
+
+ protected Contains(Column<T> column) {
+ this.column = Objects.requireNonNull(column, "column cannot be null");
+ }
+
+ static <ColumnT extends Comparable<ColumnT>, C extends
ColumnFilterPredicate<ColumnT> & SupportsContains>
+ Contains<ColumnT> of(C pred) {
+ return new ContainsColumnPredicate<>(pred);
+ }
+
+ public Column<T> getColumn() {
+ return column;
+ }
+
+ @Override
+ public <R> R accept(Visitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ /**
+ * Applies a filtering Vistitor to the Contains predicate, traversing any
composed And or Or clauses,
+ * and finally delegating to the underlying ColumnFilterPredicate.
+ */
+ public abstract <R> R filter(
+ Visitor<R> visitor, BiFunction<R, R, R> andBehavior, BiFunction<R, R,
R> orBehavior);
+
+ Contains<T> and(FilterPredicate other) {
+ return new ContainsComposedPredicate<>(this, (Contains<T>) other,
ContainsComposedPredicate.Combinator.AND);
+ }
+
+ Contains<T> or(FilterPredicate other) {
+ return new ContainsComposedPredicate<>(this, (Contains<T>) other,
ContainsComposedPredicate.Combinator.OR);
+ }
+ }
+
+ private static class ContainsComposedPredicate<T extends Comparable<T>>
extends Contains<T> {
+ private final Contains<T> left;
+ private final Contains<T> right;
+
+ private final Combinator combinator;
+
+ private enum Combinator {
+ AND,
+ OR
+ }
+
+ ContainsComposedPredicate(Contains<T> left, Contains<T> right, Combinator
combinator) {
+ super(Objects.requireNonNull(left, "left predicate cannot be
null").getColumn());
+
+ if (!left.getColumn()
+ .columnPath
+ .equals(Objects.requireNonNull(right, "right predicate cannot be
null")
+ .getColumn()
+ .columnPath)) {
+ throw new IllegalArgumentException("Composed Contains predicates must
reference the same column name; "
+ + "found [" + left.getColumn().columnPath.toDotString() + ", "
+ + right.getColumn().columnPath.toDotString() + "]");
+ }
+
+ this.left = left;
+ this.right = right;
+ this.combinator = combinator;
+ }
+
+ @Override
+ public <R> R filter(Visitor<R> visitor, BiFunction<R, R, R> andBehavior,
BiFunction<R, R, R> orBehavior) {
+ final R filterLeft = left.filter(visitor, andBehavior, orBehavior);
+ final R filterRight = right.filter(visitor, andBehavior, orBehavior);
+
+ if (combinator == Combinator.AND) {
+ return andBehavior.apply(filterLeft, filterRight);
+ } else {
+ return orBehavior.apply(filterLeft, filterRight);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return combinator.toString().toLowerCase() + "(" + left + ", " + right +
")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ContainsComposedPredicate<T> that = (ContainsComposedPredicate<T>) o;
+ return left.equals(that.left) && right.equals(that.right);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass().getName(), left, right);
+ }
+ }
+
+ private static class ContainsColumnPredicate<T extends Comparable<T>, U
extends ColumnFilterPredicate<T>>
+ extends Contains<T> {
+ private final U underlying;
+
+ ContainsColumnPredicate(U underlying) {
+ super(underlying.getColumn());
+ if (underlying.getValue() == null) {
+ throw new IllegalArgumentException("Contains predicate does not
support null element value");
+ }
+ this.underlying = underlying;
+ }
+
+ @Override
+ public String toString() {
+ String name = Contains.class.getSimpleName().toLowerCase(Locale.ENGLISH);
+ return name + "(" + underlying.toString() + ")";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ContainsColumnPredicate<T, U> that = (ContainsColumnPredicate<T, U>) o;
+ return underlying.equals(that.underlying);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass().getName(), underlying);
+ }
+
+ @Override
+ public <R> R filter(Visitor<R> visitor, BiFunction<R, R, R> andBehavior,
BiFunction<R, R, R> orBehavior) {
+ return underlying.accept(visitor);
+ }
+ }
+
public static final class NotIn<T extends Comparable<T>> extends
SetColumnFilterPredicate<T> {
NotIn(Column<T> column, Set<T> values) {
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
index c8997a9e2..b5708a4a0 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -25,6 +25,7 @@ import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.parquet.filter2.predicate.Operators.ColumnFilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -128,6 +129,12 @@ public class SchemaCompatibilityValidator implements
FilterPredicate.Visitor<Voi
return null;
}
+ @Override
+ public <T extends Comparable<T>> Void visit(Contains<T> pred) {
+ validateColumnFilterPredicate(pred);
+ return null;
+ }
+
@Override
public Void visit(And and) {
and.getLeft().accept(this);
@@ -167,7 +174,15 @@ public class SchemaCompatibilityValidator implements
FilterPredicate.Visitor<Voi
validateColumn(pred.getColumn());
}
+ private <T extends Comparable<T>> void
validateColumnFilterPredicate(Contains<T> pred) {
+ validateColumn(pred.getColumn(), true);
+ }
+
private <T extends Comparable<T>> void validateColumn(Column<T> column) {
+ validateColumn(column, false);
+ }
+
+ private <T extends Comparable<T>> void validateColumn(Column<T> column,
boolean shouldBeRepeated) {
ColumnPath path = column.getColumnPath();
Class<?> alreadySeen = columnTypesEncountered.get(path);
@@ -189,7 +204,11 @@ public class SchemaCompatibilityValidator implements
FilterPredicate.Visitor<Voi
return;
}
- if (descriptor.getMaxRepetitionLevel() > 0) {
+ if (shouldBeRepeated && descriptor.getMaxRepetitionLevel() == 0) {
+ throw new IllegalArgumentException(
+ "FilterPredicate for column " + path.toDotString() + " requires a
repeated "
+ + "schema, but found max repetition level " +
descriptor.getMaxRepetitionLevel());
+ } else if (!shouldBeRepeated && descriptor.getMaxRepetitionLevel() > 0) {
throw new IllegalArgumentException("FilterPredicates do not currently
support repeated columns. "
+ "Column " + path.toDotString() + " is repeated.");
}
diff --git
a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
index f1d7774d9..3c28ba6af 100644
---
a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
+++
b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -98,7 +98,7 @@ public interface IncrementallyUpdatedFilterPredicate {
/**
* Reset to clear state and begin evaluating the next record.
*/
- public final void reset() {
+ public void reset() {
isKnown = false;
result = false;
}
diff --git
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index bc5c809e0..f4fe80ab9 100644
---
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -41,6 +41,7 @@ import org.apache.parquet.column.MinMax;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -368,6 +369,11 @@ public abstract class ColumnIndexBuilder {
return IndexIterator.all(getPageCount());
}
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Contains<T>
contains) {
+ return contains.filter(this, IndexIterator::intersection,
IndexIterator::union);
+ }
+
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
PrimitiveIterator.OfInt visit(
UserDefined<T, U> udp) {
diff --git
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
index bb0bdc849..86f93c4e9 100644
---
a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
+++
b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
@@ -55,6 +55,119 @@ public class IndexIterator implements
PrimitiveIterator.OfInt {
return new IndexIterator(from, to + 1, i -> true, translator);
}
+ static PrimitiveIterator.OfInt intersection(PrimitiveIterator.OfInt lhs,
PrimitiveIterator.OfInt rhs) {
+ return new PrimitiveIterator.OfInt() {
+ private int next = fetchNext();
+
+ @Override
+ public int nextInt() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ int result = next;
+ next = fetchNext();
+ return result;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != -1;
+ }
+
+ private int fetchNext() {
+ if (!lhs.hasNext() || !rhs.hasNext()) {
+ return -1;
+ }
+
+ // Since we know both iterators are in sorted order, we can iterate
linearly through until
+ // we find the next value that belongs to both iterators, or terminate
if none exist
+ int nextL = lhs.next();
+ int nextR = rhs.next();
+
+ while (true) {
+ // Try to iterate LHS and RHS to the next intersecting value
+ while (nextL < nextR && lhs.hasNext()) {
+ nextL = lhs.next();
+ }
+ while (nextR < nextL && rhs.hasNext()) {
+ nextR = rhs.next();
+ }
+ if (nextL == nextR) {
+ return nextL;
+ }
+
+ // No intersection found; advance LHS to the next element and retry
loop
+ if (nextL < nextR && lhs.hasNext()) {
+ nextL = lhs.next();
+ } else {
+ break;
+ }
+ }
+
+ return -1;
+ }
+ };
+ }
+
+ static PrimitiveIterator.OfInt union(PrimitiveIterator.OfInt lhs,
PrimitiveIterator.OfInt rhs) {
+ return new PrimitiveIterator.OfInt() {
+ private int peekL = -1;
+ private int peekR = -1;
+ private int next = fetchNext();
+
+ @Override
+ public int nextInt() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ int result = next;
+ next = fetchNext();
+ return result;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != -1;
+ }
+
+ private int fetchNext() {
+ if ((peekL == -1 && peekR == -1) && (!lhs.hasNext() &&
!rhs.hasNext())) {
+ return -1;
+ }
+
+ if (peekL == -1 && lhs.hasNext()) {
+ peekL = lhs.next();
+ }
+
+ if (peekR == -1 && rhs.hasNext()) {
+ peekR = rhs.next();
+ }
+
+ // Return the smaller of the two next iterator values
+ int result;
+ if (peekL != -1 && (peekL == peekR || peekR == -1)) {
+ // If RHS is exhausted or intersects with LHS, return l and throw
away r to avoid duplicates
+ result = peekL;
+ peekL = -1;
+ peekR = -1;
+ } else if (peekL == -1 && peekR != -1) {
+ // If LHS is exhausted, return RHS
+ result = peekR;
+ peekR = -1;
+ } else if (peekL < peekR) {
+ // If LHS value is smaller than RHS value, return LHS
+ result = peekL;
+ peekL = -1;
+ } else {
+ // If RHS value is smaller than LHS value, return RHS
+ result = peekR;
+ peekR = -1;
+ }
+ return result;
+ }
+ };
+ }
+
private IndexIterator(int startIndex, int endIndex, IntPredicate filter,
IntUnaryOperator translator) {
this.endIndex = endIndex;
this.filter = filter;
diff --git
a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index 264212a24..e46673f01 100644
---
a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++
b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -29,6 +29,7 @@ import
org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -155,6 +156,11 @@ public class ColumnIndexFilter implements
Visitor<RowRanges> {
return applyPredicate(notIn.getColumn(), ci -> ci.visit(notIn), isNull ?
RowRanges.EMPTY : allRows());
}
+ @Override
+ public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
+ return contains.filter(this, RowRanges::intersection, RowRanges::union);
+ }
+
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
RowRanges visit(UserDefined<T, U> udp) {
return applyPredicate(
diff --git
a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestContainsRewriter.java
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestContainsRewriter.java
new file mode 100644
index 000000000..5daa5d79b
--- /dev/null
+++
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestContainsRewriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import static org.apache.parquet.filter2.predicate.ContainsRewriter.rewrite;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.parquet.filter2.predicate.Operators.Contains;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.junit.Test;
+
+public class TestContainsRewriter {
+ private static final IntColumn intColumn = intColumn("a.b.c");
+ private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+ private static void assertNoOp(FilterPredicate p) {
+ assertEquals(p, rewrite(p));
+ }
+
+ @Test
+ public void testBaseCases() {
+ UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+ assertNoOp(eq(intColumn, 17));
+ assertNoOp(notEq(intColumn, 17));
+ assertNoOp(lt(intColumn, 17));
+ assertNoOp(ltEq(intColumn, 17));
+ assertNoOp(gt(intColumn, 17));
+ assertNoOp(gtEq(intColumn, 17));
+ assertNoOp(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+ assertNoOp(or(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+ assertNoOp(ud);
+
+ Contains<Integer> containsLhs = contains(eq(intColumn, 17));
+ Contains<Integer> containsRhs = contains(eq(intColumn, 7));
+
+ assertNoOp(containsLhs);
+ assertEquals(containsLhs.and(containsRhs), rewrite(and(containsLhs,
containsRhs)));
+ assertEquals(containsLhs.or(containsRhs), rewrite(or(containsLhs,
containsRhs)));
+ }
+
+ @Test
+ public void testNested() {
+ Contains<Integer> contains1 = contains(eq(intColumn, 1));
+ Contains<Integer> contains2 = contains(eq(intColumn, 2));
+ Contains<Integer> contains3 = contains(eq(intColumn, 3));
+ Contains<Integer> contains4 = contains(eq(intColumn, 4));
+
+ assertEquals(contains1.and(contains2.or(contains3)),
rewrite(and(contains1, or(contains2, contains3))));
+ assertEquals(contains1.and(contains2).or(contains3),
rewrite(or(and(contains1, contains2), contains3)));
+
+ assertEquals(
+ contains1.and(contains2).and(contains2.or(contains3)),
+ rewrite(and(and(contains1, contains2), or(contains2, contains3))));
+ assertEquals(
+ contains1.and(contains2).or(contains3.or(contains4)),
+ rewrite(or(and(contains1, contains2), or(contains3, contains4))));
+ }
+}
diff --git
a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
index f5050cac4..c2e1ef385 100644
---
a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
+++
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.predicate;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
@@ -31,6 +32,7 @@ import static
org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.Operators.NotEq;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -91,6 +93,23 @@ public class TestFilterApiMethods {
assertEquals(ColumnPath.get("x", "y", "z"), ((Gt)
gt).getColumn().getColumnPath());
}
+ @Test
+ public void testInvalidContainsCreation() {
+ assertThrows(
+ "Contains predicate does not support null element value",
+ IllegalArgumentException.class,
+ () -> contains(eq(binColumn, null)));
+
+ assertThrows(
+ "Composed Contains predicates must reference the same column name;
found [a.b.c, b.c.d]",
+ IllegalArgumentException.class,
+ () -> ContainsRewriter.rewrite(or(
+ contains(eq(binaryColumn("a.b.c"), Binary.fromString("foo"))),
+ and(
+ contains(eq(binaryColumn("b.c.d"), Binary.fromString("bar"))),
+ contains(eq(binaryColumn("b.c.d"),
Binary.fromString("bar")))))));
+ }
+
@Test
public void testToString() {
FilterPredicate pred = or(predicate, notEq(binColumn,
Binary.fromString("foobarbaz")));
@@ -98,6 +117,15 @@ public class TestFilterApiMethods {
"or(and(not(or(eq(a.b.c, 7), noteq(a.b.c, 17))), gt(x.y.z, 100.0)), "
+ "noteq(a.string.column, Binary{\"foobarbaz\"}))",
pred.toString());
+
+ pred = ContainsRewriter.rewrite(or(
+ contains(eq(binColumn, Binary.fromString("foo"))),
+ and(
+ contains(eq(binColumn, Binary.fromString("bar"))),
+ contains(eq(binColumn, Binary.fromString("baz"))))));
+ assertEquals(
+ "or(contains(eq(a.string.column, Binary{\"foo\"})),
and(contains(eq(a.string.column, Binary{\"bar\"})),
contains(eq(a.string.column, Binary{\"baz\"}))))",
+ pred.toString());
}
@Test
diff --git
a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
index fc23ce7e2..47e9bdd5e 100644
---
a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
+++
b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.predicate;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
@@ -126,7 +127,7 @@ public class TestSchemaCompatibilityValidator {
}
@Test
- public void testRepeatedNotSupported() {
+ public void testRepeatedNotSupportedForPrimitivePredicates() {
try {
validate(eq(lotsOfLongs, 10l), schema);
fail("this should throw");
@@ -136,4 +137,27 @@ public class TestSchemaCompatibilityValidator {
e.getMessage());
}
}
+
+ @Test
+ public void testRepeatedSupportedForContainsPredicates() {
+ try {
+ validate(contains(eq(lotsOfLongs, 10L)), schema);
+ validate(and(contains(eq(lotsOfLongs, 10L)), contains(eq(lotsOfLongs,
5l))), schema);
+ validate(or(contains(eq(lotsOfLongs, 10L)), contains(eq(lotsOfLongs,
5l))), schema);
+ } catch (IllegalArgumentException e) {
+ fail("Valid repeated column predicates should not throw exceptions");
+ }
+ }
+
+ @Test
+ public void testNonRepeatedNotSupportedForContainsPredicates() {
+ try {
+ validate(contains(eq(longBar, 10L)), schema);
+ fail("Non-repeated field " + longBar + " should fail to validate a
containsEq() predicate");
+ } catch (IllegalArgumentException e) {
+ assertEquals(
+ "FilterPredicate for column x.bar requires a repeated schema, but
found max repetition level 0",
+ e.getMessage());
+ }
+ }
}
diff --git
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 0631300fc..58a899eef 100644
---
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -19,8 +19,10 @@
package org.apache.parquet.internal.column.columnindex;
import static java.util.Arrays.asList;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
@@ -33,6 +35,7 @@ import static
org.apache.parquet.filter2.predicate.FilterApi.lt;
import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
@@ -62,6 +65,7 @@ import java.util.List;
import java.util.Set;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.filter2.predicate.ContainsRewriter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
@@ -236,6 +240,58 @@ public class TestColumnIndexBuilder {
}
}
+ @Test
+ public void testArrayContainsDouble() {
+ PrimitiveType type = Types.required(DOUBLE).named("test_double");
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type,
Integer.MAX_VALUE);
+ assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
+ assertNull(builder.build());
+ DoubleColumn col = doubleColumn("test_col");
+
+ StatsBuilder sb = new StatsBuilder();
+ builder.add(sb.stats(type, -4.2, -4.1));
+ builder.add(sb.stats(type, -11.7, 7.0, null));
+ builder.add(sb.stats(type, 2.2, 2.2, null, null));
+ builder.add(sb.stats(type, null, null, null));
+ builder.add(sb.stats(type, 1.9, 2.32));
+ builder.add(sb.stats(type, -21.0, 8.1));
+ builder.add(sb.stats(type, 10.0, 25.0));
+ assertEquals(7, builder.getPageCount());
+ assertEquals(sb.getMinMaxSize(), builder.getMinMaxSize());
+ ColumnIndex columnIndex = builder.build();
+ assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+ assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0, 0, 0);
+ assertCorrectNullPages(columnIndex, false, false, false, true, false,
false, false);
+ assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null,
2.32, 8.1, 25.0);
+ assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null,
1.9, -21.0, 10.0);
+
+ // Validate that contains(eq()) matches eq() when not combined using or()
and and()
+ assertCorrectFiltering(columnIndex, eq(col, 0.0), 1, 5);
+ assertCorrectFiltering(columnIndex, contains(eq(col, 0.0)), 1, 5);
+
+ assertCorrectFiltering(columnIndex, eq(col, 2.2), 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, contains(eq(col, 2.2)), 1, 2, 4, 5);
+
+ assertCorrectFiltering(columnIndex, eq(col, 25.0), 6);
+ assertCorrectFiltering(columnIndex, contains(eq(col, 25.0)), 6);
+
+ // Should equal intersection of [1, 5] and [1, 2, 4, 5] --> [1, 5]
+ assertCorrectFiltering(
+ columnIndex, ContainsRewriter.rewrite(and(contains(eq(col, 0.0)),
contains(eq(col, 2.2)))), 1, 5);
+
+ // Should equal intersection of [6] and [1, 5] --> []
+ assertCorrectFiltering(
+ columnIndex, ContainsRewriter.rewrite(and(contains(eq(col, 25.0)),
contains(eq(col, 0.0)))));
+
+ // Should equal union of [6] and [1, 5] --> [1, 5, 6]
+ assertCorrectFiltering(
+ columnIndex, ContainsRewriter.rewrite(or(contains(eq(col, 25.0)),
contains(eq(col, 0.0)))), 1, 5, 6);
+
+ // Should equal de-duplicated union of [1, 5] and [1, 2, 4, 5] --> [1, 2,
4, 5]
+ assertCorrectFiltering(
+ columnIndex, ContainsRewriter.rewrite(or(contains(eq(col, 0.0)),
contains(eq(col, 2.2)))), 1, 2, 4, 5);
+ }
+
@Test
public void testBuildBinaryDecimal() {
PrimitiveType type =
@@ -286,6 +342,7 @@ public class TestColumnIndexBuilder {
set1.add(Binary.fromString("0.0"));
assertCorrectFiltering(columnIndex, in(col, set1), 1, 4);
assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 1, 2, 3, 4, 5, 6,
7);
+
set1.add(null);
assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5, 6);
assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 1, 2, 3, 4, 5, 6,
7);
diff --git
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
index 8075b4165..cfbfeed1d 100644
---
a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
+++
b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
@@ -50,6 +50,97 @@ public class TestIndexIterator {
assertEquals(IndexIterator.rangeTranslate(11, 18, i -> i - 10), 1, 2, 3,
4, 5, 6, 7, 8);
}
+ @Test
+ public void testUnion() {
+ // Test deduplication of intersecting ranges
+ assertEquals(
+ IndexIterator.union(
+ IndexIterator.rangeTranslate(0, 7, i -> i),
IndexIterator.rangeTranslate(4, 10, i -> i)),
+ 0,
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6,
+ 7,
+ 8,
+ 9,
+ 10);
+
+ // Test inversion of LHS and RHS
+ assertEquals(
+ IndexIterator.union(
+ IndexIterator.rangeTranslate(4, 10, i -> i),
IndexIterator.rangeTranslate(0, 7, i -> i)),
+ 0,
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6,
+ 7,
+ 8,
+ 9,
+ 10);
+
+ // Test non-intersecting ranges
+ assertEquals(
+ IndexIterator.union(
+ IndexIterator.rangeTranslate(2, 5, i -> i),
IndexIterator.rangeTranslate(8, 10, i -> i)),
+ 2,
+ 3,
+ 4,
+ 5,
+ 8,
+ 9,
+ 10);
+ }
+
+ @Test
+ public void testIntersection() {
+ // Case 1: some overlap between LHS and RHS
+ // LHS: [0, 1, 2, 3, 4, 5, 6, 7], RHS: [4, 5, 6, 7, 8, 9, 10]
+ assertEquals(
+ IndexIterator.intersection(
+ IndexIterator.rangeTranslate(0, 7, i -> i),
IndexIterator.rangeTranslate(4, 10, i -> i)),
+ 4,
+ 5,
+ 6,
+ 7);
+
+ // Test inversion of LHS and RHS
+ assertEquals(
+ IndexIterator.intersection(
+ IndexIterator.rangeTranslate(4, 10, i -> i),
IndexIterator.rangeTranslate(0, 7, i -> i)),
+ 4,
+ 5,
+ 6,
+ 7);
+
+ // Case 2: Single point of overlap at end of iterator
+ // LHS: [1, 3, 5, 7], RHS: [0, 2, 4, 6, 7]
+ assertEquals(
+ IndexIterator.intersection(
+ IndexIterator.filter(8, i -> i % 2 == 1), IndexIterator.filter(8,
i -> i % 2 == 0 || i == 7)),
+ 7);
+
+ // Test inversion of LHS and RHS
+ assertEquals(
+ IndexIterator.intersection(
+ IndexIterator.filter(8, i -> i % 2 == 0 || i == 7),
IndexIterator.filter(8, i -> i % 2 == 1)),
+ 7);
+
+ // Test no intersection between ranges
+ // LHS: [2, 3, 4, 5], RHS: [8, 9, 10]
+ assertEquals(IndexIterator.intersection(
+ IndexIterator.rangeTranslate(2, 5, i -> i),
IndexIterator.rangeTranslate(8, 10, i -> i)));
+
+ // Test inversion of LHS and RHS
+ assertEquals(IndexIterator.intersection(
+ IndexIterator.rangeTranslate(8, 10, i -> i),
IndexIterator.rangeTranslate(2, 5, i -> i)));
+ }
+
static void assertEquals(PrimitiveIterator.OfInt actualIt, int...
expectedValues) {
IntList actualList = new IntArrayList();
actualIt.forEachRemaining((int value) -> actualList.add(value));
diff --git
a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
index 5fb441521..1574ce247 100644
---
a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
+++
b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
@@ -47,6 +48,7 @@ import static
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Types.optional;
+import static org.apache.parquet.schema.Types.repeated;
import static org.junit.Assert.assertArrayEquals;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@@ -292,6 +294,26 @@ public class TestColumnIndexFilter {
.build();
private static final OffsetIndex COLUMN5_OI =
new OIBuilder().addPage(1).addPage(29).build();
+
+ private static final ColumnIndex COLUMN6_CI = new
CIBuilder(repeated(INT32).named("column6"), ASCENDING)
+ .addPage(0, 1, 1)
+ .addPage(1, 2, 6)
+ .addPage(0, 7, 7)
+ .addPage(1, 7, 10)
+ .addPage(0, 11, 17)
+ .addPage(0, 18, 23)
+ .addPage(0, 24, 26)
+ .build();
+ private static final OffsetIndex COLUMN6_OI = new OIBuilder()
+ .addPage(1)
+ .addPage(6)
+ .addPage(2)
+ .addPage(5)
+ .addPage(7)
+ .addPage(6)
+ .addPage(3)
+ .build();
+
private static final ColumnIndexStore STORE = new ColumnIndexStore() {
@Override
public ColumnIndex getColumnIndex(ColumnPath column) {
@@ -306,6 +328,8 @@ public class TestColumnIndexFilter {
return COLUMN4_CI;
case "column5":
return COLUMN5_CI;
+ case "column6":
+ return COLUMN6_CI;
default:
return null;
}
@@ -324,6 +348,8 @@ public class TestColumnIndexFilter {
return COLUMN4_OI;
case "column5":
return COLUMN5_OI;
+ case "column6":
+ return COLUMN6_OI;
default:
throw new MissingOffsetIndexException(column);
}
@@ -354,7 +380,7 @@ public class TestColumnIndexFilter {
@Test
public void testFiltering() {
- Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4");
+ Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4",
"column6");
assertAllRows(
calculateRowRanges(
@@ -364,6 +390,48 @@ public class TestColumnIndexFilter {
TOTAL_ROW_COUNT),
TOTAL_ROW_COUNT);
+ assertRows(
+ calculateRowRanges(
+ FilterCompat.get(contains(eq(intColumn("column6"), 7))), STORE,
paths, TOTAL_ROW_COUNT),
+ 7,
+ 8,
+ 9,
+ 10,
+ 11,
+ 12,
+ 13);
+ assertRows(
+ calculateRowRanges(
+ FilterCompat.get(
+ and(contains(eq(intColumn("column6"), 7)),
contains(eq(intColumn("column6"), 10)))),
+ STORE,
+ paths,
+ TOTAL_ROW_COUNT),
+ 9,
+ 10,
+ 11,
+ 12,
+ 13);
+ assertRows(
+ calculateRowRanges(
+ FilterCompat.get(
+ or(contains(eq(intColumn("column6"), 7)),
contains(eq(intColumn("column6"), 20)))),
+ STORE,
+ paths,
+ TOTAL_ROW_COUNT),
+ 7,
+ 8,
+ 9,
+ 10,
+ 11,
+ 12,
+ 13,
+ 21,
+ 22,
+ 23,
+ 24,
+ 25,
+ 26);
Set<Integer> set1 = new HashSet<>();
set1.add(7);
assertRows(
diff --git
a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
index 662be8c41..1a2f5e54e 100644
---
a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
+++
b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -70,6 +70,9 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
+ "import java.util.Set;\n"
+ "\n"
+ "import org.apache.parquet.hadoop.metadata.ColumnPath;\n"
+ + "import org.apache.parquet.filter2.predicate.FilterPredicate;\n"
+ + "import org.apache.parquet.filter2.predicate.Operators;\n"
+ + "import org.apache.parquet.filter2.predicate.Operators.Contains;\n"
+ "import org.apache.parquet.filter2.predicate.Operators.Eq;\n"
+ "import org.apache.parquet.filter2.predicate.Operators.Gt;\n"
+ "import org.apache.parquet.filter2.predicate.Operators.GtEq;\n"
@@ -100,13 +103,13 @@ public class IncrementallyUpdatedFilterPredicateGenerator
{
addVisitBegin("Eq");
for (TypeInfo info : TYPES) {
- addEqNotEqCase(info, true);
+ addEqNotEqCase(info, true, false);
}
addVisitEnd();
addVisitBegin("NotEq");
for (TypeInfo info : TYPES) {
- addEqNotEqCase(info, false);
+ addEqNotEqCase(info, false, false);
}
addVisitEnd();
@@ -122,6 +125,12 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
}
addVisitEnd();
+ addContainsBegin();
+ addVisitBegin("Contains");
+ addContainsCase();
+ addContainsEnd();
+ addVisitEnd();
+
addVisitBegin("Lt");
for (TypeInfo info : TYPES) {
addInequalityCase(info, "<");
@@ -186,21 +195,27 @@ public class IncrementallyUpdatedFilterPredicateGenerator
{
+ " }\n\n");
}
- private void addEqNotEqCase(TypeInfo info, boolean isEq) throws IOException {
- add(" if (clazz.equals(" + info.className + ".class)) {\n" + " if
(pred.getValue() == null) {\n"
- + " valueInspector = new ValueInspector() {\n"
- + " @Override\n"
- + " public void updateNull() {\n"
- + " setResult("
- + isEq + ");\n" + " }\n"
- + "\n"
- + " @Override\n"
- + " public void update("
- + info.primitiveName + " value) {\n" + " setResult("
- + !isEq + ");\n" + " }\n"
- + " };\n"
- + " } else {\n"
- + " final "
+ private void addEqNotEqCase(TypeInfo info, boolean isEq, boolean
expectMultipleResults) throws IOException {
+ add(" if (clazz.equals(" + info.className + ".class)) {\n");
+
+ // Predicates for repeated fields don't need to support null values
+ if (!expectMultipleResults) {
+ add(" if (pred.getValue() == null) {\n"
+ + " valueInspector = new ValueInspector() {\n"
+ + " @Override\n"
+ + " public void updateNull() {\n"
+ + " setResult("
+ + isEq + ");\n" + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public void update("
+ + info.primitiveName + " value) {\n" + " setResult("
+ + !isEq + ");\n" + " }\n"
+ + " };\n"
+ + " } else {\n");
+ }
+
+ add(" final "
+ info.primitiveName + " target = (" + info.className + ") (Object)
pred.getValue();\n"
+ " final PrimitiveComparator<"
+ info.className + "> comparator = getComparator(columnPath);\n" + "\n"
@@ -214,9 +229,20 @@ public class IncrementallyUpdatedFilterPredicateGenerator {
+ " public void update("
+ info.primitiveName + " value) {\n");
- add(" setResult(" + compareEquality("value", "target", isEq) +
");\n");
+ if (!expectMultipleResults) {
+ add(" setResult(" + compareEquality("value", "target", isEq)
+ ");\n");
+ } else {
+ add(" if (!isKnown() && " + compareEquality("value",
"target", isEq)
+ + ") { setResult(true); }\n");
+ }
+
+ add(" }\n };\n");
- add(" }\n" + " };\n" + " }\n" + " }\n\n");
+ if (!expectMultipleResults) {
+ add(" }\n");
+ }
+
+ add(" }\n\n");
}
private void addInequalityCase(TypeInfo info, String op) throws IOException {
@@ -304,6 +330,196 @@ public class IncrementallyUpdatedFilterPredicateGenerator
{
+ "\n");
}
+ private void addContainsUpdateCase(TypeInfo info, String... inspectors)
throws IOException {
+ add(" @Override\n" + " public void update(" + info.primitiveName + "
value) {\n");
+ for (String inspector : inspectors) {
+ add(" " + inspector + ".update(value);\n");
+ }
+ add(" checkSatisfied();\n" + " }\n");
+ }
+
+ private void addContainsInspectorVisitor(String op, boolean isSupported)
throws IOException {
+ if (isSupported) {
+ add(" @Override\n"
+ + " public <T extends Comparable<T>> ValueInspector visit(" + op
+ "<T> pred) {\n"
+ + " ColumnPath columnPath = pred.getColumn().getColumnPath();\n"
+ + " Class<T> clazz = pred.getColumn().getColumnType();\n"
+ + " ValueInspector valueInspector = null;\n");
+
+ for (TypeInfo info : TYPES) {
+ switch (op) {
+ case "Eq":
+ addEqNotEqCase(info, true, true);
+ break;
+ default:
+ throw new UnsupportedOperationException("Op " + op + " not
implemented for Contains filter");
+ }
+ }
+
+ add(" return valueInspector;" + " }\n");
+ } else {
+ add(" @Override\n"
+ + " public <T extends Comparable<T>> ValueInspector visit(" + op
+ "<T> pred) {\n"
+ + " throw new UnsupportedOperationException(\"" + op
+ + " not supported for Contains predicate\");\n"
+ + " }\n"
+ + "\n");
+ }
+ }
+
+ private void addContainsBegin() throws IOException {
+ add(" private static class ContainsPredicate extends ValueInspector {\n"
+ + " private final ValueInspector inspector;\n"
+ + "\n"
+ + " private ContainsPredicate(ValueInspector inspector) {\n"
+ + " this.inspector = inspector;\n"
+ + " }\n"
+ + "\n"
+ + " private void checkSatisfied() {\n"
+ + " if (!isKnown() && inspector.isKnown() &&
inspector.getResult()) {\n"
+ + " setResult(true);\n"
+ + " }\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public void updateNull() {\n"
+ + " setResult(false);\n"
+ + " }\n");
+
+ for (TypeInfo info : TYPES) {
+ addContainsUpdateCase(info, "inspector");
+ }
+
+ add(" @Override\n" + " public void reset() {\n"
+ + " super.reset();\n"
+ + " inspector.reset();\n"
+ + " }\n"
+ + " }\n");
+
+ add(" private static class ContainsAndPredicate extends ValueInspector
{\n"
+ + " private final ValueInspector left;\n"
+ + " private final ValueInspector right;\n"
+ + "\n"
+ + " private ContainsAndPredicate(ValueInspector left,
ValueInspector right) {\n"
+ + " this.left = left;\n"
+ + " this.right = right;\n"
+ + " }\n"
+ + "\n"
+ + " private void checkSatisfied() {\n"
+ + " if (isKnown()) { return; }\n"
+ + " if (left.isKnown() && right.isKnown() && left.getResult() &&
right.getResult()) {\n"
+ + " setResult(true);\n"
+ + " }\n"
+ + " }\n"
+ + " \n"
+ + " @Override\n"
+ + " public void updateNull() {\n"
+ + " setResult(false);\n"
+ + " }\n\n");
+
+ for (TypeInfo info : TYPES) {
+ addContainsUpdateCase(info, "left", "right");
+ }
+
+ add(" @Override\n"
+ + " public void reset() {\n"
+ + " super.reset();\n"
+ + " left.reset();\n"
+ + " right.reset();\n"
+ + " }\n"
+ + " }\n");
+
+ add(" private static class ContainsOrPredicate extends ValueInspector {\n"
+ + " private final ValueInspector left;\n"
+ + " private final ValueInspector right;\n"
+ + "\n"
+ + " private ContainsOrPredicate(ValueInspector left, ValueInspector
right) {\n"
+ + " this.left = left;\n"
+ + " this.right = right;\n"
+ + " }\n"
+ + "\n"
+ + " private void checkSatisfied() {\n"
+ + " if (isKnown()) { return; }\n"
+ + " if (left.isKnown() && left.getResult()) {\n"
+ + " setResult(true);\n"
+ + " return;\n"
+ + " }\n"
+ + " if (right.isKnown() && right.getResult()) {\n"
+ + " setResult(true);\n"
+ + " }\n"
+ + " }\n"
+ + " \n"
+ + " @Override\n"
+ + " public void updateNull() {\n"
+ + " setResult(false);\n"
+ + " }\n");
+
+ for (TypeInfo info : TYPES) {
+ addContainsUpdateCase(info, "left", "right");
+ }
+
+ add(" @Override\n"
+ + " public void reset() {\n"
+ + " super.reset();\n"
+ + " left.reset();\n"
+ + " right.reset();\n"
+ + " }\n"
+ + " }\n");
+
+ add(" private class ContainsInspectorVisitor implements
FilterPredicate.Visitor<ValueInspector> {\n\n"
+ + " @Override\n"
+ + " public <T extends Comparable<T>> ValueInspector
visit(Contains<T> contains) {\n"
+ + " return contains.filter(\n"
+ + " this,\n"
+ + " (l, r) -> new ContainsAndPredicate(l, r),\n"
+ + " (l, r) -> new ContainsOrPredicate(l, r)\n"
+ + " );\n"
+ + " }\n");
+
+ addContainsInspectorVisitor("Eq", true);
+ addContainsInspectorVisitor("NotEq", false);
+ addContainsInspectorVisitor("Lt", false);
+ addContainsInspectorVisitor("LtEq", false);
+ addContainsInspectorVisitor("Gt", false);
+ addContainsInspectorVisitor("GtEq", false);
+
+ add(" @Override\n"
+ + " public ValueInspector visit(Operators.And pred) {\n"
+ + " throw new UnsupportedOperationException(\"Operators.And not
supported for Contains predicate\");\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public ValueInspector visit(Operators.Or pred) {\n"
+ + " throw new UnsupportedOperationException(\"Operators.Or not
supported for Contains predicate\");\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public ValueInspector visit(Operators.Not pred) {\n"
+ + " throw new UnsupportedOperationException(\"Operators.Not not
supported for Contains predicate\");\n"
+ + " }"
+ + " @Override\n"
+ + " public <T extends Comparable<T>, U extends
UserDefinedPredicate<T>> ValueInspector visit(\n"
+ + " UserDefined<T, U> pred) {\n"
+ + " throw new
UnsupportedOperationException(\"UserDefinedPredicate not supported for Contains
predicate\");\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public <T extends Comparable<T>, U extends
UserDefinedPredicate<T>> ValueInspector visit(\n"
+ + " LogicalNotUserDefined<T, U> pred) {\n"
+ + " throw new
UnsupportedOperationException(\"LogicalNotUserDefined not supported for
Contains predicate\");\n"
+ + " }\n"
+ + " }\n"
+ + "\n");
+ }
+
+ private void addContainsCase() throws IOException {
+ add(" valueInspector = new ContainsPredicate(new
ContainsInspectorVisitor().visit(pred));\n");
+ }
+
+ private void addContainsEnd() {
+ // No-op
+ }
+
private void addUdpCase(TypeInfo info, boolean invert) throws IOException {
add(" if (clazz.equals(" + info.className + ".class)) {\n"
+ " valueInspector = new ValueInspector() {\n"
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
index f1ed6ea8c..16348e535 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
@@ -119,6 +119,11 @@ public class BloomFilterImpl implements
FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Contains<T>
contains) {
+ return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r);
+ }
+
@Override
public <T extends Comparable<T>> Boolean visit(Operators.In<T> in) {
Set<T> values = in.getValues();
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 665e39e9f..dbb38047e 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -36,6 +36,7 @@ import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -487,6 +488,11 @@ public class DictionaryFilter implements
FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
+ return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r);
+ }
+
@Override
public Boolean visit(And and) {
return and.getLeft().accept(this) || and.getRight().accept(this);
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
index 76f9078ca..deb4706d5 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -211,6 +212,11 @@ public class StatisticsFilter implements
FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
+ return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r);
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 525b603bb..5b9e638d6 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -24,6 +24,7 @@ import static
org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_
import static
org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
@@ -69,6 +70,7 @@ import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
@@ -112,6 +114,7 @@ public class DictionaryFilterTest {
+ "required int32 plain_int32_field; "
+ "required binary fallback_binary_field; "
+ "required int96 int96_field; "
+ + "repeated binary repeated_binary_field;"
+ "} ");
private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
@@ -177,7 +180,16 @@ public class DictionaryFilterTest {
i < (nElements / 2)
? ALPHABET.substring(index, index + 1)
: UUID.randomUUID().toString())
- .append("int96_field", INT96_VALUES[i % INT96_VALUES.length]);
+ .append("int96_field", INT96_VALUES[i % INT96_VALUES.length])
+ .append("repeated_binary_field", ALPHABET.substring(index, index +
1));
+
+ if (index + 1 < 26) {
+ group = group.append("repeated_binary_field", ALPHABET.substring(index
+ 1, index + 2));
+ }
+
+ if (index + 2 < 26) {
+ group = group.append("repeated_binary_field", ALPHABET.substring(index
+ 2, index + 3));
+ }
// 10% of the time, leave the field null
if (index % 10 > 0) {
@@ -282,7 +294,8 @@ public class DictionaryFilterTest {
"int64_field",
"double_field",
"float_field",
- "int96_field"));
+ "int96_field",
+ "repeated_binary_field"));
for (ColumnChunkMetaData column : ccmd) {
String name = column.getPath().toDotString();
if (dictionaryEncodedColumns.contains(name)) {
@@ -319,7 +332,8 @@ public class DictionaryFilterTest {
"int64_field",
"double_field",
"float_field",
- "int96_field"));
+ "int96_field",
+ "repeated_binary_field"));
for (ColumnChunkMetaData column : ccmd) {
EncodingStats encStats = column.getEncodingStats();
String name = column.getPath().toDotString();
@@ -814,6 +828,42 @@ public class DictionaryFilterTest {
canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake,
nullRejecting))), ccmd, dictionaries));
}
+ @Test
+ public void testContainsAnd() throws Exception {
+ BinaryColumn col = binaryColumn("binary_field");
+
+ // both evaluate to false (no upper-case letters are in the dictionary)
+ Operators.Contains<Binary> B = contains(eq(col, Binary.fromString("B")));
+ Operators.Contains<Binary> C = contains(eq(col, Binary.fromString("C")));
+
+ // both evaluate to true (all lower-case letters are in the dictionary)
+ Operators.Contains<Binary> x = contains(eq(col, Binary.fromString("x")));
+ Operators.Contains<Binary> y = contains(eq(col, Binary.fromString("y")));
+
+ assertTrue("Should drop when either predicate must be false",
canDrop(and(B, y), ccmd, dictionaries));
+ assertTrue("Should drop when either predicate must be false",
canDrop(and(x, C), ccmd, dictionaries));
+ assertTrue("Should drop when either predicate must be false",
canDrop(and(B, C), ccmd, dictionaries));
+ assertFalse("Should not drop when either predicate could be true",
canDrop(and(x, y), ccmd, dictionaries));
+ }
+
+ @Test
+ public void testContainsOr() throws Exception {
+ BinaryColumn col = binaryColumn("binary_field");
+
+ // both evaluate to false (no upper-case letters are in the dictionary)
+ Operators.Contains<Binary> B = contains(eq(col, Binary.fromString("B")));
+ Operators.Contains<Binary> C = contains(eq(col, Binary.fromString("C")));
+
+ // both evaluate to true (all lower-case letters are in the dictionary)
+ Operators.Contains<Binary> x = contains(eq(col, Binary.fromString("x")));
+ Operators.Contains<Binary> y = contains(eq(col, Binary.fromString("y")));
+
+ assertFalse("Should not drop when one predicate could be true",
canDrop(or(B, y), ccmd, dictionaries));
+ assertFalse("Should not drop when one predicate could be true",
canDrop(or(x, C), ccmd, dictionaries));
+ assertTrue("Should drop when both predicates must be false", canDrop(or(B,
C), ccmd, dictionaries));
+ assertFalse("Should not drop when one predicate could be true",
canDrop(or(x, y), ccmd, dictionaries));
+ }
+
private static final class InInt32UDP extends UserDefinedPredicate<Integer>
implements Serializable {
private final Set<Integer> ints;
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index d4a77879e..a0ecfa377 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.ByteBufferAllocator;
@@ -54,6 +56,12 @@ public class PhoneBookWriter {
+ " optional binary kind (UTF8);\n"
+ " }\n"
+ " }\n"
+ + " optional group accounts (MAP) {\n"
+ + " repeated group key_value {\n"
+ + " required binary key;\n"
+ + " required double value;\n"
+ + " }\n"
+ + " }\n"
+ "}\n";
private static final MessageType schema = getSchema();
@@ -154,11 +162,19 @@ public class PhoneBookWriter {
private final List<PhoneNumber> phoneNumbers;
private final Location location;
+ private final Map<String, Double> accounts;
+
public User(long id, String name, List<PhoneNumber> phoneNumbers, Location
location) {
+ this(id, name, phoneNumbers, location, null);
+ }
+
+ public User(
+ long id, String name, List<PhoneNumber> phoneNumbers, Location
location, Map<String, Double> accounts) {
this.id = id;
this.name = name;
this.phoneNumbers = phoneNumbers;
this.location = location;
+ this.accounts = accounts;
}
public long getId() {
@@ -177,6 +193,10 @@ public class PhoneBookWriter {
return location;
}
+ public Map<String, Double> getAccounts() {
+ return accounts;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -189,6 +209,7 @@ public class PhoneBookWriter {
if (name != null ? !name.equals(user.name) : user.name != null) return
false;
if (phoneNumbers != null ? !phoneNumbers.equals(user.phoneNumbers) :
user.phoneNumbers != null)
return false;
+ if (accounts != null ? !accounts.equals(user.accounts) : user.accounts
!= null) return false;
return true;
}
@@ -199,17 +220,18 @@ public class PhoneBookWriter {
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (phoneNumbers != null ? phoneNumbers.hashCode() :
0);
result = 31 * result + (location != null ? location.hashCode() : 0);
+ result = 31 * result + (accounts != null ? accounts.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "User [id=" + id + ", name=" + name + ", phoneNumbers=" +
phoneNumbers + ", location=" + location
- + "]";
+ + ", accounts=" + accounts + "]";
}
public User cloneWithName(String name) {
- return new User(id, name, phoneNumbers, location);
+ return new User(id, name, phoneNumbers, location, accounts);
}
}
@@ -241,6 +263,16 @@ public class PhoneBookWriter {
location.append("lat", user.getLocation().getLat());
}
}
+
+ if (user.getAccounts() != null) {
+ Group accounts = root.addGroup("accounts");
+ for (Map.Entry<String, Double> account : user.getAccounts().entrySet()) {
+ Group kv = accounts.addGroup("key_value");
+ kv.append("key", account.getKey());
+ kv.append("value", account.getValue());
+ }
+ }
+
return root;
}
@@ -249,7 +281,8 @@ public class PhoneBookWriter {
getLong(root, "id"),
getString(root, "name"),
getPhoneNumbers(getGroup(root, "phoneNumbers")),
- getLocation(getGroup(root, "location")));
+ getLocation(getGroup(root, "location")),
+ getAccounts(getGroup(root, "accounts")));
}
private static List<PhoneNumber> getPhoneNumbers(Group phoneNumbers) {
@@ -271,6 +304,19 @@ public class PhoneBookWriter {
return new Location(getDouble(location, "lon"), getDouble(location,
"lat"));
}
+ private static Map<String, Double> getAccounts(Group accounts) {
+ if (accounts == null) {
+ return null;
+ }
+ Map<String, Double> map = new HashMap<>();
+ for (int i = 0, n = accounts.getFieldRepetitionCount("key_value"); i < n;
++i) {
+ Group kv = accounts.getGroup("key_value", i);
+
+ map.put(getString(kv, "key"), getDouble(kv, "value"));
+ }
+ return map;
+ }
+
private static boolean isNull(Group group, String field) {
// Use null value if the field is not in the group schema
if (!group.getType().containsField(field)) {
@@ -336,6 +382,10 @@ public class PhoneBookWriter {
.withConf(conf)
.withFilter(filter)
.withAllocator(allocator)
+ .useBloomFilter(false)
+ .useDictionaryFilter(false)
+ .useStatsFilter(false)
+ .useColumnIndexFilter(false)
.build();
}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
index 0e81917c3..dedec409c 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.recordlevel;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
@@ -31,6 +32,7 @@ import static
org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.junit.Assert.assertEquals;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -59,21 +61,40 @@ public class TestRecordLevelFilters {
public static List<User> makeUsers() {
List<User> users = new ArrayList<User>();
- users.add(new User(17, null, null, null));
+ users.add(new User(
+ 17,
+ null,
+ null,
+ null,
+ ImmutableMap.of(
+ "business", 1000.0D,
+ "personal", 500.0D)));
users.add(new User(18, "bob", null, null));
- users.add(new User(19, "alice", new ArrayList<PhoneNumber>(), null));
+ users.add(new User(
+ 19,
+ "alice",
+ new ArrayList<PhoneNumber>(),
+ null,
+ ImmutableMap.of(
+ "business", 2000.0D,
+ "retirement", 1000.0D)));
users.add(new User(20, "thing1", Arrays.asList(new
PhoneNumber(5555555555L, null)), null));
- users.add(new User(27, "thing2", Arrays.asList(new
PhoneNumber(1111111111L, "home")), null));
+ users.add(new User(
+ 27,
+ "thing2",
+ Arrays.asList(new PhoneNumber(1111111111L, "home"), new
PhoneNumber(2222222222L, "cell")),
+ null));
users.add(new User(
28,
"popular",
Arrays.asList(
new PhoneNumber(1111111111L, "home"),
+ new PhoneNumber(1111111111L, "apartment"),
new PhoneNumber(2222222222L, null),
new PhoneNumber(3333333333L, "mobile")),
null));
@@ -127,6 +148,15 @@ public class TestRecordLevelFilters {
}
}
+ private static void assertPredicate(FilterPredicate predicate, long...
expectedIds) throws IOException {
+ List<Group> found = PhoneBookWriter.readFile(phonebookFile,
FilterCompat.get(predicate));
+
+ assertEquals(expectedIds.length, found.size());
+ for (int i = 0; i < expectedIds.length; i++) {
+ assertEquals(expectedIds[i], found.get(i).getLong("id", 0));
+ }
+ }
+
@Test
public void testNoFilter() throws Exception {
List<Group> found = PhoneBookWriter.readFile(phonebookFile,
FilterCompat.NOOP);
@@ -181,6 +211,77 @@ public class TestRecordLevelFilters {
assert (found.size() == 102);
}
+ @Test
+ public void testArrayContains() throws Exception {
+ assertPredicate(
+ contains(eq(binaryColumn("phoneNumbers.phone.kind"),
Binary.fromString("home"))), 27L, 28L, 30L);
+ }
+
+ @Test
+ public void testArrayContainsSimpleAndFilter() throws Exception {
+ assertPredicate(
+ and(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 1111111111L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"),
3333333333L))),
+ 28L);
+
+ assertPredicate(
+ and(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 1111111111L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"), -123L))) //
Won't match
+ );
+ }
+
+ @Test
+ public void testArrayContainsNestedAndFilter() throws Exception {
+ assertPredicate(
+ and(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 1111111111L)),
+ and(
+ contains(eq(longColumn("phoneNumbers.phone.number"),
2222222222L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"),
3333333333L)))),
+ 28L);
+ }
+
+ @Test
+ public void testArrayContainsSimpleOrFilter() throws Exception {
+ assertPredicate(
+ or(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 5555555555L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"),
2222222222L))),
+ 20L,
+ 27L,
+ 28L);
+
+ assertPredicate(
+ or(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 5555555555L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"), -123L))), //
Won't match
+ 20L);
+ }
+
+ @Test
+ public void testArrayContainsNestedOrFilter() throws Exception {
+ assertPredicate(
+ or(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 5555555555L)),
+ or(
+ contains(eq(longColumn("phoneNumbers.phone.number"),
-10000000L)), // Won't be matched
+ contains(eq(longColumn("phoneNumbers.phone.number"),
2222222222L)))),
+ 20L,
+ 27L,
+ 28L);
+ }
+
+ @Test
+ public void testMapContains() throws Exception {
+ // Test key predicate
+ assertPredicate(contains(eq(binaryColumn("accounts.key_value.key"),
Binary.fromString("business"))), 17L, 19L);
+
+ // Test value predicate
+ assertPredicate(contains(eq(doubleColumn("accounts.key_value.value"),
1000.0D)), 17L, 19L);
+ }
+
@Test
public void testNameNotNull() throws Exception {
BinaryColumn name = binaryColumn("name");
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
index 1615cf064..15d0a8ab1 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.statisticslevel;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gt;
@@ -49,6 +50,7 @@ import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
@@ -381,6 +383,38 @@ public class TestStatisticsFilter {
Arrays.asList(getIntColumnMeta(statsSomeNulls, 177L),
getDoubleColumnMeta(doubleStats, 177L))));
}
+ @Test
+ public void testContainsEqNonNull() {
+ assertTrue(canDrop(contains(eq(intColumn, 9)), columnMetas));
+ assertFalse(canDrop(contains(eq(intColumn, 10)), columnMetas));
+ assertFalse(canDrop(contains(eq(intColumn, 100)), columnMetas));
+ assertTrue(canDrop(contains(eq(intColumn, 101)), columnMetas));
+
+ // drop columns of all nulls when looking for non-null value
+ assertTrue(canDrop(contains(eq(intColumn, 0)), nullColumnMetas));
+ assertFalse(canDrop(contains(eq(intColumn, 50)),
missingMinMaxColumnMetas));
+ }
+
+ @Test
+ public void testContainsAnd() {
+ Operators.Contains<Integer> yes = contains(eq(intColumn, 9));
+ Operators.Contains<Double> no = contains(eq(doubleColumn, 50D));
+ assertTrue(canDrop(and(yes, yes), columnMetas));
+ assertTrue(canDrop(and(yes, no), columnMetas));
+ assertTrue(canDrop(and(no, yes), columnMetas));
+ assertFalse(canDrop(and(no, no), columnMetas));
+ }
+
+ @Test
+ public void testContainsOr() {
+ Operators.Contains<Integer> yes = contains(eq(intColumn, 9));
+ Operators.Contains<Double> no = contains(eq(doubleColumn, 50D));
+ assertTrue(canDrop(or(yes, yes), columnMetas));
+ assertFalse(canDrop(or(yes, no), columnMetas));
+ assertFalse(canDrop(or(no, yes), columnMetas));
+ assertFalse(canDrop(or(no, no), columnMetas));
+ }
+
@Test
public void testAnd() {
FilterPredicate yes = eq(intColumn, 9);
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 29c5d6f58..651184a0d 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -19,11 +19,14 @@
package org.apache.parquet.hadoop;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.contains;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.in;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -40,6 +43,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;
@@ -120,7 +124,8 @@ public class TestBloomFiltering {
List<PhoneBookWriter.User> users = new ArrayList<>();
List<String> names = generateNames(rowCount);
for (int i = 0; i < rowCount; ++i) {
- users.add(new PhoneBookWriter.User(i, names.get(i),
generatePhoneNumbers(), generateLocation(i, rowCount)));
+ users.add(
+ new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(i),
generateLocation(i, rowCount)));
}
return users;
}
@@ -171,12 +176,13 @@ public class TestBloomFiltering {
names.add("len");
}
for (int i = 0; i < rowCount; ++i) {
- users.add(new PhoneBookWriter.User(i, names.get(i),
generatePhoneNumbers(), generateLocation(i, rowCount)));
+ users.add(
+ new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(i),
generateLocation(i, rowCount)));
}
return users;
}
- private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
+ private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers(int
index) {
int length = RANDOM.nextInt(5) - 1;
if (length < 0) {
return null;
@@ -184,8 +190,8 @@ public class TestBloomFiltering {
List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
for (int i = 0; i < length; ++i) {
// 6 digits numbers
- long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
- phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number,
PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+ phoneNumbers.add(
+ new PhoneBookWriter.PhoneNumber(500L % index,
PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
}
return phoneNumbers;
}
@@ -318,12 +324,17 @@ public class TestBloomFiltering {
.withBloomFilterEnabled("name", true)
.withBloomFilterCandidateNumber("name", 10)
.withBloomFilterEnabled("id", true)
- .withBloomFilterCandidateNumber("id", 10);
+ .withBloomFilterCandidateNumber("id", 10)
+ .withDictionaryEncoding("phoneNumbers.phone.number", false)
+ .withBloomFilterEnabled("phoneNumbers.phone.number", true)
+ .withBloomFilterCandidateNumber("phoneNumbers.phone.number", 10);
} else {
writeBuilder
.withBloomFilterNDV("location.lat", 10000L)
.withBloomFilterNDV("name", 10000L)
- .withBloomFilterNDV("id", 10000L);
+ .withBloomFilterNDV("id", 10000L)
+ .withDictionaryEncoding("phoneNumbers.phone.number", false)
+ .withBloomFilterNDV("phoneNumbers.phone.number", 10000L);
}
PhoneBookWriter.write(writeBuilder, DATA);
}
@@ -398,6 +409,38 @@ public class TestBloomFiltering {
eq(doubleColumn("location.lat"), 99.9));
}
+ @Test
+ public void testContainsEqFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> Optional.ofNullable(record.getPhoneNumbers())
+ .map(numbers -> numbers.stream().anyMatch(n -> n.getNumber() ==
250L))
+ .orElse(false),
+ contains(eq(longColumn("phoneNumbers.phone.number"), 250L)));
+ }
+
+ @Test
+ public void testContainsOrFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> Optional.ofNullable(record.getPhoneNumbers())
+ .map(numbers -> numbers.stream().anyMatch(n -> n.getNumber() ==
250L || n.getNumber() == 50L))
+ .orElse(false),
+ or(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 250L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"), 50L))));
+ }
+
+ @Test
+ public void testContainsAndFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> Optional.ofNullable(record.getPhoneNumbers())
+ .map(numbers -> numbers.stream().anyMatch(n -> n.getNumber() ==
10L)
+ && numbers.stream().anyMatch(n -> n.getNumber() == 5L))
+ .orElse(false),
+ and(
+ contains(eq(longColumn("phoneNumbers.phone.number"), 10L)),
+ contains(eq(longColumn("phoneNumbers.phone.number"), 5L))));
+ }
+
@Test
public void checkBloomFilterSize() throws IOException {
FileDecryptionProperties fileDecryptionProperties =
getFileDecryptionProperties();
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
index 51fa90e1c..51f8a7dd6 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -23,7 +23,9 @@ import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -109,18 +111,23 @@ public class TestParquetWriterError {
location = null;
}
List<PhoneBookWriter.PhoneNumber> phoneNumbers;
+ Map<String, Double> accounts;
if (RANDOM.nextDouble() < .1) {
phoneNumbers = null;
+ accounts = null;
} else {
int n = RANDOM.nextInt(4);
phoneNumbers = new ArrayList<>(n);
+ accounts = new HashMap<>();
for (int i = 0; i < n; ++i) {
String kind = RANDOM.nextDouble() < .1 ? null : "kind" +
RANDOM.nextInt(5);
phoneNumbers.add(new PhoneBookWriter.PhoneNumber(RANDOM.nextInt(),
kind));
+ accounts.put("Account " + i, (double) i);
}
}
String name = RANDOM.nextDouble() < .1 ? null : "name" +
RANDOM.nextLong();
- PhoneBookWriter.User user = new PhoneBookWriter.User(RANDOM.nextLong(),
name, phoneNumbers, location);
+ PhoneBookWriter.User user =
+ new PhoneBookWriter.User(RANDOM.nextLong(), name, phoneNumbers,
location, accounts);
return PhoneBookWriter.groupFromUser(user);
}