Repository: flink Updated Branches: refs/heads/release-1.2 be09143cd -> 3b5882afa
[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector This closes #3078. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af Branch: refs/heads/release-1.2 Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990 Parents: be09143c Author: Scott Kidder <[email protected]> Authored: Fri Dec 16 08:46:54 2016 -0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Jan 20 16:48:53 2017 +0100 ---------------------------------------------------------------------- .../connectors/kinesis/proxy/KinesisProxy.java | 58 ++++++++++++++---- .../kinesis/proxy/KinesisProxyTest.java | 63 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 9ffc8e6..0b0fccf 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,14 +17,15 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; @@ -193,12 +194,16 @@ public class KinesisProxy implements KinesisProxyInterface { while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -237,12 +242,16 @@ public class KinesisProxy implements KinesisProxyInterface { try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -253,6 +262,29 @@ public class KinesisProxy implements KinesisProxyInterface { return getShardIteratorResult.getShardIterator(); } + /** + * Determines whether the exception is recoverable using exponential-backoff. + * + * @param ex Exception to inspect + * @return <code>true</code> if the exception can be recovered from, else + * <code>false</code> + */ + protected static boolean isRecoverableException(AmazonServiceException ex) { + if (ex.getErrorType() == null) { + return false; + } + + switch (ex.getErrorType()) { + case Client: + return ex instanceof ProvisionedThroughputExceededException; + case Service: + case Unknown: + return true; + default: + return false; + } + } + private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java new file mode 100644 index 0000000..86202c5 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the {@link KinesisProxy} class. + */ +public class KinesisProxyTest { + + @Test + public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() { + final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf"); + ex.setErrorType(ErrorType.Client); + assertTrue(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithServiceException() { + final AmazonServiceException ex = new AmazonServiceException("asdf"); + ex.setErrorType(ErrorType.Service); + assertTrue(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithExpiredIteratorException() { + final ExpiredIteratorException ex = new ExpiredIteratorException("asdf"); + ex.setErrorType(ErrorType.Client); + assertFalse(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithNullErrorType() { + final AmazonServiceException ex = new AmazonServiceException("asdf"); + ex.setErrorType(null); + assertFalse(KinesisProxy.isRecoverableException(ex)); + } + +}
