ahmedabu98 commented on code in PR #35589: URL: https://github.com/apache/beam/pull/35589#discussion_r2208258308
########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import 
org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. 
+ @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); Review Comment: Do we need to validate that `RexNode e` is indeed a `RexCall`? ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { Review Comment: We need thorough testing for this class ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import 
org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. 
+ @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; Review Comment: throw an UnsupportedOperationException? ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> 
COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. 
+ @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? 
null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; + } + } + + @Nullable + private static FilterPredicate combine(RexCall call, Schema beamSchema) { + FilterCombiner combiner = COMBINERS.get(call.getKind()); + if (combiner == null) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode op : call.getOperands()) { + FilterPredicate p = convert(op, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(combiner::combine).orElse(null); + } + + @Nullable + private static FilterPredicate toInPredicate(RexCall call, Schema beamSchema, boolean isNotIn) { + RexInputRef columnRef = (RexInputRef) call.getOperands().get(0); + List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); + SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; + + // CHANGE: Use an explicit loop + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode valueNode : valueNodes) { + FilterPredicate p = + createSingleComparison(comparison, columnRef, (RexLiteral) valueNode, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(isNotIn ? 
FilterApi::and : FilterApi::or).orElse(null); + } + + @Nullable + private static FilterPredicate toUnaryPredicate(RexCall call, Schema beamSchema) { + RexNode operand = call.getOperands().get(0); + if (!(operand instanceof RexInputRef)) { + return null; + } + RexInputRef columnRef = (RexInputRef) operand; + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName type = columnRef.getType().getSqlTypeName(); + boolean isNull = call.getKind() == SqlKind.IS_NULL; + switch (type) { + case INTEGER: + return isNull + ? FilterApi.eq(FilterApi.intColumn(columnName), null) + : FilterApi.notEq(FilterApi.intColumn(columnName), null); + case BIGINT: + return isNull + ? FilterApi.eq(FilterApi.longColumn(columnName), null) + : FilterApi.notEq(FilterApi.longColumn(columnName), null); + case FLOAT: + return isNull + ? FilterApi.eq(FilterApi.floatColumn(columnName), null) + : FilterApi.notEq(FilterApi.floatColumn(columnName), null); + case DOUBLE: + return isNull + ? FilterApi.eq(FilterApi.doubleColumn(columnName), null) + : FilterApi.notEq(FilterApi.doubleColumn(columnName), null); + case BOOLEAN: + return isNull + ? FilterApi.eq(FilterApi.booleanColumn(columnName), null) + : FilterApi.notEq(FilterApi.booleanColumn(columnName), null); + case VARCHAR: + case CHAR: + case DECIMAL: + return isNull + ? FilterApi.eq(FilterApi.binaryColumn(columnName), null) + : FilterApi.notEq(FilterApi.binaryColumn(columnName), null); + default: + return null; + } + } + + @Nullable + private static FilterPredicate toBinaryPredicate(RexCall call, Schema beamSchema) { + RexNode left = call.getOperands().get(0); + RexNode right = call.getOperands().get(1); + if (left.getKind() == SqlKind.CAST) { + left = ((RexCall) left).getOperands().get(0); + } + if (right.getKind() == SqlKind.CAST) { + right = ((RexCall) right).getOperands().get(0); + } + if (!(left instanceof RexInputRef) || !(right instanceof RexLiteral)) { + return null; + } Review Comment: Does this indicate an invalid input? 
Should we throw an error here instead? ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java: ########## @@ -340,6 +345,12 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) { .build(); } + /** Specifies a filter predicate to use for filtering records. */ + public Read withFilter(ParquetFilter predicate) { Review Comment: How would a Java user make use of this? This source relies on `ParquetFilter` being an instance of `ParquetFilterImpl`, which is not available to the user. ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate 
internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. + @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? 
null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; + } + } + + @Nullable + private static FilterPredicate combine(RexCall call, Schema beamSchema) { + FilterCombiner combiner = COMBINERS.get(call.getKind()); + if (combiner == null) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode op : call.getOperands()) { + FilterPredicate p = convert(op, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(combiner::combine).orElse(null); + } + + @Nullable + private static FilterPredicate toInPredicate(RexCall call, Schema beamSchema, boolean isNotIn) { + RexInputRef columnRef = (RexInputRef) call.getOperands().get(0); + List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); + SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; + + // CHANGE: Use an explicit loop + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode valueNode : valueNodes) { + FilterPredicate p = + createSingleComparison(comparison, columnRef, (RexLiteral) valueNode, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(isNotIn ? 
FilterApi::and : FilterApi::or).orElse(null); + } + + @Nullable + private static FilterPredicate toUnaryPredicate(RexCall call, Schema beamSchema) { + RexNode operand = call.getOperands().get(0); + if (!(operand instanceof RexInputRef)) { + return null; + } + RexInputRef columnRef = (RexInputRef) operand; + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName type = columnRef.getType().getSqlTypeName(); + boolean isNull = call.getKind() == SqlKind.IS_NULL; + switch (type) { + case INTEGER: + return isNull + ? FilterApi.eq(FilterApi.intColumn(columnName), null) + : FilterApi.notEq(FilterApi.intColumn(columnName), null); + case BIGINT: + return isNull + ? FilterApi.eq(FilterApi.longColumn(columnName), null) + : FilterApi.notEq(FilterApi.longColumn(columnName), null); + case FLOAT: + return isNull + ? FilterApi.eq(FilterApi.floatColumn(columnName), null) + : FilterApi.notEq(FilterApi.floatColumn(columnName), null); + case DOUBLE: + return isNull + ? FilterApi.eq(FilterApi.doubleColumn(columnName), null) + : FilterApi.notEq(FilterApi.doubleColumn(columnName), null); + case BOOLEAN: + return isNull + ? FilterApi.eq(FilterApi.booleanColumn(columnName), null) + : FilterApi.notEq(FilterApi.booleanColumn(columnName), null); + case VARCHAR: + case CHAR: + case DECIMAL: + return isNull + ? 
FilterApi.eq(FilterApi.binaryColumn(columnName), null) + : FilterApi.notEq(FilterApi.binaryColumn(columnName), null); + default: + return null; + } + } + + @Nullable + private static FilterPredicate toBinaryPredicate(RexCall call, Schema beamSchema) { + RexNode left = call.getOperands().get(0); + RexNode right = call.getOperands().get(1); + if (left.getKind() == SqlKind.CAST) { + left = ((RexCall) left).getOperands().get(0); + } + if (right.getKind() == SqlKind.CAST) { + right = ((RexCall) right).getOperands().get(0); + } + if (!(left instanceof RexInputRef) || !(right instanceof RexLiteral)) { + return null; + } + return createSingleComparison( + call.getKind(), (RexInputRef) left, (RexLiteral) right, beamSchema); + } + + @Nullable + private static FilterPredicate createSingleComparison( + SqlKind kind, RexInputRef columnRef, RexLiteral literal, Schema beamSchema) { + + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName columnType = columnRef.getType().getSqlTypeName(); + + Comparable<?> value = literal.getValueAs(Comparable.class); + if (value == null) { + return null; + } + + switch (columnType) { + case TINYINT: + case SMALLINT: + case INTEGER: + return createIntPredicate(kind, columnName, ((Number) value).intValue()); + case BIGINT: + return createLongPredicate(kind, columnName, ((Number) value).longValue()); + case FLOAT: + return createFloatPredicate(kind, columnName, ((Number) value).floatValue()); + case DOUBLE: + return createDoublePredicate(kind, columnName, ((Number) value).doubleValue()); + case BOOLEAN: + return createBooleanPredicate(kind, columnName, (Boolean) value); + case CHAR: + case VARCHAR: + return createStringPredicate(kind, columnName, value.toString()); + case BINARY: + case VARBINARY: + org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.BitString bitString = + (org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.BitString) value; + return createBinaryPredicate( + kind, columnName, 
Binary.fromConstantByteArray(bitString.getAsByteArray())); + case DECIMAL: + BigDecimal bigDecimalValue = (BigDecimal) value; + return createBinaryPredicate( + kind, + columnName, + Binary.fromConstantByteArray(bigDecimalValue.unscaledValue().toByteArray())); + case DATE: + case TIME: + case TIMESTAMP: + case ARRAY: + case MAP: + case ROW: + default: + return null; + } + } + + private static String getColumnName(RexInputRef columnRef, Schema beamSchema) { + return beamSchema.getField(columnRef.getIndex()).getName(); + } + + private static FilterPredicate createIntPredicate(SqlKind kind, String name, Integer value) { Review Comment: I think the `create*Predicate` methods can be merged into one method. A lot of it seems to be duplicated logic. ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate 
internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. + @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? 
null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; + } + } + + @Nullable + private static FilterPredicate combine(RexCall call, Schema beamSchema) { + FilterCombiner combiner = COMBINERS.get(call.getKind()); + if (combiner == null) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode op : call.getOperands()) { + FilterPredicate p = convert(op, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(combiner::combine).orElse(null); + } + + @Nullable + private static FilterPredicate toInPredicate(RexCall call, Schema beamSchema, boolean isNotIn) { + RexInputRef columnRef = (RexInputRef) call.getOperands().get(0); + List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); + SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; + + // CHANGE: Use an explicit loop + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode valueNode : valueNodes) { + FilterPredicate p = + createSingleComparison(comparison, columnRef, (RexLiteral) valueNode, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(isNotIn ? 
FilterApi::and : FilterApi::or).orElse(null); + } + + @Nullable + private static FilterPredicate toUnaryPredicate(RexCall call, Schema beamSchema) { + RexNode operand = call.getOperands().get(0); + if (!(operand instanceof RexInputRef)) { + return null; + } + RexInputRef columnRef = (RexInputRef) operand; + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName type = columnRef.getType().getSqlTypeName(); + boolean isNull = call.getKind() == SqlKind.IS_NULL; + switch (type) { + case INTEGER: + return isNull + ? FilterApi.eq(FilterApi.intColumn(columnName), null) + : FilterApi.notEq(FilterApi.intColumn(columnName), null); + case BIGINT: + return isNull + ? FilterApi.eq(FilterApi.longColumn(columnName), null) + : FilterApi.notEq(FilterApi.longColumn(columnName), null); + case FLOAT: + return isNull + ? FilterApi.eq(FilterApi.floatColumn(columnName), null) + : FilterApi.notEq(FilterApi.floatColumn(columnName), null); + case DOUBLE: + return isNull + ? FilterApi.eq(FilterApi.doubleColumn(columnName), null) + : FilterApi.notEq(FilterApi.doubleColumn(columnName), null); + case BOOLEAN: + return isNull + ? FilterApi.eq(FilterApi.booleanColumn(columnName), null) + : FilterApi.notEq(FilterApi.booleanColumn(columnName), null); + case VARCHAR: + case CHAR: + case DECIMAL: + return isNull + ? 
FilterApi.eq(FilterApi.binaryColumn(columnName), null) + : FilterApi.notEq(FilterApi.binaryColumn(columnName), null); + default: + return null; + } + } + + @Nullable + private static FilterPredicate toBinaryPredicate(RexCall call, Schema beamSchema) { + RexNode left = call.getOperands().get(0); + RexNode right = call.getOperands().get(1); + if (left.getKind() == SqlKind.CAST) { + left = ((RexCall) left).getOperands().get(0); + } + if (right.getKind() == SqlKind.CAST) { + right = ((RexCall) right).getOperands().get(0); + } + if (!(left instanceof RexInputRef) || !(right instanceof RexLiteral)) { + return null; + } + return createSingleComparison( + call.getKind(), (RexInputRef) left, (RexLiteral) right, beamSchema); + } + + @Nullable + private static FilterPredicate createSingleComparison( + SqlKind kind, RexInputRef columnRef, RexLiteral literal, Schema beamSchema) { + + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName columnType = columnRef.getType().getSqlTypeName(); + + Comparable<?> value = literal.getValueAs(Comparable.class); + if (value == null) { + return null; + } + + switch (columnType) { + case TINYINT: + case SMALLINT: + case INTEGER: + return createIntPredicate(kind, columnName, ((Number) value).intValue()); + case BIGINT: + return createLongPredicate(kind, columnName, ((Number) value).longValue()); + case FLOAT: + return createFloatPredicate(kind, columnName, ((Number) value).floatValue()); + case DOUBLE: + return createDoublePredicate(kind, columnName, ((Number) value).doubleValue()); + case BOOLEAN: + return createBooleanPredicate(kind, columnName, (Boolean) value); + case CHAR: + case VARCHAR: + return createStringPredicate(kind, columnName, value.toString()); + case BINARY: + case VARBINARY: + org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.BitString bitString = + (org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.BitString) value; + return createBinaryPredicate( + kind, columnName, 
Binary.fromConstantByteArray(bitString.getAsByteArray())); + case DECIMAL: + BigDecimal bigDecimalValue = (BigDecimal) value; + return createBinaryPredicate( + kind, + columnName, + Binary.fromConstantByteArray(bigDecimalValue.unscaledValue().toByteArray())); + case DATE: + case TIME: + case TIMESTAMP: + case ARRAY: + case MAP: + case ROW: + default: + return null; Review Comment: This will suppress bad filters and may confuse users who expect it to work. Can we throw an UnsupportedOperationException instead? ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate 
internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. + @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? 
null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; + } + } + + @Nullable + private static FilterPredicate combine(RexCall call, Schema beamSchema) { + FilterCombiner combiner = COMBINERS.get(call.getKind()); + if (combiner == null) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode op : call.getOperands()) { + FilterPredicate p = convert(op, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(combiner::combine).orElse(null); + } + + @Nullable + private static FilterPredicate toInPredicate(RexCall call, Schema beamSchema, boolean isNotIn) { + RexInputRef columnRef = (RexInputRef) call.getOperands().get(0); + List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); + SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; + + // CHANGE: Use an explicit loop + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode valueNode : valueNodes) { + FilterPredicate p = + createSingleComparison(comparison, columnRef, (RexLiteral) valueNode, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(isNotIn ? 
FilterApi::and : FilterApi::or).orElse(null); + } + + @Nullable + private static FilterPredicate toUnaryPredicate(RexCall call, Schema beamSchema) { + RexNode operand = call.getOperands().get(0); + if (!(operand instanceof RexInputRef)) { + return null; + } + RexInputRef columnRef = (RexInputRef) operand; + String columnName = getColumnName(columnRef, beamSchema); + SqlTypeName type = columnRef.getType().getSqlTypeName(); + boolean isNull = call.getKind() == SqlKind.IS_NULL; + switch (type) { + case INTEGER: + return isNull + ? FilterApi.eq(FilterApi.intColumn(columnName), null) + : FilterApi.notEq(FilterApi.intColumn(columnName), null); + case BIGINT: + return isNull + ? FilterApi.eq(FilterApi.longColumn(columnName), null) + : FilterApi.notEq(FilterApi.longColumn(columnName), null); + case FLOAT: + return isNull + ? FilterApi.eq(FilterApi.floatColumn(columnName), null) + : FilterApi.notEq(FilterApi.floatColumn(columnName), null); + case DOUBLE: + return isNull + ? FilterApi.eq(FilterApi.doubleColumn(columnName), null) + : FilterApi.notEq(FilterApi.doubleColumn(columnName), null); + case BOOLEAN: + return isNull + ? FilterApi.eq(FilterApi.booleanColumn(columnName), null) + : FilterApi.notEq(FilterApi.booleanColumn(columnName), null); + case VARCHAR: + case CHAR: + case DECIMAL: + return isNull + ? FilterApi.eq(FilterApi.binaryColumn(columnName), null) + : FilterApi.notEq(FilterApi.binaryColumn(columnName), null); + default: + return null; Review Comment: Should this throw an UnsupportedOperationException instead? ########## sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderFilterPushDownTest.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Parameterized test for ParquetTable's filter and projection pushdown capabilities. */ +@RunWith(Parameterized.class) +@Category(NeedsRunner.class) +public class ParquetTableProviderFilterPushDownTest implements Serializable { + + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + @ClassRule public static final TestPipeline WRITE_PIPELINE = TestPipeline.create(); + @Rule public final transient TestPipeline readPipeline = TestPipeline.create(); + + private static BeamSqlEnv env; + private static final Schema FULL_SCHEMA = + Schema.builder() + .addInt32Field("id") + .addNullableField("product_name", Schema.FieldType.STRING) + .addBooleanField("is_stocked") + .addDoubleField("price") + .addInt32Field("category_id") + .build(); + + private static final Schema PROJECTED_ID_PRICE_SCHEMA = + Schema.builder().addInt32Field("id").addDoubleField("price").build(); + + private static final Schema PROJECTED_NAME_CAT_SCHEMA = + Schema.builder() + .addNullableField("product_name", Schema.FieldType.STRING) + .addInt32Field("category_id") + .build(); + + @Parameter(0) + public String testCaseName; + + @Parameter(1) + public String query; + + @Parameter(2) + public List<Object> params; + + @Parameter(3) + public long expectedReadCount; + + @Parameter(4) + public List<Row> expectedRows; + + @Parameter(5) + public Schema expectedSchema; + + @Parameters(name = "{0}") + public static Collection<Object[]> data() { + return Arrays.asList( + new Object[][] { + { + "Filter: PriceGreaterThan", + "SELECT * FROM ProductInfo WHERE price > 200.0", + Collections.emptyList(), + 3L, + Arrays.asList( + Row.withSchema(FULL_SCHEMA).addValues(1, "Laptop", true, 1200.50, 101).build(), + Row.withSchema(FULL_SCHEMA).addValues(4, "Monitor", true, 300.0, 102).build(), + 
Row.withSchema(FULL_SCHEMA).addValues(6, "Dock", true, 250.0, 103).build()), + FULL_SCHEMA + }, + { + "Filter: StockedAndCategory", + "SELECT * FROM ProductInfo WHERE is_stocked = TRUE AND category_id = 101", + Collections.emptyList(), + 2L, + Arrays.asList( + Row.withSchema(FULL_SCHEMA).addValues(1, "Laptop", true, 1200.50, 101).build(), + Row.withSchema(FULL_SCHEMA).addValues(2, "Mouse", true, 25.0, 101).build()), + FULL_SCHEMA + }, + { + "Filter: IsNotNull", + "SELECT * FROM ProductInfo WHERE product_name IS NOT NULL", + Collections.emptyList(), + 6L, + Arrays.asList( + Row.withSchema(FULL_SCHEMA).addValues(1, "Laptop", true, 1200.50, 101).build(), + Row.withSchema(FULL_SCHEMA).addValues(2, "Mouse", true, 25.0, 101).build(), + Row.withSchema(FULL_SCHEMA).addValues(3, "Keyboard", false, 75.25, 101).build(), + Row.withSchema(FULL_SCHEMA).addValues(4, "Monitor", true, 300.0, 102).build(), + Row.withSchema(FULL_SCHEMA).addValues(5, "Webcam", false, 150.0, 102).build(), + Row.withSchema(FULL_SCHEMA).addValues(6, "Dock", true, 250.0, 103).build()), + FULL_SCHEMA + }, + { + "Filter: Parameterized (No Pushdown)", + "SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?", + Arrays.asList(100.0, true), Review Comment: Are the values inside the array supposed to be substituted for the `?` placeholders in the query? If so, the output seems to be incorrect. ########## sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetFilterFactory.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.parquet; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilterFactory { + private static final ImmutableMap<SqlKind, FilterCombiner> 
COMBINERS = + ImmutableMap.of(SqlKind.AND, FilterApi::and, SqlKind.OR, FilterApi::or); + + private static final Set<SqlKind> COMPARISON_KINDS = + ImmutableSet.of( + SqlKind.EQUALS, + SqlKind.NOT_EQUALS, + SqlKind.GREATER_THAN, + SqlKind.GREATER_THAN_OR_EQUAL, + SqlKind.LESS_THAN, + SqlKind.LESS_THAN_OR_EQUAL); + + public static ParquetFilter create(List<RexNode> expressions, Schema beamSchema) { + FilterPredicate internalPredicate = toFilterPredicate(expressions, beamSchema); + return new ParquetFilterImpl(internalPredicate); + } + + public static ParquetFilter fromPredicate(FilterPredicate predicate) { + return new ParquetFilterImpl(predicate); + } + + /** Private implementation of our filter interface that holds the real Parquet object. */ + static class ParquetFilterImpl implements ParquetFilter { + private final @Nullable FilterPredicate predicate; + + ParquetFilterImpl(@Nullable FilterPredicate predicate) { + this.predicate = predicate; + } + + // This method allows ParquetIO to "unwrap" the filter. 
+ @Nullable + FilterPredicate getPredicate() { + return predicate; + } + } + + @Nullable + private static FilterPredicate toFilterPredicate(List<RexNode> expressions, Schema beamSchema) { + if (expressions == null || expressions.isEmpty()) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode expr : expressions) { + FilterPredicate p = convert(expr, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(FilterApi::and).orElse(null); + } + + @Nullable + private static FilterPredicate convert(RexNode e, Schema beamSchema) { + SqlKind kind = e.getKind(); + if (COMBINERS.containsKey(kind)) { + return combine((RexCall) e, beamSchema); + } + + switch (kind) { + case IN: + return toInPredicate((RexCall) e, beamSchema, false); + case NOT_IN: + return toInPredicate((RexCall) e, beamSchema, true); + case NOT: + FilterPredicate inner = convert(((RexCall) e).getOperands().get(0), beamSchema); + return (inner == null) ? 
null : FilterApi.not(inner); + case IS_NULL: + case IS_NOT_NULL: + return toUnaryPredicate((RexCall) e, beamSchema); + default: + if (COMPARISON_KINDS.contains(kind)) { + return toBinaryPredicate((RexCall) e, beamSchema); + } + return null; + } + } + + @Nullable + private static FilterPredicate combine(RexCall call, Schema beamSchema) { + FilterCombiner combiner = COMBINERS.get(call.getKind()); + if (combiner == null) { + return null; + } + + List<FilterPredicate> predicates = new ArrayList<>(); + for (RexNode op : call.getOperands()) { + FilterPredicate p = convert(op, beamSchema); + if (p != null) { + predicates.add(p); + } + } + + if (predicates.isEmpty()) { + return null; + } + return predicates.stream().reduce(combiner::combine).orElse(null); + } + + @Nullable + private static FilterPredicate toInPredicate(RexCall call, Schema beamSchema, boolean isNotIn) { + RexInputRef columnRef = (RexInputRef) call.getOperands().get(0); + List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); + SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; + + // CHANGE: Use an explicit loop Review Comment: Could you elaborate on what this means? Should this be a TODO? ########## sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java: ########## @@ -64,23 +69,70 @@ public PCollection<Row> buildIOReader(PBegin begin) { @Override public PCollection<Row> buildIOReader( - PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) { + PBegin begin, BeamSqlTableFilter filter, List<String> projectedFieldNames) { + + // Determine ALL fields required for the read (projection + filter fields).
+ Set<String> requiredFieldsForRead = new HashSet<>(projectedFieldNames); + if (filter instanceof ParquetFilter) { + ParquetFilter parquetFilter = (ParquetFilter) filter; + requiredFieldsForRead.addAll(parquetFilter.getReferencedFields(getSchema())); + } + + // If no fields are projected or filtered, read the full schema. final Schema schema = AvroUtils.toAvroSchema(table.getSchema()); + Schema readSchema = + requiredFieldsForRead.isEmpty() + ? schema + : projectSchema(schema, new ArrayList<>(requiredFieldsForRead)); + + LOG.info("Projecting fields schema: {}", readSchema); + String filePattern = resolveFilePattern(table.getLocation()); Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern); - if (!fieldNames.isEmpty()) { - Schema projectionSchema = projectSchema(schema, fieldNames); - LOG.info("Projecting fields schema: {}", projectionSchema); - read = read.withProjection(projectionSchema, projectionSchema); + read = read.withProjection(readSchema, readSchema); Review Comment: I think we should be making the unwanted columns nullable for the second `readSchema`. Check ParquetIO javadoc: https://github.com/apache/beam/blob/cd72a255018126e3db15ad77ce8ae3a82623effb/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L137-L141 ########## sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderFilterPushDownTest.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.parquet; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** 
Parameterized test for ParquetTable's filter and projection pushdown capabilities. */ +@RunWith(Parameterized.class) +@Category(NeedsRunner.class) +public class ParquetTableProviderFilterPushDownTest implements Serializable { + + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + @ClassRule public static final TestPipeline WRITE_PIPELINE = TestPipeline.create(); + @Rule public final transient TestPipeline readPipeline = TestPipeline.create(); + + private static BeamSqlEnv env; + private static final Schema FULL_SCHEMA = + Schema.builder() + .addInt32Field("id") + .addNullableField("product_name", Schema.FieldType.STRING) + .addBooleanField("is_stocked") + .addDoubleField("price") + .addInt32Field("category_id") + .build(); + + private static final Schema PROJECTED_ID_PRICE_SCHEMA = + Schema.builder().addInt32Field("id").addDoubleField("price").build(); + + private static final Schema PROJECTED_NAME_CAT_SCHEMA = + Schema.builder() + .addNullableField("product_name", Schema.FieldType.STRING) + .addInt32Field("category_id") + .build(); + + @Parameter(0) + public String testCaseName; + + @Parameter(1) + public String query; + + @Parameter(2) + public List<Object> params; + + @Parameter(3) + public long expectedReadCount; + + @Parameter(4) + public List<Row> expectedRows; + + @Parameter(5) + public Schema expectedSchema; Review Comment: I think we don't need `expectedReadCount` or `expectedSchema`, right? That information is already captured in `expectedRows` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org