This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 4d2a73fcdfa07295b7ef90ddcf01a2e3c0777afc
Author: Hussain Towaileb <[email protected]>
AuthorDate: Wed Jun 9 19:59:23 2021 +0300

    [NO ISSUE][EXT]: Retry upon failure for S3 retryable errors
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Retry upon failure for S3 retryable errors.
    
    Change-Id: I639fd7d43c2a6c28b3cc4247bf9ac5d3a23a387e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11883
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../input/record/reader/aws/AwsS3InputStream.java  |  55 ++++++--
 .../record/reader/aws/AwsS3InputStreamFactory.java |   4 +-
 .../external/util/ExternalDataConstants.java       |   9 ++
 .../asterix/external/util/ExternalDataUtils.java   |   3 +-
 .../asterix/external/input/awss3/AwsS3Test.java    | 142 +++++++++++++++++++++
 5 files changed, 199 insertions(+), 14 deletions(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 8bd7a51..4d5288c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@ import static 
org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -41,6 +42,7 @@ import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class AwsS3InputStream extends AbstractMultipleInputStream {
 
@@ -51,6 +53,7 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
     private final int bufferSize;
 
     private final S3Client s3Client;
+    private static final int MAX_RETRIES = 5; // We will retry 5 times in case 
of internal error from AWS S3 service
 
     // File fields
     private final List<String> filePaths;
@@ -84,17 +87,10 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
         GetObjectRequest getObjectRequest = 
getObjectBuilder.bucket(bucket).key(fileName).build();
 
-        // Have a reference to the S3 stream to ensure that if GZipInputStream 
causes an IOException because of reading
-        // the header, then the S3 stream gets closed in the close method
-        try {
-            in = s3Client.getObject(getObjectRequest);
-        } catch (NoSuchKeyException ex) {
-            LOGGER.debug(() -> "Key " + 
LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
-                    + getObjectRequest.bucket());
-            nextFileIndex++;
+        boolean isAvailableStream = doGetInputStream(getObjectRequest);
+        nextFileIndex++;
+        if (!isAvailableStream) {
             return advance();
-        } catch (SdkException ex) {
-            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex.getMessage());
         }
 
         // Use gzip stream if needed
@@ -103,13 +99,50 @@ public class AwsS3InputStream extends 
AbstractMultipleInputStream {
         }
 
         // Current file ready, point to the next file
-        nextFileIndex++;
         if (notificationHandler != null) {
             notificationHandler.notifyNewSource();
         }
         return true;
     }
 
+    /**
+     * Opens the S3 object described by {@code request} and stores the resulting stream in {@code in}.
+     * Retryable S3 errors are retried after a short backoff until {@link #shouldRetry(String, int)}
+     * refuses another attempt.
+     *
+     * @param request the S3 get-object request to execute
+     * @return {@code true} if the stream was opened, {@code false} if the key was not found in the bucket
+     * @throws RuntimeDataException if the error is not retryable, the retries are exhausted, or the thread is
+     *             interrupted while backing off
+     */
+    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+        int retries = 0;
+        // shouldRetry is the only bound on this loop: every exit is an explicit return or throw, so the method
+        // can never report success without having assigned the stream
+        while (true) {
+            try {
+                in = s3Client.getObject(request);
+                return true;
+            } catch (NoSuchKeyException ex) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+                        + request.bucket());
+                return false;
+            } catch (S3Exception ex) {
+                // The error details (and the code inside them) may be missing; treat that as non-retryable
+                String errorCode = ex.awsErrorDetails() == null ? null : ex.awsErrorDetails().errorCode();
+                if (errorCode == null || !shouldRetry(errorCode, retries++)) {
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag and stop retrying: further sleeps would fail immediately
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+            } catch (SdkException ex) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Decides whether a failed S3 request should be attempted again.
+     *
+     * @param errorCode the S3 error code of the failure
+     * @param currentRetry the retry counter value supplied by the caller
+     * @return {@code true} only if {@code currentRetry} is below {@code MAX_RETRIES} and the error code is retryable
+     */
+    private boolean shouldRetry(String errorCode, int currentRetry) {
+        if (currentRetry >= MAX_RETRIES) {
+            return false;
+        }
+        return AwsS3.isRetryableError(errorCode);
+    }
+
     private S3Client buildAwsS3Client(Map<String, String> configuration) 
throws HyracksDataException {
         try {
             return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 0bc4c40..715c5df 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -136,7 +136,7 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory {
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) 
{
+                if 
(ex.awsErrorDetails().errorCode().equals(AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                     filesOnly = oldApiListS3Objects(s3Client, container, 
matchersList, p);
                 } else {
                     throw ex;
@@ -312,7 +312,7 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory {
         return smallest;
     }
 
-    private static class PartitionWorkLoadBasedOnSize implements Serializable {
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
         private static final long serialVersionUID = 1L;
         private final List<String> filePaths = new ArrayList<>();
         private long totalSize = 0;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1911083..0f4117b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -291,5 +291,14 @@ public class ExternalDataConstants {
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = 
"secretAccessKey";
         public static final String CONTAINER_NAME_FIELD_NAME = "container";
         public static final String SERVICE_END_POINT_FIELD_NAME = 
"serviceEndpoint";
+
+        // AWS S3 specific error codes
+        public static final String ERROR_INTERNAL_ERROR = "InternalError";
+        public static final String ERROR_SLOW_DOWN = "SlowDown";
+        public static final String ERROR_METHOD_NOT_IMPLEMENTED = 
"NotImplemented";
+
+        /**
+         * Tells whether an AWS S3 error code is one of the codes this class treats as retryable.
+         *
+         * @param errorCode the S3 error code, may be {@code null}
+         * @return {@code true} for {@code InternalError} or {@code SlowDown}, {@code false} otherwise
+         */
+        public static boolean isRetryableError(String errorCode) {
+            // Constant-first equals keeps a missing error code from raising a NullPointerException
+            return ERROR_INTERNAL_ERROR.equals(errorCode) || ERROR_SLOW_DOWN.equals(errorCode);
+        }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 363ec74..a6b8b47 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -710,7 +710,8 @@ public class ExternalDataUtils {
                 // Method not implemented, try falling back to old API
                 try {
                     // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if 
(ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                    if (ex.awsErrorDetails().errorCode()
+                            
.equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                         useOldApi = true;
                         response = isBucketEmpty(s3Client, container, prefix, 
true);
                     } else {
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
new file mode 100644
index 0000000..f0a2223
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.awss3;
+
+import static 
org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
+import 
org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+/**
+ * Unit tests for the AWS S3 input stream and its factory. Private members are reached through reflection.
+ */
+public class AwsS3Test {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWorkloadDistribution() throws Exception {
+        AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory();
+
+        List<S3Object> s3Objects = new ArrayList<>();
+        final int partitionsCount = 3;
+
+        // Create S3 objects, 9 objects, on 3 partitions, they should be 600 total size on each partition
+        S3Object.Builder builder = S3Object.builder();
+        s3Objects.add(builder.key("1.json").size(100L).build());
+        s3Objects.add(builder.key("2.json").size(100L).build());
+        s3Objects.add(builder.key("3.json").size(100L).build());
+        s3Objects.add(builder.key("4.json").size(200L).build());
+        s3Objects.add(builder.key("5.json").size(200L).build());
+        s3Objects.add(builder.key("6.json").size(200L).build());
+        s3Objects.add(builder.key("7.json").size(300L).build());
+        s3Objects.add(builder.key("8.json").size(300L).build());
+        s3Objects.add(builder.key("9.json").size(300L).build());
+
+        // invoke the distributeWorkLoad method
+        Method distributeWorkloadMethod =
+                AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class);
+        distributeWorkloadMethod.setAccessible(true);
+        distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount);
+
+        // get the partitionWorkLoadsBasedOnSize field and verify the result
+        Field distributeWorkloadField =
+                AwsS3InputStreamFactory.class.getDeclaredField("partitionWorkLoadsBasedOnSize");
+        distributeWorkloadField.setAccessible(true);
+        List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize> workloads =
+                (List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField.get(factory);
+
+        for (AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+            // JUnit takes (expected, actual); the right order keeps the failure message meaningful
+            Assert.assertEquals(600, workload.getTotalSize());
+        }
+    }
+
+    @Test
+    public void s3InternalError() throws Exception {
+        assertRetryableErrorIsReported(ERROR_INTERNAL_ERROR, "Internal Error from AWS", "Not internal error");
+    }
+
+    @Test
+    public void s3SlowDown() throws Exception {
+        assertRetryableErrorIsReported(ERROR_SLOW_DOWN, "SlowDown Error from AWS", "Not SlowDown error");
+    }
+
+    /**
+     * Makes a mocked S3 client always fail with the given error, calls the private {@code doGetInputStream}
+     * through reflection and, if an exception comes out, checks that it is the expected external source error.
+     *
+     * @param errorCode the S3 error code carried by the simulated failure
+     * @param errorMessage the S3 error message carried by the simulated failure
+     * @param failureMessage the assertion message used when the reported exception is not the expected one
+     */
+    private static void assertRetryableErrorIsReported(String errorCode, String errorMessage, String failureMessage)
+            throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with the requested error code
+        AwsErrorDetails errorDetails =
+                AwsErrorDetails.builder().errorCode(errorCode).errorMessage(errorMessage).build();
+        S3Exception s3Exception = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(s3Exception);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        // NOTE(review): if the call returns without throwing, nothing is asserted and the test passes;
+        // consider failing explicitly once the expected outcome after exhausted retries is settled
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            // Method.invoke wraps whatever the target throws, so the interesting exception is the cause
+            Assert.assertTrue(failureMessage, ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. " + errorMessage));
+        }
+    }
+}

Reply via email to