Repository: flink
Updated Branches:
  refs/heads/release-1.2 be09143cd -> 3b5882afa


[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

This closes #3078.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af

Branch: refs/heads/release-1.2
Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990
Parents: be09143c
Author: Scott Kidder <[email protected]>
Authored: Fri Dec 16 08:46:54 2016 -0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Jan 20 16:48:53 2017 +0100

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  | 58 ++++++++++++++----
 .../kinesis/proxy/KinesisProxyTest.java         | 63 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 9ffc8e6..0b0fccf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -193,12 +194,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
                while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
                        try {
                                getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
-                       } catch (ProvisionedThroughputExceededException ex) {
-                               long backoffMillis = fullJitterBackoff(
-                                       getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-                                       + backoffMillis + " millis.");
-                               Thread.sleep(backoffMillis);
+                       } catch (AmazonServiceException ex) {
+                               if (isRecoverableException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+                                       LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
+                                               + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       throw ex;
+                               }
                        }
                }
 
@@ -237,12 +242,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
                        try {
                                getShardIteratorResult =
                                        
kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-                       } catch (ProvisionedThroughputExceededException ex) {
-                               long backoffMillis = fullJitterBackoff(
-                                       getShardIteratorBaseBackoffMillis, 
getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
-                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-                                       + backoffMillis + " millis.");
-                               Thread.sleep(backoffMillis);
+                       } catch (AmazonServiceException ex) {
+                               if (isRecoverableException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, 
getShardIteratorExpConstant, attempt++);
+                                       LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
+                                               + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       throw ex;
+                               }
                        }
                }
 
@@ -253,6 +262,29 @@ public class KinesisProxy implements KinesisProxyInterface 
{
                return getShardIteratorResult.getShardIterator();
        }
 
+       /**
+        * Determines whether the exception is recoverable using exponential-backoff.
+        * 
+        * @param ex Exception to inspect
+        * @return <code>true</code> if the exception can be recovered from, else
+        *         <code>false</code>
+        */
+       protected static boolean isRecoverableException(AmazonServiceException 
ex) {
+               if (ex.getErrorType() == null) {
+                       return false;
+               }
+
+               switch (ex.getErrorType()) {
+                       case Client:
+                               return ex instanceof 
ProvisionedThroughputExceededException;
+                       case Service:
+                       case Unknown:
+                               return true;
+                       default:
+                               return false;
+               }
+       }
+
        private List<KinesisStreamShard> getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
                List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
new file mode 100644
index 0000000..86202c5
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the {@link KinesisProxy} class.
+ */
+public class KinesisProxyTest {
+
+       @Test
+       public void 
testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
+               final ProvisionedThroughputExceededException ex = new 
ProvisionedThroughputExceededException("asdf");
+               ex.setErrorType(ErrorType.Client);
+               assertTrue(KinesisProxy.isRecoverableException(ex));
+       }
+
+       @Test
+       public void testIsRecoverableExceptionWithServiceException() {
+               final AmazonServiceException ex = new 
AmazonServiceException("asdf");
+               ex.setErrorType(ErrorType.Service);
+               assertTrue(KinesisProxy.isRecoverableException(ex));
+       }
+
+       @Test
+       public void testIsRecoverableExceptionWithExpiredIteratorException() {
+               final ExpiredIteratorException ex = new 
ExpiredIteratorException("asdf");
+               ex.setErrorType(ErrorType.Client);
+               assertFalse(KinesisProxy.isRecoverableException(ex));
+       }
+
+       @Test
+       public void testIsRecoverableExceptionWithNullErrorType() {
+               final AmazonServiceException ex = new 
AmazonServiceException("asdf");
+               ex.setErrorType(null);
+               assertFalse(KinesisProxy.isRecoverableException(ex));
+       }
+
+}

Reply via email to