This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 12d39ded1d9411b81e60436cbccf1339fa086416 Author: Peeyush Gupta <[email protected]> AuthorDate: Mon Feb 10 11:41:44 2025 -0800 [ASTERIXDB-3561][EXT] Predicate pushdown for Delta Tables - user model changes: no - storage format changes: no - interface changes: no Details: Predicates are pushed down to reduce the number of files to scan. Ext-ref: MB-65217 Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- .../rules/PushValueAccessAndFilterDownRule.java | 2 + .../DeltaTableFilterPushdownProcessor.java | 64 ++++++++ .../external_dataset/ExternalDatasetTestUtils.java | 3 +- .../deltalake/DeltaTableGenerator.java | 146 +++++++++++++++++ .../deltalake-partitioned-file-read.00.ddl.sqlpp | 35 ++++ .../deltalake-partitioned-file-read.01.query.sqlpp | 22 +++ .../read-data.2.adm | 5 + .../runtimets/testsuite_external_dataset_s3.xml | 6 + .../filter/DeltaTableFilterEvaluatorFactory.java | 53 ++++++ .../NoOpDeltaTableFilterEvaluatorFactory.java | 46 ++++++ .../reader/aws/delta/DeltaReaderFactory.java | 23 ++- .../apache/asterix/metadata/utils/DatasetUtil.java | 5 + .../apache/asterix/metadata/utils/IndexUtil.java | 30 ++-- .../utils/filter/AbstractFilterBuilder.java | 2 +- .../utils/filter/DeltaTableFilterBuilder.java | 180 +++++++++++++++++++++ 15 files changed, 609 insertions(+), 13 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java index 16aeecb8a5..a101bed470 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java @@ -32,6 
+32,7 @@ import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdow import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor; +import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor; import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor; import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor; @@ -118,6 +119,7 @@ public class PushValueAccessAndFilterDownRule implements IAlgebraicRewriteRule { } // Performs prefix pushdowns pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context)); + pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context)); pushdownProcessorsExecutor .add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context)); // Inlines AND/OR expression (must be last to run) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java new file mode 100644 index 0000000000..8e5bf5be94 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.optimizer.rules.pushdown.processor; + +import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS; + +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.optimizer.rules.pushdown.PushdownContext; +import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor; +import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode; +import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType; +import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class DeltaTableFilterPushdownProcessor extends ColumnFilterPushdownProcessor { + + public DeltaTableFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) { + super(pushdownContext, context); + } + + @Override + protected boolean skip(ScanDefineDescriptor 
scanDefineDescriptor) throws AlgebricksException { + return !DatasetUtil.isDeltaTable(scanDefineDescriptor.getDataset()); + } + + @Override + protected boolean isNotPushable(AbstractFunctionCallExpression expression) { + FunctionIdentifier fid = expression.getFunctionIdentifier(); + return ARRAY_FUNCTIONS.contains(fid) || super.isNotPushable(expression); + } + + @Override + protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException { + IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null); + if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) { + return false; + } + + // The inferred path from the provided expression + ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node); + paths.put(expression, expressionPath); + return true; + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java index c0b5e264b0..9a0be3ff94 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java @@ -446,7 +446,8 @@ public class ExternalDatasetTestUtils { loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/"); - + loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/"); } private static void loadDeltaDirectory(String dataBasePath, String rootPath, 
FilenameFilter filter, diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java index 67d460c943..7c8840a973 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java @@ -23,8 +23,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -59,6 +61,8 @@ public class DeltaTableGenerator { "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one"; public static final String DELTA_FILE_SIZE_NINE = "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine"; + public static final String DELTA_PARTITIONED_TABLE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table"; public static void prepareDeltaTableContainer(Configuration conf) { File basePath = new File("."); @@ -68,6 +72,7 @@ public class DeltaTableGenerator { prepareEmptyTable(conf); prepareFileSizeOne(conf); prepareFileSizeNine(conf); + preparePartitionedTable(conf); } public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) { @@ -352,4 +357,145 @@ public class DeltaTableGenerator { throw new RuntimeException(e); } } + + public static void preparePartitionedTable(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name") + .requiredString("date").requiredInt("hour").endRecord(); + try { +
List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema), + new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileSecondSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileThirdSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileFourthSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("name", "Order 1"); + fileFirstSnapshotRecords.get(0).put("date", "01-01-2025"); + fileFirstSnapshotRecords.get(0).put("hour", 10); + + fileFirstSnapshotRecords.get(1).put("id", 1); + fileFirstSnapshotRecords.get(1).put("name", "Order 2"); + fileFirstSnapshotRecords.get(1).put("date", "01-01-2025"); + fileFirstSnapshotRecords.get(1).put("hour", 10); + + fileFirstSnapshotRecords.get(2).put("id", 2); + fileFirstSnapshotRecords.get(2).put("name", "Order 3"); + fileFirstSnapshotRecords.get(2).put("date", "01-01-2025"); + fileFirstSnapshotRecords.get(2).put("hour", 10); + + fileSecondSnapshotRecords.get(0).put("id", 3); + fileSecondSnapshotRecords.get(0).put("name", "Order 10"); + fileSecondSnapshotRecords.get(0).put("date", "01-01-2025"); + fileSecondSnapshotRecords.get(0).put("hour", 15); + + fileSecondSnapshotRecords.get(1).put("id", 4); + fileSecondSnapshotRecords.get(1).put("name", "Order 11"); + fileSecondSnapshotRecords.get(1).put("date", "01-01-2025"); + fileSecondSnapshotRecords.get(1).put("hour", 15); + + fileThirdSnapshotRecords.get(0).put("id", 5); + fileThirdSnapshotRecords.get(0).put("name", "Order 21"); + fileThirdSnapshotRecords.get(0).put("date", "01-02-2025"); + fileThirdSnapshotRecords.get(0).put("hour", 12); + + fileThirdSnapshotRecords.get(1).put("id", 6); + fileThirdSnapshotRecords.get(1).put("name", "Order 22"); + 
fileThirdSnapshotRecords.get(1).put("date", "01-02-2025"); + fileThirdSnapshotRecords.get(1).put("hour", 12); + + fileFourthSnapshotRecords.get(0).put("id", 7); + fileFourthSnapshotRecords.get(0).put("name", "Order 30"); + fileFourthSnapshotRecords.get(0).put("date", "01-02-2025"); + fileFourthSnapshotRecords.get(0).put("hour", 16); + + fileFourthSnapshotRecords.get(1).put("id", 8); + fileFourthSnapshotRecords.get(1).put("name", "Order 31"); + fileFourthSnapshotRecords.get(1).put("date", "01-02-2025"); + fileFourthSnapshotRecords.get(1).put("hour", 16); + + Path path = new Path(DELTA_PARTITIONED_TABLE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + long size = writer.getDataSize(); + writer.close(); + + Path path2 = new Path(DELTA_PARTITIONED_TABLE, "secondFile.parquet"); + ParquetWriter<GenericData.Record> writer2 = + AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileSecondSnapshotRecords) { + writer2.write(record); + } + long size2 = writer2.getDataSize(); + writer2.close(); + + Path path3 = new Path(DELTA_PARTITIONED_TABLE, "thirdFile.parquet"); + ParquetWriter<GenericData.Record> writer3 = + AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileThirdSnapshotRecords) { + writer3.write(record); + } + long size3 = writer3.getDataSize(); + writer3.close(); + + Path path4 = new Path(DELTA_PARTITIONED_TABLE, "fourthFile.parquet"); + ParquetWriter<GenericData.Record> writer4 = + AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileFourthSnapshotRecords) { + writer4.write(record); + } + long size4 = 
writer4.getDataSize(); + writer4.close(); + + DeltaLog log = DeltaLog.forTable(conf, DELTA_PARTITIONED_TABLE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("date", "hour")) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("name", new StringType(), true)) + .add(new StructField("date", new StringType(), true)) + .add(new StructField("hour", new IntegerType(), true))) + .build(); + + Map<String, String> partitionValues = new HashMap<>(); + partitionValues.put("date", "01-01-2025"); + partitionValues.put("hour", "10"); + List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size, + System.currentTimeMillis(), true, null, null)); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + txn = log.startTransaction(); + partitionValues.clear(); + partitionValues.put("date", "01-01-2025"); + partitionValues.put("hour", "15"); + actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(), + true, null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + txn = log.startTransaction(); + partitionValues.clear(); + partitionValues.put("date", "01-02-2025"); + partitionValues.put("hour", "12"); + actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true, + null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + txn = log.startTransaction(); + partitionValues.clear(); + partitionValues.put("date", "01-02-2025"); + partitionValues.put("hour", "16"); + actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(), + true, null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); 
+ + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp new file mode 100644 index 0000000000..a085d6fb96 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/partitioned_delta_table"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp new file mode 100644 index 0000000000..73ef0f94cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE date = "01-01-2025" order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm new file mode 100644 index 0000000000..86d41fe4b0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm @@ -0,0 +1,5 @@ +{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } +{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 1a05334bbc..e8e89dea12 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -574,6 +574,12 @@ <output-dir compare="Text">common/deltalake-file-nine</output-dir> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-partitioned-file-read"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-partitioned-file-read</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java new file mode 100644 index 0000000000..c954f9bccd --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.filter; + +import org.apache.asterix.common.external.IExternalFilterEvaluator; +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; + +import io.delta.kernel.expressions.Expression; + +public class DeltaTableFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory { + private static final long serialVersionUID = 1L; + private final Expression filterExpression; + + public DeltaTableFilterEvaluatorFactory(Expression expression) { + this.filterExpression = expression; + } + + @Override + public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector) + throws HyracksDataException { + return NoOpExternalFilterEvaluator.INSTANCE; + } + + @SuppressWarnings("unchecked") + @Override + public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) { + return NoOpFilterValueEmbedder.INSTANCE; + } + + public Expression getFilterExpression() { + return filterExpression; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java new file mode 100644 index 0000000000..406a63011c --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.filter; + +import org.apache.asterix.common.external.IExternalFilterEvaluator; +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.IWarningCollector; + +public class NoOpDeltaTableFilterEvaluatorFactory extends DeltaTableFilterEvaluatorFactory { + public static final IExternalFilterEvaluatorFactory INSTANCE = new NoOpDeltaTableFilterEvaluatorFactory(); + private static final long serialVersionUID = 1L; + + private NoOpDeltaTableFilterEvaluatorFactory() { + super(null); + } + + @Override + public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector) { + return NoOpExternalFilterEvaluator.INSTANCE; + } + + @SuppressWarnings("unchecked") + @Override + public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) { + return NoOpFilterValueEmbedder.INSTANCE; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 
e3313f70bb..3c998a5ae9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -39,6 +39,7 @@ import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory; import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.HDFSUtils; @@ -62,6 +63,8 @@ import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.KernelException; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; @@ -124,19 +127,37 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> } catch (AsterixDeltaRuntimeException e) { throw e.getHyracksDataException(); } - Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + // Only a Delta-specific factory carries a kernel predicate; any other factory means "no filter" + Expression filterExpression = filterEvaluatorFactory instanceof DeltaTableFilterEvaluatorFactory + ? ((DeltaTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression() : null; + Scan scan; + if (filterExpression != null) { + scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema) + .withFilter(engine, (Predicate) filterExpression).build(); + } else { + scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + } scanState =
RowSerDe.serializeRowToJson(scan.getScanState(engine)); CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); List<Row> scanFiles = new ArrayList<>(); while (iter.hasNext()) { - FilteredColumnarBatch batch = iter.next(); + FilteredColumnarBatch batch = null; + try { + batch = iter.next(); + } catch (UnsupportedOperationException e) { + // Failed to apply expression due to type mismatch. We can skip the files where partitioned column + // type is different from the type of value provided in the predicate + LOGGER.info("Unsupported operation {}", e.getMessage()); + continue; + } CloseableIterator<Row> rowIter = batch.getRows(); while (rowIter.hasNext()) { Row row = rowIter.next(); scanFiles.add(row); } } + LOGGER.info("Number of files to scan: {}", scanFiles.size()); locationConstraints = getPartitions(appCtx); configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 923dbd4547..7df9b477b2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -744,4 +744,9 @@ public class DatasetUtil { return dataset.getDatasetType() == DatasetType.INTERNAL && dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN; } + + public static boolean isDeltaTable(Dataset dataset) { + return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index dda2111e2d..593929025f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.metadata.utils; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; @@ -40,6 +41,7 @@ import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.common.metadata.MetadataConstants; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory; import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.metadata.dataset.DatasetFormatInfo; @@ -49,6 +51,7 @@ import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder; import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder; +import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder; import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.IAObject; @@ -335,16 +338,25 @@ public class IndexUtil { public static IExternalFilterEvaluatorFactory createExternalFilterEvaluatorFactory(JobGenContext context, IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo projectionFiltrationInfo, Map<String, String> properties) throws 
AlgebricksException { - if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { - return NoOpExternalFilterEvaluatorFactory.INSTANCE; + if (isDeltaTable(properties)) { + if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { + return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE; + } else { + DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder( + (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv); + return builder.build(); + } + } else { + if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { + return NoOpExternalFilterEvaluatorFactory.INSTANCE; + } else { + ExternalDataPrefix prefix = new ExternalDataPrefix(properties); + ExternalDatasetProjectionFiltrationInfo pfi = + (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo; + ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix); + return build.build(); + } } - - ExternalDataPrefix prefix = new ExternalDataPrefix(properties); - ExternalDatasetProjectionFiltrationInfo pfi = - (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo; - ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix); - - return build.build(); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java index ce392209d2..f41a116250 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java @@ -100,7 +100,7 @@ abstract class AbstractFilterBuilder { return argsEvalFactories; } - private IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException { + protected 
IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException { MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); IFunctionManager functionManager = metadataProvider.getFunctionManager(); FunctionIdentifier fnId = funcExpr.getFunctionIdentifier(); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java new file mode 100644 index 0000000000..73ed81e0c7 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils.filter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory; +import org.apache.asterix.om.base.ADate; +import org.apache.asterix.om.base.ADateTime; +import org.apache.asterix.om.base.ADouble; +import org.apache.asterix.om.base.AInt16; +import org.apache.asterix.om.base.AInt32; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AInt8; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.logging.log4j.LogManager; + +import com.microsoft.azure.storage.core.Logger; + +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Expression; +import 
io.delta.kernel.expressions.Literal; +import io.delta.kernel.expressions.Predicate; + +public class DeltaTableFilterBuilder extends AbstractFilterBuilder { + + private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger(); + + public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo, + JobGenContext context, IVariableTypeEnvironment typeEnv) { + super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getFilterExpression(), context, + typeEnv); + } + + public IExternalFilterEvaluatorFactory build() throws AlgebricksException { + Expression deltaTablePredicate = null; + if (filterExpression != null) { + try { + deltaTablePredicate = createExpression(filterExpression); + } catch (Exception e) { + LOGGER.error("Error creating DeltaTable filter expression ", e); + } + } + if (deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) { + deltaTablePredicate = null; + } + return new DeltaTableFilterEvaluatorFactory(deltaTablePredicate); + } + + protected Expression createExpression(ILogicalExpression expression) throws AlgebricksException { + if (filterPaths.containsKey(expression)) { + // Path expression, create a value accessor (i.e., a column reader) + return createColumnExpression(expression); + } else if (expression.getExpressionTag() == LogicalExpressionTag.CONSTANT) { + return createLiteralExpression(expression); + } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + return handleFunction(expression); + } + + /* + * A variable expression: This should not happen as the provided filter expression is inlined. + * If a variable was encountered for some reason, it should only be the record variable. If the record variable + * was encountered, that means there's a missing value path the compiler didn't provide. + */ + throw new RuntimeException("Unsupported expression " + expression + ". 
the provided paths are: " + filterPaths); + } + + private Expression createLiteralExpression(ILogicalExpression expression) throws AlgebricksException { + ConstantExpression constExpr = (ConstantExpression) expression; + if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) { + throw new RuntimeException("Unsupported literal type: " + constExpr.getValue()); + } + AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue(); + switch (constantValue.getObject().getType().getTypeTag()) { + case STRING: + return Literal.ofString(((AString) constantValue.getObject()).getStringValue()); + case TINYINT: + return Literal.ofByte(((AInt8) constantValue.getObject()).getByteValue()); + case SMALLINT: + return Literal.ofShort(((AInt16) constantValue.getObject()).getShortValue()); + case INTEGER: + return Literal.ofInt(((AInt32) constantValue.getObject()).getIntegerValue()); + case BOOLEAN: + return Literal.ofBoolean(constantValue.isTrue()); + case BIGINT: + return Literal.ofLong(((AInt64) constantValue.getObject()).getLongValue()); + case DOUBLE: + return Literal.ofDouble(((ADouble) constantValue.getObject()).getDoubleValue()); + case DATE: + return Literal.ofDate(((ADate) constantValue.getObject()).getChrononTimeInDays()); + case DATETIME: + Long millis = ((ADateTime) constantValue.getObject()).getChrononTime(); + return Literal.ofTimestamp(TimeUnit.MILLISECONDS.toMicros(millis)); + default: + throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType()); + } + } + + @Override + protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) { + return null; + } + + private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException { + AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; + IFunctionDescriptor fd = resolveFunction(funcExpr); + List<Expression> args = handleArgs(funcExpr); + FunctionIdentifier fid = fd.getIdentifier(); + if 
(fid.equals(AlgebricksBuiltinFunctions.AND)) { + return new Predicate("AND", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) { + return new Predicate("OR", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) { + return new Predicate("=", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) { + return new Predicate(">=", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) { + return new Predicate(">", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) { + return new Predicate("<=", args); + } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) { + return new Predicate("<", args); + } else { + throw new RuntimeException("Unsupported function: " + funcExpr); + } + } + + private List<Expression> handleArgs(AbstractFunctionCallExpression funcExpr) throws AlgebricksException { + List<Mutable<ILogicalExpression>> args = funcExpr.getArguments(); + List<Expression> argsExpressions = new ArrayList<>(); + for (int i = 0; i < args.size(); i++) { + ILogicalExpression expr = args.get(i).getValue(); + Expression evalFactory = createExpression(expr); + argsExpressions.add(evalFactory); + } + return argsExpressions; + } + + protected Column createColumnExpression(ILogicalExpression expression) { + ARecordType path = filterPaths.get(expression); + if (path.getFieldNames().length != 1) { + throw new RuntimeException("Unsupported expression: " + expression); + } + return new Column(path.getFieldNames()[0]); + } +}
