This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 49929014b3c8e69d37d0eb21e7b9bbb459663de2 Author: Peeyush Gupta <[email protected]> AuthorDate: Tue Nov 19 09:54:06 2024 -0800 [ASTERIXDB-3503][EXT] Improve logic of distributing files to partitions - user model changes: no - storage format changes: no - interface changes: no Details: With this patch, the files to scan are distributed among partitions so that the total size of the files each partition reads is balanced fairly. Each file is assigned to the partition with the smallest total assigned size so far. Ext-ref: MB-63840 Change-Id: Id2c56b707f6fd5f4cb40f4576dc12ceb2e61a193 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19093 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> --- .../aws/delta/AsterixDeltaRuntimeException.java | 34 ++++++++++ .../aws/delta/AsterixTypeToDeltaTypeVisitor.java | 2 +- .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 77 ++++++++++++++++------ .../asterix/external/util/ExternalDataUtils.java | 2 +- 4 files changed, 94 insertions(+), 21 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java new file mode 100644 index 0000000000..9dd2000925 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AsterixDeltaRuntimeException extends RuntimeException { + private static final long serialVersionUID = 1L; + private final HyracksDataException hyracksDataException; + + public AsterixDeltaRuntimeException(HyracksDataException e) { + this.hyracksDataException = e; + } + + public HyracksDataException getHyracksDataException() { + return hyracksDataException; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java index 2c86132490..3ca704ff40 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java @@ -124,7 +124,7 @@ public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, Da try { actualType = DeltaDataParser.getTypeTag(type, false, context); } catch (HyracksDataException e) { - throw new RuntimeException(e); + throw new AsterixDeltaRuntimeException(e); } ATypeTag expectedType = node.getTypeTag(); diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 5ce2c78152..9d93fcea1c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -21,11 +21,13 @@ package org.apache.asterix.external.input.record.reader.aws.delta; import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -58,8 +60,10 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { @@ -68,10 +72,9 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3); private static final Logger LOGGER = LogManager.getLogger(); private transient AlgebricksAbsolutePartitionConstraint locationConstraints; - private Map<Integer, List<String>> schedule; private String scanState; private Map<String, String> configuration; - private 
List<String> scanFiles; + protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); @Override public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { @@ -120,28 +123,30 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap); } catch (IOException e) { throw new RuntimeException(e); + } catch (AsterixDeltaRuntimeException e) { + throw e.getHyracksDataException(); } Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); - scanFiles = new ArrayList<>(); + List<Row> scanFiles = new ArrayList<>(); while (iter.hasNext()) { FilteredColumnarBatch batch = iter.next(); CloseableIterator<Row> rowIter = batch.getRows(); while (rowIter.hasNext()) { Row row = rowIter.next(); - scanFiles.add(RowSerDe.serializeRowToJson(row)); + scanFiles.add(row); } } - locationConstraints = configureLocationConstraints(appCtx); + locationConstraints = configureLocationConstraints(appCtx, scanFiles); configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); - distributeFiles(); + distributeFiles(scanFiles); issueWarnings(warnings, warningCollector); } private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { - if (!warnings.isEmpty() && warningCollector.shouldWarn()) { + if (!warnings.isEmpty()) { for (Warning warning : warnings) { if (warningCollector.shouldWarn()) { warningCollector.warn(warning); @@ -151,7 +156,8 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { warnings.clear(); } - private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) { + private AlgebricksAbsolutePartitionConstraint 
configureLocationConstraints(ICcApplicationContext appCtx, + List<Row> scanFiles) { IClusterStateManager csm = appCtx.getClusterStateManager(); String[] locations = csm.getClusterLocations().getLocations(); @@ -168,24 +174,30 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { return new AlgebricksAbsolutePartitionConstraint(locations); } - private void distributeFiles() { - final int numComputePartitions = getPartitionConstraint().getLocations().length; - schedule = new HashMap<>(); - for (int i = 0; i < numComputePartitions; i++) { - schedule.put(i, new ArrayList<>()); + private void distributeFiles(List<Row> scanFiles) { + final int partitionsCount = getPartitionConstraint().getLocations().length; + PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, + Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); + + // Prepare the workloads based on the number of partitions + for (int i = 0; i < partitionsCount; i++) { + workloadQueue.add(new PartitionWorkLoadBasedOnSize()); } - int i = 0; - for (String scanFile : scanFiles) { - schedule.get(i).add(scanFile); - i = (i + 1) % numComputePartitions; + for (Row scanFileRow : scanFiles) { + PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize()); + workloadQueue.add(workload); } + partitionWorkLoadsBasedOnSize.addAll(workloadQueue); } @Override public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException { try { int partition = context.getPartition(); - return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context); + return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState, + configuration, context); } catch (Exception e) { throw 
HyracksDataException.create(e); } @@ -206,4 +218,31 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { return Collections.singleton(ExternalDataConstants.FORMAT_DELTA); } + public static class PartitionWorkLoadBasedOnSize implements Serializable { + private static final long serialVersionUID = 1L; + private final List<String> scanFiles = new ArrayList<>(); + private long totalSize = 0; + + public PartitionWorkLoadBasedOnSize() { + } + + public List<String> getScanFiles() { + return scanFiles; + } + + public void addScanFile(String scanFile, long size) { + this.scanFiles.add(scanFile); + this.totalSize += size; + } + + public long getTotalSize() { + return totalSize; + } + + @Override + public String toString() { + return "Files: " + scanFiles.size() + ", Total Size: " + totalSize; + } + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 50e72ed047..b3118702a1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -921,7 +921,7 @@ public class ExternalDataUtils { } public static boolean supportsPushdown(Map<String, String> properties) { - //Currently, only Apache Parquet format is supported + //Currently, only Apache Parquet/Delta table format is supported return isParquetFormat(properties) || isDeltaTable(properties); }
