This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 50d076ab6ad325907690a2c115ee2cb1c45775c9
Author: Thomas Weise <[email protected]>
AuthorDate: Thu Aug 2 17:47:34 2018 -0700

    [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
    
    This closes #6482.
---
 .../kinesis/config/ConsumerConfigConstants.java    |  7 +-
 .../connectors/kinesis/proxy/KinesisProxy.java     | 53 ++++++++----
 .../connectors/kinesis/proxy/KinesisProxyTest.java | 94 ++++++++++++++++++++++
 3 files changed, 136 insertions(+), 18 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 48a0b3c..443b19e 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -92,6 +92,9 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
        /** The power constant for exponential backoff between each 
describeStream attempt. */
        public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT 
= "flink.stream.describe.backoff.expconst";
 
+       /** The maximum number of listShards attempts if we get a recoverable 
exception. */
+       public static final String LIST_SHARDS_RETRIES = 
"flink.list.shards.maxretries";
+
        /** The base backoff time between each listShards attempt. */
        public static final String LIST_SHARDS_BACKOFF_BASE = 
"flink.list.shards.backoff.base";
 
@@ -104,7 +107,7 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
        /** The maximum number of records to try to get each time we fetch 
records from a AWS Kinesis shard. */
        public static final String SHARD_GETRECORDS_MAX = 
"flink.shard.getrecords.maxrecordcount";
 
-       /** The maximum number of getRecords attempts if we get 
ProvisionedThroughputExceededException. */
+       /** The maximum number of getRecords attempts if we get a recoverable 
exception. */
        public static final String SHARD_GETRECORDS_RETRIES = 
"flink.shard.getrecords.maxretries";
 
        /** The base backoff time between getRecords attempts if we get a 
ProvisionedThroughputExceededException. */
@@ -161,6 +164,8 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
 
        public static final double 
DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+       public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
        public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
        public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7e6a360..262181a 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -91,6 +91,9 @@ public class KinesisProxy implements KinesisProxyInterface {
        /** Exponential backoff power constant for the list shards operation. */
        private final double listShardsExpConstant;
 
+       /** Maximum retry attempts for the list shards operation. */
+       private final int listShardsMaxRetries;
+
        // 
------------------------------------------------------------------------
        //  getRecords() related performance settings
        // 
------------------------------------------------------------------------
@@ -104,8 +107,8 @@ public class KinesisProxy implements KinesisProxyInterface {
        /** Exponential backoff power constant for the get records operation. */
        private final double getRecordsExpConstant;
 
-       /** Maximum attempts for the get records operation. */
-       private final int getRecordsMaxAttempts;
+       /** Maximum retry attempts for the get records operation. */
+       private final int getRecordsMaxRetries;
 
        // 
------------------------------------------------------------------------
        //  getShardIterator() related performance settings
@@ -120,8 +123,8 @@ public class KinesisProxy implements KinesisProxyInterface {
        /** Exponential backoff power constant for the get shard iterator 
operation. */
        private final double getShardIteratorExpConstant;
 
-       /** Maximum attempts for the get shard iterator operation. */
-       private final int getShardIteratorMaxAttempts;
+       /** Maximum retry attempts for the get shard iterator operation. */
+       private final int getShardIteratorMaxRetries;
 
        /**
         * Create a new KinesisProxy based on the supplied configuration 
properties.
@@ -146,6 +149,10 @@ public class KinesisProxy implements KinesisProxyInterface 
{
                        configProps.getProperty(
                                
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+               this.listShardsMaxRetries = Integer.valueOf(
+                       configProps.getProperty(
+                               ConsumerConfigConstants.LIST_SHARDS_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
 
                this.getRecordsBaseBackoffMillis = Long.valueOf(
                        configProps.getProperty(
@@ -159,7 +166,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getRecordsMaxAttempts = Integer.valueOf(
+               this.getRecordsMaxRetries = Integer.valueOf(
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
                                
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
@@ -176,7 +183,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getShardIteratorMaxAttempts = Integer.valueOf(
+               this.getShardIteratorMaxRetries = Integer.valueOf(
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
                                
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
@@ -217,14 +224,14 @@ public class KinesisProxy implements 
KinesisProxyInterface {
 
                GetRecordsResult getRecordsResult = null;
 
-               int attempt = 0;
-               while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
+               int retryCount = 0;
+               while (retryCount <= getRecordsMaxRetries && getRecordsResult 
== null) {
                        try {
                                getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
                        } catch (SdkClientException ex) {
                                if (isRecoverableSdkClientException(ex)) {
                                        long backoffMillis = fullJitterBackoff(
-                                               getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+                                               getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
                                        LOG.warn("Got recoverable 
SdkClientException. Backing off for "
                                                + backoffMillis + " millis (" + 
ex.getMessage() + ")");
                                        Thread.sleep(backoffMillis);
@@ -235,7 +242,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                }
 
                if (getRecordsResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxAttempts +
+                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxRetries +
                                " retry attempts returned 
ProvisionedThroughputExceededException.");
                }
 
@@ -292,14 +299,14 @@ public class KinesisProxy implements 
KinesisProxyInterface {
        private String getShardIterator(GetShardIteratorRequest 
getShardIteratorRequest) throws InterruptedException {
                GetShardIteratorResult getShardIteratorResult = null;
 
-               int attempt = 0;
-               while (attempt <= getShardIteratorMaxAttempts && 
getShardIteratorResult == null) {
+               int retryCount = 0;
+               while (retryCount <= getShardIteratorMaxRetries && 
getShardIteratorResult == null) {
                        try {
                                        getShardIteratorResult = 
kinesisClient.getShardIterator(getShardIteratorRequest);
                        } catch (AmazonServiceException ex) {
                                if (isRecoverableException(ex)) {
                                        long backoffMillis = fullJitterBackoff(
-                                               
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, 
getShardIteratorExpConstant, attempt++);
+                                               
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, 
getShardIteratorExpConstant, retryCount++);
                                        LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
                                                + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
                                        Thread.sleep(backoffMillis);
@@ -310,7 +317,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                }
 
                if (getShardIteratorResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxAttempts +
+                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxRetries +
                                " retry attempts returned 
ProvisionedThroughputExceededException.");
                }
                return getShardIteratorResult.getShardIterator();
@@ -406,16 +413,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
                ListShardsResult listShardsResults = null;
 
                // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
-               int attemptCount = 0;
+               int retryCount = 0;
                // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
                // are taken up.
-               while (listShardsResults == null) { // retry until we get a 
result
+               while (retryCount <= listShardsMaxRetries && listShardsResults 
== null) { // retry until we get a result
                        try {
 
                                listShardsResults = 
kinesisClient.listShards(listShardsRequest);
                        } catch (LimitExceededException le) {
                                long backoffMillis = fullJitterBackoff(
-                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
                                        LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
                                                                        + ". 
Backing off for " + backoffMillis + " millis.");
                                Thread.sleep(backoffMillis);
@@ -433,6 +440,18 @@ public class KinesisProxy implements KinesisProxyInterface 
{
                        } catch (ExpiredNextTokenException expiredToken) {
                                LOG.warn("List Shards has an expired token. 
Reusing the previous state.");
                                break;
+                       } catch (SdkClientException ex) {
+                               if (retryCount < listShardsMaxRetries && 
isRecoverableSdkClientException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
+                                       LOG.warn("Got SdkClientException when 
listing shards from stream {}. Backing off for {} millis.",
+                                               streamName, backoffMillis);
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       // propagate if retries exceeded or not 
recoverable
+                                       // (otherwise would return null result 
and keep trying forever)
+                                       throw ex;
+                               }
                        }
                }
                // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 775ae4b..edf6ceb 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -27,16 +27,24 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.AmazonKinesisException;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.ListShardsRequest;
 import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
@@ -54,6 +62,7 @@ import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInA
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -92,6 +101,37 @@ public class KinesisProxyTest {
        }
 
        @Test
+       public void testGetRecordsRetry() throws Exception {
+               Properties kinesisConsumerConfig = new Properties();
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+
+               final GetRecordsResult expectedResult = new GetRecordsResult();
+               MutableInt retries = new MutableInt();
+               final Throwable[] retriableExceptions = new Throwable[] {
+                       new AmazonKinesisException("mock"),
+               };
+
+               AmazonKinesisClient mockClient = 
mock(AmazonKinesisClient.class);
+               Mockito.when(mockClient.getRecords(any())).thenAnswer(new 
Answer<GetRecordsResult>() {
+                       @Override
+                       public GetRecordsResult answer(InvocationOnMock 
invocation) throws Throwable{
+                               if (retries.intValue() < 
retriableExceptions.length) {
+                                       retries.increment();
+                                       throw 
retriableExceptions[retries.intValue() - 1];
+                               }
+                               return expectedResult;
+                       }
+               });
+
+               KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+
+               GetRecordsResult result = 
kinesisProxy.getRecords("fakeShardIterator", 1);
+               assertEquals(retriableExceptions.length, retries.intValue());
+               assertEquals(expectedResult, result);
+       }
+
+       @Test
        public void testGetShardList() throws Exception {
                List<String> shardIds =
                                Arrays.asList(
@@ -152,6 +192,60 @@ public class KinesisProxyTest {
        }
 
        @Test
+       public void testGetShardListRetry() throws Exception {
+               Properties kinesisConsumerConfig = new Properties();
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+
+               Shard shard = new Shard();
+               shard.setShardId("fake-shard-000000000000");
+               final ListShardsResult expectedResult = new ListShardsResult();
+               expectedResult.withShards(shard);
+
+               MutableInt exceptionCount = new MutableInt();
+               final Throwable[] retriableExceptions = new Throwable[]{
+                       new AmazonKinesisException("attempt1"),
+                       new AmazonKinesisException("attempt2"),
+               };
+
+               AmazonKinesisClient mockClient = 
mock(AmazonKinesisClient.class);
+               Mockito.when(mockClient.listShards(any())).thenAnswer(new 
Answer<ListShardsResult>() {
+
+                       @Override
+                       public ListShardsResult answer(InvocationOnMock 
invocation) throws Throwable {
+                               if (exceptionCount.intValue() < 
retriableExceptions.length) {
+                                       exceptionCount.increment();
+                                       throw 
retriableExceptions[exceptionCount.intValue() - 1];
+                               }
+                               return expectedResult;
+                       }
+               });
+
+               KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+
+               HashMap<String, String> streamNames = new HashMap();
+               streamNames.put("fake-stream", null);
+               GetShardListResult result = 
kinesisProxy.getShardList(streamNames);
+               assertEquals(retriableExceptions.length, 
exceptionCount.intValue());
+               assertEquals(true, result.hasRetrievedShards());
+               assertEquals(shard.getShardId(), 
result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());
+
+               // test max attempt count exceeded
+               int maxRetries = 1;
+               exceptionCount.setValue(0);
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, 
String.valueOf(maxRetries));
+               kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+               try {
+                       kinesisProxy.getShardList(streamNames);
+                       Assert.fail("exception expected");
+               } catch (SdkClientException ex) {
+                       assertEquals(retriableExceptions[maxRetries], ex);
+               }
+               assertEquals(maxRetries + 1, exceptionCount.intValue());
+       }
+
+       @Test
        public void testCustomConfigurationOverride() {
                Properties configProps = new Properties();
                configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");

Reply via email to