This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4d2a73fcdfa07295b7ef90ddcf01a2e3c0777afc Author: Hussain Towaileb <[email protected]> AuthorDate: Wed Jun 9 19:59:23 2021 +0300 [NO ISSUE][EXT]: Retry upon failure for S3 retryable errors - user model changes: no - storage format changes: no - interface changes: no Details: - Retry upon failure for S3 retryable errors. Change-Id: I639fd7d43c2a6c28b3cc4247bf9ac5d3a23a387e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11883 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../input/record/reader/aws/AwsS3InputStream.java | 55 ++++++-- .../record/reader/aws/AwsS3InputStreamFactory.java | 4 +- .../external/util/ExternalDataConstants.java | 9 ++ .../asterix/external/util/ExternalDataUtils.java | 3 +- .../asterix/external/input/awss3/AwsS3Test.java | 142 +++++++++++++++++++++ 5 files changed, 199 insertions(+), 14 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index 8bd7a51..4d5288c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -23,6 +23,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import org.apache.asterix.common.exceptions.CompilationException; @@ -41,6 +42,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import 
software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.S3Exception; public class AwsS3InputStream extends AbstractMultipleInputStream { @@ -51,6 +53,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { private final int bufferSize; private final S3Client s3Client; + private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service // File fields private final List<String> filePaths; @@ -84,17 +87,10 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder(); GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(fileName).build(); - // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading - // the header, then the S3 stream gets closed in the close method - try { - in = s3Client.getObject(getObjectRequest); - } catch (NoSuchKeyException ex) { - LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket " - + getObjectRequest.bucket()); - nextFileIndex++; + boolean isAvailableStream = doGetInputStream(getObjectRequest); + nextFileIndex++; + if (!isAvailableStream) { return advance(); - } catch (SdkException ex) { - throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); } // Use gzip stream if needed @@ -103,13 +99,50 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { } // Current file ready, point to the next file - nextFileIndex++; if (notificationHandler != null) { notificationHandler.notifyNewSource(); } return true; } + /** + * Get the input stream. If an error is encountered, depending on the error code, a retry might be favorable. 
+ * + * @return true if the input stream was obtained successfully, false if the requested key was not found + */ + private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException { + int retries = 0; + while (retries < MAX_RETRIES) { + try { + in = s3Client.getObject(request); + break; + } catch (NoSuchKeyException ex) { + LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket " + + request.bucket()); + return false; + } catch (S3Exception ex) { + if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) { + throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } + LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage())); + + // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } catch (SdkException ex) { + throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } + } + return true; + } + + private boolean shouldRetry(String errorCode, int currentRetry) { + return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode); + } + private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { + try { + return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 0bc4c40..715c5df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -136,7 +136,7 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory { // New API is not implemented, try falling back to old API try { // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) { + if (ex.awsErrorDetails().errorCode().equals(AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) { filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p); } else { throw ex; @@ -312,7 +312,7 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { return smallest; } - private static class PartitionWorkLoadBasedOnSize implements Serializable { + public static class PartitionWorkLoadBasedOnSize implements Serializable { private static final long serialVersionUID = 1L; private final List<String> filePaths = new ArrayList<>(); private long totalSize = 0; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 1911083..0f4117b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -291,5 +291,14 @@ public class ExternalDataConstants { public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; public static final String CONTAINER_NAME_FIELD_NAME = "container"; public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; + + // AWS S3 specific error codes + public static final String ERROR_INTERNAL_ERROR = "InternalError"; + public static final String ERROR_SLOW_DOWN = "SlowDown"; + public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented"; + + public static boolean isRetryableError(String errorCode) { + return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN); + } } } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 363ec74..a6b8b47 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -710,7 +710,8 @@ public class ExternalDataUtils { // Method not implemented, try falling back to old API try { // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) { + if (ex.awsErrorDetails().errorCode() + .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) { useOldApi = true; response = isBucketEmpty(s3Client, container, prefix, true); } else { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java new file mode 100644 index 0000000..f0a2223 --- /dev/null +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.awss3; + +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR; +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream; +import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory; +import org.apache.hyracks.api.exceptions.IFormattedException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class AwsS3Test { + + @SuppressWarnings("unchecked") + @Test + public void testWorkloadDistribution() throws Exception { + AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory(); + + List<S3Object> s3Objects = new ArrayList<>(); + final int partitionsCount = 3; + + // Create S3 objects, 9 objects, on 3 partitions, they should be 600 total size on each partition + S3Object.Builder builder = S3Object.builder(); + s3Objects.add(builder.key("1.json").size(100L).build()); + s3Objects.add(builder.key("2.json").size(100L).build()); + 
s3Objects.add(builder.key("3.json").size(100L).build()); + s3Objects.add(builder.key("4.json").size(200L).build()); + s3Objects.add(builder.key("5.json").size(200L).build()); + s3Objects.add(builder.key("6.json").size(200L).build()); + s3Objects.add(builder.key("7.json").size(300L).build()); + s3Objects.add(builder.key("8.json").size(300L).build()); + s3Objects.add(builder.key("9.json").size(300L).build()); + + // invoke the distributeWorkLoad method + Method distributeWorkloadMethod = + AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class); + distributeWorkloadMethod.setAccessible(true); + distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount); + + // get the partitionWorkLoadsBasedOnSize field and verify the result + Field distributeWorkloadField = AwsS3InputStreamFactory.class.getDeclaredField("partitionWorkLoadsBasedOnSize"); + distributeWorkloadField.setAccessible(true); + List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize> workloads = + (List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField.get(factory); + + for (AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) { + Assert.assertEquals(workload.getTotalSize(), 600); + } + } + + @Test + public void s3InternalError() throws Exception { + // S3Client mock + S3Client s3ClientMock = Mockito.mock(S3Client.class); + + // Prepare S3Exception with internal error code + AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR) + .errorMessage("Internal Error from AWS").build(); + S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build(); + Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx); + + // Set S3Client mock + AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class); + Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client"); + 
s3ClientField.setAccessible(true); + s3ClientField.set(inputStreamMock, s3ClientMock); + + // doGetInputStream method + Method doGetInputStreamMethod = + AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class); + doGetInputStreamMethod.setAccessible(true); + + try { + doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build()); + } catch (Exception ex) { + Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException + && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS")); + } + } + + @Test + public void s3SlowDown() throws Exception { + // S3Client mock + S3Client s3ClientMock = Mockito.mock(S3Client.class); + + // Prepare S3Exception with slow down error code + AwsErrorDetails errorDetails = + AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build(); + S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build(); + Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx); + + // Set S3Client mock + AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class); + + // Set S3Client + Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client"); + s3ClientField.setAccessible(true); + s3ClientField.set(inputStreamMock, s3ClientMock); + + // doGetInputStream method + Method doGetInputStreamMethod = + AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class); + doGetInputStreamMethod.setAccessible(true); + + try { + doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build()); + } catch (Exception ex) { + Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException + && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS")); + } + } +}
