This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit e30e9029fb605475d0b925f7d0798b7322da6a37 Author: Peeyush Gupta <[email protected]> AuthorDate: Tue Apr 8 11:36:00 2025 -0700 [ASTERIXDB-3575][EXT] Pushdown predicates for Parquet external datasets to filter row groups - user model changes: no - storage format changes: no - interface changes: yes Ext-ref: MB-65316 Change-Id: I2c3214e2a351252fb1929aa1562cbab2d67fa9aa Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19633 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../rules/PushValueAccessAndFilterDownRule.java | 2 + .../optimizer/rules/pushdown/PushdownContext.java | 13 +- .../rules/pushdown/PushdownProcessorsExecutor.java | 8 + .../ParquetDatasetScanDefineDescriptor.java | 45 +++++ .../processor/ColumnFilterPushdownProcessor.java | 2 +- .../processor/ParquetFilterPushdownProcessor.java | 85 ++++++++ .../parquet/embed-one-value/one-field.011.plan | 2 +- .../parquet/embed-one-value/one-field.021.plan | 2 +- .../parquet/embed-one-value/one-field.031.plan | 2 +- .../parquet/embed-one-value/one-field.121.plan | 2 +- .../parquet/one-field/one-field.011.plan | 2 +- .../parquet/one-field/one-field.021.plan | 2 +- .../parquet/one-field/one-field.031.plan | 2 +- .../parquet/one-field/one-field.121.plan | 2 +- .../parquet/pushdown-plans/pushdown-plans.03.plan | 2 +- .../parquet/pushdown-plans/pushdown-plans.04.plan | 2 +- .../parquet/pushdown-plans/pushdown-plans.05.plan | 2 +- .../parquet/pushdown-plans/pushdown-plans.08.plan | 2 +- .../parquet/pushdown-plans/pushdown-plans.09.plan | 2 +- .../filter/ParquetFilterEvaluatorFactory.java | 55 ++++++ .../aws/parquet/AwsS3ParquetReaderFactory.java | 10 + .../parquet/AzureBlobParquetReaderFactory.java | 10 + .../parquet/AzureDataLakeParquetReaderFactory.java | 10 + .../gcs/parquet/GCSParquetReaderFactory.java | 10 + .../apache/asterix/metadata/utils/DatasetUtil.java | 5 + 
.../apache/asterix/metadata/utils/IndexUtil.java | 23 +++ .../utils/filter/ParquetFilterBuilder.java | 220 +++++++++++++++++++++ .../ExternalDatasetProjectionFiltrationInfo.java | 2 +- ...uetExternalDatasetProjectionFiltrationInfo.java | 86 ++++++++ 29 files changed, 595 insertions(+), 17 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java index a101bed470..bd9e3293a5 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java @@ -35,6 +35,7 @@ import org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjecti import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor; +import org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -120,6 +121,7 @@ public class PushValueAccessAndFilterDownRule implements IAlgebraicRewriteRule { // Performs prefix pushdowns pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context)); pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context)); + pushdownProcessorsExecutor.add(new ParquetFilterPushdownProcessor(pushdownContext, context)); pushdownProcessorsExecutor .add(new 
ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context)); // Inlines AND/OR expression (must be last to run) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java index 1622c6e53e..c4210a3f9e 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java @@ -26,7 +26,9 @@ import java.util.Map; import java.util.Set; import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor; +import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor; import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor; import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor; import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor; @@ -84,8 +86,14 @@ public class PushdownContext { public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable, LogicalVariable metaVariable, AbstractScanOperator scanOperator) { - ScanDefineDescriptor scanDefDesc = - new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator); + ScanDefineDescriptor scanDefDesc; + if (DatasetUtil.isParquetFormat(dataset)) { + scanDefDesc = new ParquetDatasetScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, + metaVariable, scanOperator); + } else { + scanDefDesc = new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, + scanOperator); + }
defineChain.put(recordVariable, scanDefDesc); useChain.put(recordVariable, new ArrayList<>()); if (metaVariable != null) { diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java index 023e4da61e..cf438d80d4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java @@ -31,12 +31,14 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor; import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor; import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor; import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo; import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; import org.apache.asterix.runtime.projection.FunctionCallInformation; +import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; @@ -121,6 +123,12 @@ public class PushdownProcessorsExecutor { Map<String, String> configuration = ((ExternalDatasetDetails)
dataset.getDatasetDetails()).getProperties(); boolean embedFilterValues = ExternalDataPrefix.containsComputedFields(configuration) && Boolean.parseBoolean( configuration.getOrDefault(ExternalDataConstants.KEY_EMBED_FILTER_VALUES, ExternalDataConstants.TRUE)); + if (DatasetUtil.isParquetFormat(dataset)) { + return new ParquetExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations, + scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), + ((ParquetDatasetScanDefineDescriptor) scanDefineDescriptor).getRowGroupFilterExpression(), + embedFilterValues); + } return new ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations, scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), embedFilterValues); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java new file mode 100644 index 0000000000..4b4d742eb1 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.optimizer.rules.pushdown.descriptor; + +import java.util.List; + +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; + +public class ParquetDatasetScanDefineDescriptor extends ScanDefineDescriptor { + + private ILogicalExpression rowGroupFilterExpression; + + public ParquetDatasetScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables, + LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) { + super(scope, dataset, primaryKeyVariables, recordVariable, metaRecordVariable, operator); + this.rowGroupFilterExpression = null; + } + + public ILogicalExpression getRowGroupFilterExpression() { + return rowGroupFilterExpression; + } + + public void setRowGroupFilterExpression(ILogicalExpression rowGroupFilterExpression) { + this.rowGroupFilterExpression = rowGroupFilterExpression; + } +} diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java index 1ec64d5b33..0d79767ea3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java +++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java @@ -61,7 +61,7 @@ public class ColumnFilterPushdownProcessor extends AbstractFilterPushdownProcess protected final ExpressionToExpectedSchemaNodeVisitor exprToNodeVisitor; protected final ColumnFilterPathBuilderVisitor pathBuilderVisitor; protected final Map<ILogicalExpression, ARecordType> paths; - private final ArrayPathCheckerVisitor checkerVisitor; + protected final ArrayPathCheckerVisitor checkerVisitor; public ColumnFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) { super(pushdownContext, context); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java new file mode 100644 index 0000000000..25db9f9342 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.optimizer.rules.pushdown.processor; + +import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS; + +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.optimizer.rules.pushdown.PushdownContext; +import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor; +import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor; +import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode; +import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType; +import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class ParquetFilterPushdownProcessor extends ColumnFilterPushdownProcessor { + + public ParquetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) { + super(pushdownContext, context); + } + + @Override + protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException { + return !DatasetUtil.isParquetFormat(scanDefineDescriptor.getDataset()); + } + + @Override + protected boolean isNotPushable(AbstractFunctionCallExpression expression) { + FunctionIdentifier fid = expression.getFunctionIdentifier(); + return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid); + } + + @Override + protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException { + IExpectedSchemaNode node =
expression.accept(exprToNodeVisitor, null); + if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) { + return false; + } + + // The inferred path from the provided expression + ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node); + paths.put(expression, expressionPath); + return true; + } + + @Override + protected void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor, ILogicalExpression inlinedExpr) + throws AlgebricksException { + if (checkerVisitor.containsMultipleArrayPaths(paths.values())) { + // Cannot pushdown a filter with multiple unnest + // TODO allow rewindable column readers for filters + // TODO this is a bit conservative (maybe too conservative) as we can push part of expression down + return; + } + ParquetDatasetScanDefineDescriptor scanDefDesc = (ParquetDatasetScanDefineDescriptor) scanDefineDescriptor; + ILogicalExpression filterExpr = scanDefDesc.getRowGroupFilterExpression(); + if (filterExpr != null) { + filterExpr = andExpression(filterExpr, inlinedExpr); + scanDefDesc.setRowGroupFilterExpression(filterExpr); + } else { + scanDefDesc.setRowGroupFilterExpression(inlinedExpr); + } + } +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan index b560a5b858..805b9aa53a 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- 
ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan index 2bfad0690a..a57424cf57 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, 
total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan index afb74f1838..5e231767ca 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department embed-filter-value: true row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan index fc48056b72..5832f4f1dc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan index 8dcaa959f4..ecbd5ee343 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), 
"accounting") row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan index a99cc0c7e4..35da9483fb 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan index a7c5727c6d..23892f1f71 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.Department [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.Department row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan index b48def6ef3..9fb8ee15f8 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan @@ -16,7 +16,7 @@ distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$d] <- test.LastName prefix-filter on: 
eq($$d.getField("name").getField("last"), "Jones") row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan index 2b413072eb..9dab9c1dc3 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan @@ -14,7 +14,7 @@ distribute result [$$p1] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- STREAM_SELECT |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$p1] <- test.ParquetDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$p1] <- test.ParquetDataset1 row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan index 3993a4db52..2fca35d399 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan @@ -36,7 +36,7 @@ distribute result [$$69] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- STREAM_SELECT |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan index 9b93e19524..945f2da54c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan @@ -36,7 +36,7 @@ distribute result [$$68] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- STREAM_SELECT |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN 
|PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan index cccf9a347d..694172004e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan @@ -44,7 +44,7 @@ distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan index cf265bfb53..316e465ef1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan 
@@ -44,7 +44,7 @@ distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ASSIGN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- DATASOURCE_SCAN |PARTITIONED| exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java new file mode 100644 index 0000000000..774e90853b --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+/**
+ * Filter evaluator factory used for Parquet external datasets.
+ * <p>
+ * Creation of evaluators and value embedders is forwarded unchanged to a wrapped factory. The only addition is a
+ * Parquet {@link FilterPredicate}, exposed through {@link #getFilterExpression()}, which the Parquet reader
+ * factories install on the Hadoop job configuration when it is non-null.
+ */
+public class ParquetFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    // Factory that all evaluator/embedder creation is delegated to.
+    private final IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory;
+    // Row-group predicate handed to the Parquet reader; may be null (no row-group filtering).
+    private final FilterPredicate filterExpression;
+
+    /**
+     * @param delegate          factory to forward evaluator and value-embedder creation to
+     * @param rowGroupPredicate Parquet row-group predicate, or {@code null} when none could be built
+     */
+    public ParquetFilterEvaluatorFactory(IExternalFilterEvaluatorFactory delegate,
+            FilterPredicate rowGroupPredicate) {
+        filterExpression = rowGroupPredicate;
+        externalFilterEvaluatorFactory = delegate;
+    }
+
+    /** @return the Parquet row-group predicate, possibly {@code null} */
+    public FilterPredicate getFilterExpression() {
+        return filterExpression;
+    }
+
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        IExternalFilterEvaluatorFactory target = externalFilterEvaluatorFactory;
+        return target.create(serviceContext, warningCollector);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        IExternalFilterEvaluatorFactory target = externalFilterEvaluatorFactory;
+        return target.createValueEmbedder(warningCollector);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java index 2d92e10dcb..f5dc7a2eb6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; +import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; @@ -44,6 +45,8 @@ import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; import com.amazonaws.SdkBaseException; @@ -91,6 +94,13 @@ public class AwsS3ParquetReaderFactory extends HDFSDataSourceFactory { IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions); configureHdfsConf(conf, configuration); + if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) { + FilterPredicate parquetFilterPredicate = + ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); + if 
(parquetFilterPredicate != null) { + ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate); + } + } } catch (SdkException | SdkBaseException ex) { throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); } catch (AlgebricksException ex) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java index 530ce74736..72c9977f0c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; +import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; @@ -41,6 +42,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; import com.azure.storage.blob.BlobServiceClient; import 
com.azure.storage.blob.models.BlobItem; @@ -82,6 +85,13 @@ public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory { JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory); configureAzureHdfsJobConf(conf, configuration, endPoint); configureHdfsConf(conf, configuration); + if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) { + FilterPredicate parquetFilterPredicate = + ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); + if (parquetFilterPredicate != null) { + ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate); + } + } } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java index 4dedb0829a..1db4445a07 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; +import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; @@ -40,6 +41,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; 
import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.PathItem; @@ -69,6 +72,13 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory { JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory); configureAzureHdfsJobConf(conf, configuration, endPoint); configureHdfsConf(conf, configuration); + if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) { + FilterPredicate parquetFilterPredicate = + ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); + if (parquetFilterPredicate != null) { + ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate); + } + } } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java index 874c3bd78f..9469747b4b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.external.IExternalFilterEvaluator; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; +import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory; import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; @@ -39,6 +40,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; import com.google.cloud.storage.Blob; @@ -76,6 +79,13 @@ public class GCSParquetReaderFactory extends HDFSDataSourceFactory { int numberOfPartitions = getPartitionConstraint().getLocations().length; GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions); configureHdfsConf(conf, configuration); + if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) { + FilterPredicate parquetFilterPredicate = + ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); + if (parquetFilterPredicate != null) { + ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate); + } + } } @Override diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 2551b865c9..0cd150ec04 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -854,4 +854,9 @@ public class DatasetUtil { return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); } + + /** Returns {@code true} if {@code dataset} is an EXTERNAL dataset whose properties are recognized as Parquet by {@code ExternalDataUtils.isParquetFormat}; mirrors {@code isDeltaTable}, and the cast to {@code ExternalDatasetDetails} is only reached after the EXTERNAL check succeeds. */ public static boolean isParquetFormat(Dataset dataset) { + return
dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils + .isParquetFormat(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 593929025f..dfefb8ff9e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.metadata.utils; import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; @@ -43,6 +44,7 @@ import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory; import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.metadata.dataset.DatasetFormatInfo; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -53,6 +55,7 @@ import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder; import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder; import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder; import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder; +import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder; import org.apache.asterix.om.base.AString; import
org.apache.asterix.om.base.IAObject; import org.apache.asterix.om.types.ARecordType; @@ -61,6 +64,7 @@ import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo; import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; import org.apache.asterix.runtime.projection.FunctionCallInformation; +import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; @@ -346,6 +350,25 @@ public class IndexUtil { (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv); return builder.build(); } + } else if (isParquetFormat(properties)) { + if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { + return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE; + } else { + ExternalDataPrefix prefix = new ExternalDataPrefix(properties); + ExternalDatasetProjectionFiltrationInfo pfi = + (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo; + IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory = + NoOpExternalFilterEvaluatorFactory.INSTANCE; + if (!prefix.getPaths().isEmpty()) { + ExternalFilterBuilder externalFilterBuilder = + new ExternalFilterBuilder(pfi, context, typeEnv, prefix); + externalFilterEvaluatorFactory = externalFilterBuilder.build(); + } + ParquetFilterBuilder builder = new ParquetFilterBuilder( + (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv); + return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory, + builder.buildFilterPredicate()); + } } else { if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { return NoOpExternalFilterEvaluatorFactory.INSTANCE; diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java new file mode 100644 index 0000000000..38ad119ae0 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils.filter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.om.base.ADate; +import org.apache.asterix.om.base.ADateTime; +import org.apache.asterix.om.base.ADouble; +import org.apache.asterix.om.base.AInt16; +import org.apache.asterix.om.base.AInt32; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AInt8; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; +import org.apache.parquet.io.api.Binary; + +import io.delta.kernel.expressions.Column; +import 
io.delta.kernel.expressions.Predicate;
+
+/**
+ * Translates the row-group filter expression of a {@link ParquetExternalDatasetProjectionFiltrationInfo} into a
+ * Parquet {@link FilterPredicate} that the Parquet reader can use to skip row groups.
+ * <p>
+ * Supported input: AND/OR trees whose leaves are binary comparisons (EQ, LT, LE, GT, GE; only EQ for booleans)
+ * of the form {@code column op constant}. Any other shape makes {@link #buildFilterPredicate()} return
+ * {@code null}, meaning no row-group pruning.
+ */
+public class ParquetFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    public ParquetFilterBuilder(ParquetExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getParquetRowGroupFilterExpression(),
+                context, typeEnv);
+    }
+
+    /**
+     * Builds the Parquet row-group predicate.
+     *
+     * @return the translated predicate, or {@code null} if there is no row-group filter expression or it could
+     *         not be translated
+     */
+    public FilterPredicate buildFilterPredicate() throws AlgebricksException {
+        if (filterExpression == null) {
+            return null;
+        }
+        try {
+            return createFilterExpression(filterExpression);
+        } catch (Exception e) {
+            // Row-group pruning is only an optimization: an untranslatable expression disables it rather than
+            // failing the query.
+            LOGGER.error("Error creating Parquet row-group filter expression ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Translates {@code fid(columnName, constValue)} into a Parquet comparison. The Parquet column type is chosen
+     * from the constant's type tag.
+     */
+    private FilterPredicate createComparisonExpression(ILogicalExpression columnName, ILogicalExpression constValue,
+            FunctionIdentifier fid) throws AlgebricksException {
+        if (constValue.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            // Only "column op constant" is translated; "constant op column" is not normalized here.
+            throw new RuntimeException("Unsupported expression: " + constValue);
+        }
+        ConstantExpression constExpr = (ConstantExpression) constValue;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        String fieldName = createColumnExpression(columnName);
+        // NOTE(review): the Parquet column type is derived from the literal only. Parquet presumably validates the
+        // predicate against each file's schema when filtering row groups (e.g. an INT64 literal vs. an INT32
+        // column), and such a failure would not be caught by buildFilterPredicate() -- confirm the pushdown side
+        // only forwards type-compatible comparisons.
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return createComparisonFunction(FilterApi.binaryColumn(fieldName),
+                        Binary.fromString(((AString) constantValue.getObject()).getStringValue()), fid);
+            case TINYINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt8) constantValue.getObject()).getByteValue(), fid);
+            case SMALLINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt16) constantValue.getObject()).getShortValue(), fid);
+            case INTEGER:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((AInt32) constantValue.getObject()).getIntegerValue(), fid);
+            case BOOLEAN:
+                if (!fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+                    throw new RuntimeException("Unsupported comparison function: " + fid);
+                }
+                return FilterApi.eq(FilterApi.booleanColumn(fieldName), constantValue.isTrue());
+            case BIGINT:
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        ((AInt64) constantValue.getObject()).getLongValue(), fid);
+            case DOUBLE:
+                return createComparisonFunction(FilterApi.doubleColumn(fieldName),
+                        ((ADouble) constantValue.getObject()).getDoubleValue(), fid);
+            case DATE:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((ADate) constantValue.getObject()).getChrononTimeInDays(), fid);
+            case DATETIME:
+                // NOTE(review): assumes the Parquet column stores timestamps in microseconds -- confirm behavior
+                // for MILLIS/NANOS/INT96 timestamp columns.
+                long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        TimeUnit.MILLISECONDS.toMicros(millis), fid);
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    /** Recursively translates an AND/OR/comparison function-call tree into a Parquet predicate. */
+    private FilterPredicate createFilterExpression(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null || expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new RuntimeException("Unsupported expression: " + expr);
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return createAndOrPredicate(fid, args, 0);
+        }
+        if (args.size() != 2) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        return createComparisonExpression(args.get(0).getValue(), args.get(1).getValue(), fid);
+    }
+
+    /** Maps an Algebricks comparison function identifier to the corresponding Parquet comparison. */
+    private <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate createComparisonFunction(
+            C column, T value, FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return FilterApi.eq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return FilterApi.gtEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return FilterApi.gt(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return FilterApi.ltEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return FilterApi.lt(column, value);
+        } else {
+            throw new RuntimeException("Unsupported function: " + fid);
+        }
+    }
+
+    /**
+     * Returns the dot-separated Parquet column path for {@code expression}, derived from its single-field
+     * (possibly nested) record type in {@code filterPaths}.
+     */
+    protected String createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path == null || path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        }
+        ATypeTag fieldTypeTag = path.getFieldTypes()[0].getTypeTag();
+        if (fieldTypeTag == ATypeTag.OBJECT) {
+            // The field could be a nested field.
+            // NOTE(review): FilterApi splits the name on '.', so field names containing '.' would be misread.
+            return String.join(".", createPathExpression(path, new ArrayList<>()));
+        } else if (fieldTypeTag == ATypeTag.ANY) {
+            return path.getFieldNames()[0];
+        } else {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        }
+    }
+
+    /** Appends the chain of single-field nested record names in {@code path} to {@code fieldList}. */
+    private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+        ARecordType current = path;
+        while (true) {
+            if (current.getFieldNames().length != 1) {
+                throw new RuntimeException("Error creating column expression");
+            }
+            fieldList.add(current.getFieldNames()[0]);
+            ATypeTag typeTag = current.getFieldTypes()[0].getTypeTag();
+            if (typeTag == ATypeTag.ANY) {
+                return fieldList;
+            } else if (typeTag == ATypeTag.OBJECT) {
+                current = (ARecordType) current.getFieldTypes()[0];
+            } else {
+                throw new RuntimeException("Error creating column expression");
+            }
+        }
+    }
+
+    /**
+     * Folds {@code function(args[index], ..., args[n - 1])} into right-nested binary Parquet predicates, e.g.
+     * {@code or(p1, p2, p3)} becomes {@code or(p1, or(p2, p3))}. A single remaining operand is returned as-is.
+     */
+    private FilterPredicate createAndOrPredicate(FunctionIdentifier function, List<Mutable<ILogicalExpression>> args,
+            int index) throws AlgebricksException {
+        FilterPredicate head = createFilterExpression(args.get(index).getValue());
+        if (index >= args.size() - 1) {
+            return head;
+        }
+        FilterPredicate tail = createAndOrPredicate(function, args, index + 1);
+        if (function.equals(AlgebricksBuiltinFunctions.AND)) {
+            return FilterApi.and(head, tail);
+        }
+        return FilterApi.or(head, tail);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java index e0ea99aa4e..f95483e5a4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java @@ -43,7 +43,7 @@ public class ExternalDatasetProjectionFiltrationInfo implements IProjectionFiltr protected final ARecordType
projectedType; protected final ILogicalExpression filterExpression; protected final Map<String, FunctionCallInformation> functionCallInfoMap; - private final boolean embedFilterValues; + protected final boolean embedFilterValues; protected final Map<ILogicalExpression, ARecordType> filterPaths; public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType, diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java new file mode 100644 index 0000000000..323d3ed83c --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * Projection/filtration information for Parquet external datasets. In addition to the state of
+ * {@link ExternalDatasetProjectionFiltrationInfo}, it carries an optional expression used for Parquet row-group
+ * filtering, which is printed in plans as {@code row-group-filter on: ...} when present.
+ */
+public class ParquetExternalDatasetProjectionFiltrationInfo extends ExternalDatasetProjectionFiltrationInfo {
+
+    // Row-group filter expression; may be null, in which case nothing extra is printed.
+    private final ILogicalExpression parquetRowGroupFilterExpression;
+
+    public ParquetExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+            Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
+            ILogicalExpression filterExpression, ILogicalExpression parquetRowGroupFilterExpression,
+            boolean embedFilterValues) {
+        super(projectedType, sourceInformationMap, filterPaths, filterExpression, embedFilterValues);
+        this.parquetRowGroupFilterExpression = parquetRowGroupFilterExpression;
+    }
+
+    /** Copy constructor used by {@link #createCopy()}; shares the (immutable-by-reference) expression objects. */
+    private ParquetExternalDatasetProjectionFiltrationInfo(ParquetExternalDatasetProjectionFiltrationInfo other) {
+        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression,
+                other.embedFilterValues);
+        this.parquetRowGroupFilterExpression = other.parquetRowGroupFilterExpression;
+    }
+
+    /** @return the row-group filter expression, possibly {@code null} */
+    public ILogicalExpression getParquetRowGroupFilterExpression() {
+        return parquetRowGroupFilterExpression;
+    }
+
+    @Override
+    public void print(AlgebricksStringBuilderWriter writer) {
+        super.print(writer);
+        if (parquetRowGroupFilterExpression != null) {
+            writer.append(" row-group-filter on: ");
+            writer.append(parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public void print(JsonGenerator generator) throws IOException {
+        super.print(generator);
+        if (parquetRowGroupFilterExpression != null) {
+            generator.writeStringField("row-group-filter-on", parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public ParquetExternalDatasetProjectionFiltrationInfo createCopy() {
+        return new ParquetExternalDatasetProjectionFiltrationInfo(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetExternalDatasetProjectionFiltrationInfo otherInfo = (ParquetExternalDatasetProjectionFiltrationInfo) o;
+        return super.equals(o)
+                && filterExpressionEquals(parquetRowGroupFilterExpression, otherInfo.parquetRowGroupFilterExpression);
+    }
+}
