This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d3827662ad11193341a12d931bf2eb8188f07e59 Author: Hussain Towaileb <[email protected]> AuthorDate: Wed Mar 31 18:10:49 2021 +0300 [NO ISSUE][EXT]: Improve workload distribution logic - user model changes: no - storage format changes: no - interface changes: no Details: - Improve the workload distribution logic. Change-Id: I191dc87850c1812b49831dd3c78bc7a22cc5b931 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10763 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../AbstractExternalInputStreamFactory.java | 21 ------- .../record/reader/aws/AwsS3InputStreamFactory.java | 13 +++- .../reader/azure/AzureBlobInputStreamFactory.java | 15 +++-- .../input/record/reader/awss3/AwsS3Test.java | 73 ++++++++++++++++++++++ 4 files changed, 94 insertions(+), 28 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java index 0b215a0..2a063bf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java @@ -82,27 +82,6 @@ public abstract class AbstractExternalInputStreamFactory implements IInputStream ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations(); } - /** - * Finds the smallest workload and returns it - * - * @return the smallest workload - */ - protected PartitionWorkLoadBasedOnSize getSmallestWorkLoad() { - PartitionWorkLoadBasedOnSize smallest = 
partitionWorkLoadsBasedOnSize.get(0); - for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) { - // If the current total size is 0, add the file directly as this is a first time partition - if (partition.getTotalSize() == 0) { - smallest = partition; - break; - } - if (partition.getTotalSize() < smallest.getTotalSize()) { - smallest = partition; - } - } - - return smallest; - } - protected IncludeExcludeMatcher getIncludeExcludeMatchers() throws CompilationException { // Get and compile the patterns for include/exclude if provided List<Matcher> includeMatchers = new ArrayList<>(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index a1c577a..747fcca 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -19,8 +19,10 @@ package org.apache.asterix.external.input.record.reader.aws; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.function.BiPredicate; import java.util.regex.Matcher; @@ -216,14 +218,19 @@ public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory * @param partitionsCount Partitions count */ private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) { + PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, + Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); + // Prepare the workloads based on the number of partitions for (int i = 0; i < partitionsCount; i++) { - 
partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize()); + workloadQueue.add(new PartitionWorkLoadBasedOnSize()); } for (S3Object object : fileObjects) { - PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad(); - smallest.addFilePath(object.key(), object.size()); + PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); + workload.addFilePath(object.key(), object.size()); + workloadQueue.add(workload); } + partitionWorkLoadsBasedOnSize.addAll(workloadQueue); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java index ca064b1..803694a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java @@ -19,8 +19,10 @@ package org.apache.asterix.external.input.record.reader.azure; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.function.BiPredicate; import java.util.regex.Matcher; @@ -131,14 +133,19 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact * @param partitionsCount Partitions count */ private void distributeWorkLoad(List<BlobItem> items, int partitionsCount) { + PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, + Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); + // Prepare the workloads based on the number of partitions for (int i = 0; i < partitionsCount; i++) { - partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize()); + workloadQueue.add(new 
PartitionWorkLoadBasedOnSize()); } - for (BlobItem item : items) { - PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad(); - smallest.addFilePath(item.getName(), item.getProperties().getContentLength()); + for (BlobItem object : items) { + PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); + workload.addFilePath(object.getName(), object.getProperties().getContentLength()); + workloadQueue.add(workload); } + partitionWorkLoadsBasedOnSize.addAll(workloadQueue); } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java new file mode 100644 index 0000000..9cdeb16 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.awss3; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory; +import org.junit.Assert; +import org.junit.Test; + +import software.amazon.awssdk.services.s3.model.S3Object; + +public class AwsS3Test { + + @SuppressWarnings("unchecked") + @Test + public void testWorkloadDistribution() throws Exception { + AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory(); + + List<S3Object> s3Objects = new ArrayList<>(); + final int partitionsCount = 3; + + // Create S3 objects, 9 objects, on 3 partitions, they should be 600 total size on each partition + S3Object.Builder builder = S3Object.builder(); + s3Objects.add(builder.key("1.json").size(100L).build()); + s3Objects.add(builder.key("2.json").size(100L).build()); + s3Objects.add(builder.key("3.json").size(100L).build()); + s3Objects.add(builder.key("4.json").size(200L).build()); + s3Objects.add(builder.key("5.json").size(200L).build()); + s3Objects.add(builder.key("6.json").size(200L).build()); + s3Objects.add(builder.key("7.json").size(300L).build()); + s3Objects.add(builder.key("8.json").size(300L).build()); + s3Objects.add(builder.key("9.json").size(300L).build()); + + // invoke the distributeWorkLoad method + Method distributeWorkloadMethod = + AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class); + distributeWorkloadMethod.setAccessible(true); + distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount); + + // get the partitionWorkLoadsBasedOnSize field and verify the result + Field distributeWorkloadField = + AwsS3InputStreamFactory.class.getSuperclass().getDeclaredField("partitionWorkLoadsBasedOnSize"); + 
distributeWorkloadField.setAccessible(true); + List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize> workloads = + (List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField + .get(factory); + + for (AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) { + Assert.assertEquals(workload.getTotalSize(), 600); + } + } +}
