This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 3311364a512812c5c75d8c864545fc761c456bbf
Author: Peeyush Gupta <[email protected]>
AuthorDate: Tue Jul 22 12:19:12 2025 -0700

    [ASTERIXDB-3633][EXT] Support file splits while reading delta tables

    - user model changes: no
    - storage format changes: no
    - interface changes: no

    Ext-ref: MB-66319
    Change-Id: Ie6daf3846064326bfe749ad15b508fe27d1721ca
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20135
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../provider/SqlppCompilationProvider.java         |   3 +-
 .../deltalake-file-nine.02.query.sqlpp             |  23 +++
 .../deltalake-multiple_file_read.02.query.sqlpp    |  23 +++
 .../deltalake-partitioned-file-read.03.query.sqlpp |  23 +++
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 +
 .../cluster_state_1_full.1.regexadm                |   1 +
 .../cluster_state_1_less.1.regexadm                |   1 +
 .../deltalake-file-nine/deltalake-file-nine.02.adm |   9 +
 .../deltalake-multiple-file-read/read-data.3.adm   |   4 +
 .../read-data.3.adm                                |   3 +
 .../asterix/common/config/CompilerProperties.java  |   8 +-
 .../input/record/reader/aws/delta/DeltaEngine.java |  47 ++++++
 .../reader/aws/delta/DeltaFileRecordReader.java    |  75 +++++++--
 .../reader/aws/delta/DeltaParquetFileReader.java   | 187 +++++++++++++++++++++
 .../reader/aws/delta/DeltaParquetHandler.java      |  80 +++++++++
 .../reader/aws/delta/DeltaReaderFactory.java       |  95 ++++++++++-
 .../reader/aws/delta/SerializableFileSplit.java    |  45 +++++
 .../metadata/declared/DatasetDataSource.java       |  11 ++
 18 files changed, 614 insertions(+), 25 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index 10574b1cb2..ca97aa6734 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -96,7 +96,8 @@ public class SqlppCompilationProvider implements ILangCompilationProvider {
                 CompilerProperties.COMPILER_QUERY_PLAN_SHAPE_KEY, CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY,
                 CompilerProperties.COMPILER_COLUMN_FILTER_KEY, CompilerProperties.COMPILER_BATCH_LOOKUP_KEY,
                 FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
-                CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+                CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY,
+                CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
                 FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
                 FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
                 SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp
new file mode 100644
index 0000000000..3ee2964132
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+SELECT element ds FROM DeltalakeDataset as ds order by id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp
new file mode 100644
index 0000000000..47156dd82a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.02.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
new file mode 100644
index 0000000000..a8fb52984b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.deltalake.filesplits` "true";
+SELECT element ds FROM DeltalakeDataset2 as ds where ds.timestamp=datetime("2025-01-01T00:01:20Z") order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 065435b55c..6c78c21b42 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 28fa1f2b6c..c44e600a3b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e0e9a6b00..10eab67e07 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -37,6 +37,7 @@
     "compiler.cbo" : true,
     "compiler.column.filter" : true,
     "compiler.copy.to.write.buffer.size" : 8388608,
+    "compiler.deltalake.filesplits" : false,
     "compiler\.external\.field\.pushdown" : true,
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm
new file mode 100644
index 0000000000..500f6a9970
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.02.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Cooper" }
+{ "id": 1, "name": "Adam" }
+{ "id": 2, "name": "Third" }
+{ "id": 3, "name": "Fourth" }
+{ "id": 4, "name": "Five" }
+{ "id": 5, "name": "Six" }
+{ "id": 6, "name": "Seven" }
+{ "id": 7, "name": "Eight" }
+{ "id": 8, "name": "Nine" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm
new file mode 100644
index 0000000000..afae366266
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.3.adm
@@ -0,0 +1,4 @@
+{ "id": 0, "name": "Cooper", "age": "42" }
+{ "id": 1, "name": "Murphy", "age": "16" }
+{ "id": 2, "name": "Mann", "age": "45" }
+{ "id": 3, "name": "Brand", "age": "35" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm
new file mode 100644
index 0000000000..c185038fa0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.3.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "name": "Order 1", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 1, "name": "Order 2", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 2, "name": "Order 3", "timestamp": datetime("2025-01-01T00:01:20.000") }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index a7e39c9b2a..2adfd7d9cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -157,7 +157,8 @@ public class CompilerProperties extends AbstractProperties {
                 getRangedIntegerType(0, Integer.MAX_VALUE),
                 128,
                 "Maximum occurrences of a variable allowed in an expression for inlining"),
-        COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, "Enable/disable select order list");
+        COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, "Enable/disable select order list"),
+        COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta lake file splits");

         private final IOptionType type;
         private final Object defaultValue;
@@ -245,6 +246,7 @@ public class CompilerProperties extends AbstractProperties {
     public static final String COMPILER_ORDERED_FIELDS_KEY = Option.COMPILER_ORDERED_FIELDS.ini();

     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
+    public static final String COMPILER_DELTALAKE_FILESPLITS_KEY = Option.COMPILER_DELTALAKE_FILESPLITS.ini();

     public CompilerProperties(PropertiesAccessor accessor) {
         super(accessor);
@@ -387,4 +389,8 @@ public class CompilerProperties extends AbstractProperties {
     public int getMaxVariableOccurrencesForInlining() {
         return accessor.getInt(Option.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING);
     }
+
+    public boolean isDeltaLakeFileSplitsEnabled() {
+        return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000000..141040569b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
+import io.delta.kernel.engine.ParquetHandler;
+
+public class DeltaEngine extends DefaultEngine {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    protected DeltaEngine(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public static DeltaEngine create(Configuration configuration) {
+        return new DeltaEngine(new HadoopFileIO(configuration), configuration);
+    }
+
+    public ParquetHandler getParquetHandler() {
+        return new DeltaParquetHandler(this.fileIO, this.conf);
+    }
+
+}
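DeltaEngine's only job is to substitute the kernel's default ParquetHandler with the split-aware DeltaParquetHandler while reusing DefaultEngine for everything else. A sketch of how a caller lands on the custom handler (usage illustration only; it assumes the calling class sits in the same package as the new classes, and the cast holds because DeltaEngine always returns a DeltaParquetHandler):

    import org.apache.hadoop.mapred.JobConf;

    final class EngineWiring {
        static DeltaParquetHandler splitAwareHandler(JobConf conf) {
            DeltaEngine engine = DeltaEngine.create(conf);
            // DefaultEngine would return DefaultParquetHandler here; the override
            // is what routes reads through readParquetSplits.
            return (DeltaParquetHandler) engine.getParquetHandler();
        }
    }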
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b2e8..a9d60d6244 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,11 +19,13 @@ package org.apache.asterix.external.input.record.reader.aws.delta;

 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;

 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -70,9 +72,12 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
     private Row scanFile;
     private CloseableIterator<Row> rows;
     private Optional<Predicate> filterPredicate;
+    private final boolean usingSplits;
+    private List<Map.Entry<String, List<SerializableFileSplit>>> scanAndSplits;

     public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
             String filterExpressionStr) throws HyracksDataException {
+        this.usingSplits = false;
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -88,19 +93,47 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         if (scanFiles.size() > 0) {
             this.fileIndex = 0;
             this.scanFile = scanFiles.get(0);
-            this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
-            this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
-            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
-            try {
+            initializeDataIterators(filterExpressionStr);
+        }
+    }
+
+    public DeltaFileRecordReader(Map<String, List<SerializableFileSplit>> splitsMap, String serScanState,
+            ConfFactory config, String filterExpressionStr) throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.usingSplits = true;
+        this.engine = DeltaEngine.create(conf);
+        scanAndSplits = new ArrayList<>(splitsMap.entrySet());
+        this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+        this.physicalReadSchema = null;
+        this.physicalDataIter = null;
+        this.dataIter = null;
+        this.record = new GenericRecord<>();
+        if (scanAndSplits.size() > 0) {
+            this.fileIndex = 0;
+            this.scanFile = RowSerDe.deserializeRowFromJson(scanAndSplits.get(0).getKey());
+            initializeDataIterators(filterExpressionStr);
+        }
+    }
+
+    private void initializeDataIterators(String filterExpressionStr) throws HyracksDataException {
+        this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+        this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+        this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
+        try {
+            if (usingSplits) {
+                this.physicalDataIter = ((DeltaParquetHandler) engine.getParquetHandler()).readParquetSplits(
+                        toCloseableIterator(scanAndSplits.get(fileIndex).getValue().iterator()), physicalReadSchema,
+                        filterPredicate);
+            } else {
                 this.physicalDataIter = engine.getParquetHandler()
                         .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
-                this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
-                if (dataIter.hasNext()) {
-                    rows = dataIter.next().getRows();
-                }
-            } catch (IOException e) {
-                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
             }
+            this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            if (dataIter.hasNext()) {
+                rows = dataIter.next().getRows();
+            }
+        } catch (IOException e) {
+            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
         }
     }

@@ -112,6 +145,9 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         if (physicalDataIter != null) {
             physicalDataIter.close();
         }
+        if (rows != null) {
+            rows.close();
+        }
     }

     @Override
@@ -121,14 +157,25 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         } else if (dataIter != null && dataIter.hasNext()) {
             rows = dataIter.next().getRows();
             return this.hasNext();
-        } else if (fileIndex < scanFiles.size() - 1) {
+        } else if ((!usingSplits && fileIndex < scanFiles.size() - 1)
+                || (usingSplits && fileIndex < scanAndSplits.size() - 1)) {
             fileIndex++;
-            scanFile = scanFiles.get(fileIndex);
+            if (usingSplits) {
+                scanFile = RowSerDe.deserializeRowFromJson(scanAndSplits.get(fileIndex).getKey());
+            } else {
+                scanFile = scanFiles.get(fileIndex);
+            }
             fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
-                physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, filterPredicate);
+                if (usingSplits) {
+                    this.physicalDataIter = ((DeltaParquetHandler) engine.getParquetHandler()).readParquetSplits(
+                            toCloseableIterator(scanAndSplits.get(fileIndex).getValue().iterator()),
+                            physicalReadSchema, filterPredicate);
+                } else {
+                    physicalDataIter = engine.getParquetHandler().readParquetFiles(
+                            singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
+                }
                 dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
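The reader now runs in one of two modes: the original per-file mode, driven by a list of serialized scan-file rows, and a split mode, where each serialized scan-file row maps to the row-group splits carved out of its Parquet file. A toy illustration of the map shape the split constructor expects (the path, sizes, and abbreviated JSON key are made up for the example):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    final class SplitsMapShape {
        static Map<String, List<SerializableFileSplit>> example(JobConf conf) {
            Map<String, List<SerializableFileSplit>> splitsMap = new HashMap<>();
            Path file = new Path("s3a://bucket/table/part-00000.parquet");
            // Key: the JSON-serialized Delta scan-file row (abbreviated here);
            // value: one split per Parquet row group of that file.
            splitsMap.put("{\"add\":{\"path\":\"part-00000.parquet\", ...}}",
                    List.of(new SerializableFileSplit(file, 0L, 134217728L, conf),
                            new SerializableFileSplit(file, 134217728L, 98566144L, conf)));
            return splitsMap;
        }
    }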
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.defaults.internal.parquet.ParquetFilterUtils.toParquetFilter;
+import static java.util.Objects.requireNonNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.setFilterPredicate;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
+import io.delta.kernel.exceptions.KernelEngineException;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class DeltaParquetFileReader extends ParquetFileReader {
+
+    private final FileIO fileIO;
+    private final int maxBatchSize;
+    private final Configuration conf;
+
+    public DeltaParquetFileReader(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = requireNonNull(fileIO, "fileIO is null");
+        this.conf = requireNonNull(conf, "conf is null");
+        this.maxBatchSize =
+                fileIO.getConf("delta.kernel.default.parquet.reader.batch-size").map(Integer::valueOf).orElse(1024);
+    }
+
+    public CloseableIterator<ColumnarBatch> read(InputSplit split, StructType schema, Optional<Predicate> predicate) {
+
+        final boolean hasRowIndexCol = schema.indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME) >= 0
+                && schema.get(StructField.METADATA_ROW_INDEX_COLUMN_NAME).isMetadataColumn();
+
+        return new CloseableIterator<ColumnarBatch>() {
+            private final BatchReadSupport readSupport = new BatchReadSupport(maxBatchSize, schema);
+            private ParquetRecordReader<Object> reader;
+            private boolean hasNotConsumedNextElement;
+
+            @Override
+            public void close() throws IOException {
+                Utils.closeCloseables(reader);
+            }
+
+            @Override
+            public boolean hasNext() {
+                initParquetReaderIfRequired();
+                try {
+                    if (hasNotConsumedNextElement) {
+                        return true;
+                    }
+
+                    hasNotConsumedNextElement = reader.nextKeyValue() && reader.getCurrentValue() != null;
+                    return hasNotConsumedNextElement;
+                } catch (IOException | InterruptedException ex) {
+                    throw new KernelEngineException("Error reading Parquet file: " + ((FileSplit) split).getPath(), ex);
+                }
+            }
+
+            @Override
+            public ColumnarBatch next() {
+                if (!hasNotConsumedNextElement) {
+                    throw new NoSuchElementException();
+                }
+                int batchSize = 0;
+                do {
+                    hasNotConsumedNextElement = false;
+                    // hasNext reads the row to confirm there is a next element.
+                    // get the row index only if required by the read schema
+                    long rowIndex = 0;
+                    try {
+                        rowIndex = hasRowIndexCol ? reader.getCurrentRowIndex() : -1;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    readSupport.finalizeCurrentRow(rowIndex);
+                    batchSize++;
+                } while (batchSize < maxBatchSize && hasNext());
+
+                return readSupport.getDataAsColumnarBatch(batchSize);
+            }
+
+            private void initParquetReaderIfRequired() {
+                if (reader == null) {
+                    org.apache.parquet.hadoop.ParquetFileReader fileReader = null;
+                    try {
+                        Configuration confCopy = conf;
+                        Path filePath = ((FileSplit) split).getPath();
+
+                        // We need physical schema in order to construct a filter that can be
+                        // pushed into the `parquet-mr` reader. For that reason read the footer
+                        // in advance.
+                        ParquetMetadata footer =
+                                org.apache.parquet.hadoop.ParquetFileReader.readFooter(confCopy, filePath);
+
+                        MessageType parquetSchema = footer.getFileMetaData().getSchema();
+                        Optional<FilterPredicate> parquetPredicate =
+                                predicate.flatMap(predicate -> toParquetFilter(parquetSchema, predicate));
+
+                        if (parquetPredicate.isPresent()) {
+                            // clone the configuration to avoid modifying the original one
+                            confCopy = new Configuration(confCopy);
+
+                            setFilterPredicate(confCopy, parquetPredicate.get());
+                            // Disable the record level filtering as the `parquet-mr` evaluates
+                            // the filter once the entire record has been materialized. Instead,
+                            // we use the predicate to prune the row groups which is more efficient.
+                            // In the future, we can consider using the record level filtering if a
+                            // native Parquet reader is implemented in Kernel default module.
+                            confCopy.set(RECORD_FILTERING_ENABLED, "false");
+                            confCopy.set(DICTIONARY_FILTERING_ENABLED, "false");
+                            confCopy.set(COLUMN_INDEX_FILTERING_ENABLED, "false");
+                        }
+
+                        // Pass the already read footer to the reader to avoid reading it again.
+                        fileReader = new ParquetFileReaderWithFooter(filePath, confCopy, footer);
+                        reader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(confCopy));
+                        reader.initialize((FileSplit) split, confCopy, Reporter.NULL);
+                    } catch (IOException e) {
+                        Utils.closeCloseablesSilently(fileReader, reader);
+                        throw new KernelEngineException("Error reading Parquet file: " + ((FileSplit) split).getPath(),
+                                e);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Wrapper around {@link org.apache.parquet.hadoop.ParquetFileReader} to allow using the
+     * provided footer instead of reading it again. We read the footer in advance to construct a
+     * predicate for filtering rows.
+     */
+    private static class ParquetFileReaderWithFooter extends org.apache.parquet.hadoop.ParquetFileReader {
+        private final ParquetMetadata footer;
+
+        ParquetFileReaderWithFooter(Path filePath, Configuration configuration, ParquetMetadata footer)
+                throws IOException {
+            super(configuration, filePath, footer);
+            this.footer = requireNonNull(footer, "footer is null");
+        }
+
+        @Override
+        public ParquetMetadata getFooter() {
+            return footer; // return the footer passed in the constructor
+        }
+    }
+}
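The splits this reader consumes are aligned to Parquet row groups, so each split can be read independently and the pushed-down predicate prunes whole row groups instead of filtering materialized records. A standalone sketch for inspecting the row groups of a file, mirroring the footer metadata that getInputSplits in DeltaReaderFactory further down derives its splits from (the file path is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    final class RowGroupInspector {
        static void dump(Configuration conf, Path file) throws IOException {
            try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
                for (BlockMetaData block : reader.getFooter().getBlocks()) {
                    // Each row group becomes one candidate split: (offset, compressed length).
                    System.out.printf("row group @ %d: %d rows, %d compressed bytes%n",
                            block.getStartingPos(), block.getRowCount(), block.getCompressedSize());
                }
            }
        }
    }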
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.DefaultParquetHandler;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class DeltaParquetHandler extends DefaultParquetHandler {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    public DeltaParquetHandler(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public CloseableIterator<ColumnarBatch> readParquetSplits(final CloseableIterator<SerializableFileSplit> splits,
+            final StructType physicalSchema, final Optional<Predicate> predicate) throws IOException {
+        return new CloseableIterator<>() {
+            private final DeltaParquetFileReader batchReader;
+            private CloseableIterator<ColumnarBatch> currentFileReader;
+
+            {
+                this.batchReader =
+                        new DeltaParquetFileReader(DeltaParquetHandler.this.fileIO, DeltaParquetHandler.this.conf);
+            }
+
+            public void close() throws IOException {
+                Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader, splits });
+            }
+
+            public boolean hasNext() {
+                if (this.currentFileReader != null && this.currentFileReader.hasNext()) {
+                    return true;
+                } else {
+                    Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader });
+                    this.currentFileReader = null;
+                    if (splits.hasNext()) {
+                        this.currentFileReader = this.batchReader.read(splits.next(), physicalSchema, predicate);
+                        return this.hasNext();
+                    } else {
+                        return false;
+                    }
+                }
+            }
+
+            public ColumnarBatch next() {
+                return this.currentFileReader.next();
+            }
+        };
+    }
+}
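readParquetSplits is a lazy chain: it opens one split at a time and only advances to the next split once the current batch iterator is exhausted. A usage sketch under that assumption (the schema and splits would come from the surrounding scan; this wiring is illustrative, not verbatim from the patch):

    import java.io.IOException;
    import java.util.List;
    import java.util.Optional;

    import io.delta.kernel.data.ColumnarBatch;
    import io.delta.kernel.internal.util.Utils;
    import io.delta.kernel.types.StructType;
    import io.delta.kernel.utils.CloseableIterator;

    final class HandlerUsage {
        static long countBatches(DeltaParquetHandler handler, List<SerializableFileSplit> splits,
                StructType physicalSchema) throws IOException {
            try (CloseableIterator<ColumnarBatch> batches = handler.readParquetSplits(
                    Utils.toCloseableIterator(splits.iterator()), physicalSchema, Optional.empty())) {
                long n = 0;
                while (batches.hasNext()) {
                    batches.next(); // each batch holds up to maxBatchSize rows
                    n++;
                }
                return n;
            }
        }
    }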
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 03285efbe9..9ac004386b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -25,12 +25,14 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;

 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -45,6 +47,7 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -55,6 +58,8 @@ import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;

 import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
@@ -73,13 +78,14 @@ import io.delta.kernel.utils.FileStatus;

 public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> {

-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private static final Logger LOGGER = LogManager.getLogger();
     private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
     private String filterExpressionStr;
+    private boolean usingSplits;

     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -157,12 +163,43 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
+        int numPartitions = getPartitionConstraint().getLocations().length;
         LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size());
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
+        try {
+            usingSplits = getFileSplitsConfig(configuration, appCtx);
+            if (usingSplits) {
+                distributeSplits(scanFiles, conf, numPartitions);
+            } else {
+                distributeFiles(scanFiles, numPartitions);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         issueWarnings(warnings, warningCollector);
     }

+    private boolean getFileSplitsConfig(Map<String, String> configuration, ICcApplicationContext appCtx) {
+        String fileSplits = configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
+        return fileSplits != null ? Boolean.parseBoolean(fileSplits)
+                : appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled();
+    }
+
+    private List<SerializableFileSplit> getInputSplits(Row file, JobConf conf) throws IOException {
+        List<SerializableFileSplit> inputSplits = new ArrayList<>();
+        FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(file);
+        Path parquetPath = new Path(fileStatus.getPath());
+        try (ParquetFileReader reader = ParquetFileReader.open(conf, parquetPath)) {
+            List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+            for (BlockMetaData block : blocks) {
+                long start = block.getStartingPos();
+                long length = block.getCompressedSize();
+                inputSplits.add(new SerializableFileSplit(parquetPath, start, length, conf));
+            }
+        }
+        return inputSplits;
+    }
+
     private List<Row> getScanFiles(Scan scan, Engine engine) {
         List<Row> scanFiles = new ArrayList<>();
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
@@ -195,10 +232,9 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>

     public void distributeFiles(List<Row> scanFiles, int partitionsCount) {
         PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
                 Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
         // Prepare the workloads based on the number of partitions
         for (int i = 0; i < partitionsCount; i++) {
-            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize(false));
         }
         for (Row scanFileRow : scanFiles) {
             PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
@@ -209,12 +245,37 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
         partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }

+    private void distributeSplits(List<Row> scanFiles, JobConf conf, int partitionsCount) throws IOException {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize(true));
+        }
+        for (Row scanFile : scanFiles) {
+            String scanFileJson = RowSerDe.serializeRowToJson(scanFile);
+            List<SerializableFileSplit> splits = getInputSplits(scanFile, conf);
+            // Distribute splits across partitions
+            for (int i = 0; i < splits.size(); i++) {
+                PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+                workload.addScanFileSplit(scanFileJson, splits.get(i));
+                workloadQueue.add(workload);
+            }
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
             int partition = context.getPartition();
-            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory, filterExpressionStr);
+            if (usingSplits) {
+                return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFileSplits(),
+                        scanState, confFactory, filterExpressionStr);
+            } else {
+                return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
+                        scanState, confFactory, filterExpressionStr);
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -231,29 +292,45 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
     }

     public static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
         private final List<String> scanFiles = new ArrayList<>();
+        private final Map<String, List<SerializableFileSplit>> scanFileSplits = new HashMap<>();
+        private final boolean usingSplits;
         private long totalSize = 0;

-        public PartitionWorkLoadBasedOnSize() {
+        public PartitionWorkLoadBasedOnSize(boolean usingSplits) {
+            this.usingSplits = usingSplits;
         }

         public List<String> getScanFiles() {
             return scanFiles;
         }

+        public Map<String, List<SerializableFileSplit>> getScanFileSplits() {
+            return scanFileSplits;
+        }
+
+        public boolean isUsingSplits() {
+            return usingSplits;
+        }
+
         public void addScanFile(String scanFile, long size) {
             this.scanFiles.add(scanFile);
             this.totalSize += size;
         }

+        public void addScanFileSplit(String scanFile, SerializableFileSplit split) {
+            this.totalSize += split.getLength();
+            this.scanFileSplits.computeIfAbsent(scanFile, k -> new ArrayList<>()).add(split);
+        }
+
         public long getTotalSize() {
             return totalSize;
         }

         @Override
         public String toString() {
-            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+            return "Files: " + (usingSplits ? scanFileSplits.size() : scanFiles.size()) + ", Total Size: " + totalSize;
         }
     }
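Both distributeFiles and distributeSplits rely on the same greedy balancing: a min-heap ordered by accumulated size always hands the next work item to the currently lightest partition. A self-contained sketch of that idea, with toy sizes rather than real file or split lengths:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public final class GreedyBalanceSketch {
        static final class Workload {
            final List<Long> items = new ArrayList<>();
            long totalSize;
        }

        // Assign each item to the partition with the smallest running total.
        static List<Workload> distribute(long[] sizes, int partitions) {
            PriorityQueue<Workload> queue =
                    new PriorityQueue<>(partitions, Comparator.comparingLong((Workload w) -> w.totalSize));
            for (int i = 0; i < partitions; i++) {
                queue.add(new Workload());
            }
            for (long size : sizes) {
                Workload lightest = queue.poll();
                lightest.items.add(size);
                lightest.totalSize += size;
                queue.add(lightest); // re-insert so the heap reflects the new total
            }
            return new ArrayList<>(queue);
        }

        public static void main(String[] args) {
            for (Workload w : distribute(new long[] { 700, 200, 500, 100, 300 }, 2)) {
                System.out.println(w.items + " -> " + w.totalSize);
            }
        }
    }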
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class SerializableFileSplit extends FileSplit implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializableFileSplit(Path file, long start, long length, JobConf conf) {
+        super(file, start, length, conf);
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        this.write(out);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        this.readFields(in);
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index b961421db2..f7bf0f6e67 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.common.metadata.DataverseName;
@@ -139,6 +140,7 @@ public class DatasetDataSource extends DataSource {
                 addExternalProjectionInfo(projectionFiltrationInfo, edd.getProperties());
                 properties = addSubPath(externalDataSource.getProperties(), properties);
                 properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
+                setExternalCollectionCompilerProperties(metadataProvider, properties);
                 IExternalFilterEvaluatorFactory filterEvaluatorFactory = metadataProvider
                         .createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties);
                 ITypedAdapterFactory adapterFactory =
@@ -223,4 +225,13 @@ public class DatasetDataSource extends DataSource {
     public boolean isScanAccessPathALeaf() {
         return dataset.getDatasetType() == DatasetType.EXTERNAL;
     }
+
+    private void setExternalCollectionCompilerProperties(MetadataProvider metadataProvider,
+            Map<String, String> configuration) {
+        String fileSplits =
+                (String) metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
+        if (fileSplits != null) {
+            configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, fileSplits);
+        }
+    }
 }
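A closing note on SerializableFileSplit above: Hadoop's FileSplit is Writable but not Java-serializable, while PartitionWorkLoadBasedOnSize travels to the data partitions through Java serialization, so writeObject/readObject simply delegate to the Writable protocol. A minimal round-trip check under that assumption (it relies on FileSplit's public no-arg constructor from the Writable contract; the path is hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public final class SplitRoundTrip {
        public static void main(String[] args) throws Exception {
            SerializableFileSplit split = new SerializableFileSplit(
                    new Path("s3a://bucket/table/part-00000.parquet"), 0L, 4096L, new JobConf());
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(split); // delegates to Writable#write
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                SerializableFileSplit copy = (SerializableFileSplit) in.readObject(); // delegates to readFields
                System.out.println(copy.getPath() + " @" + copy.getStart() + " +" + copy.getLength());
            }
        }
    }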
