[
https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583372#comment-16583372
]
ASF GitHub Bot commented on FLINK-10020:
----------------------------------------
asfgit closed pull request #6482: [FLINK-10020] [kinesis] Support recoverable
exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 48a0b3c9559..443b19ec382 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The power constant for exponential backoff between each
describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT
= "flink.stream.describe.backoff.expconst";
+ /** The maximum number of listShards attempts if we get a recoverable
exception. */
+ public static final String LIST_SHARDS_RETRIES =
"flink.list.shards.maxretries";
+
/** The base backoff time between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_BASE =
"flink.list.shards.backoff.base";
@@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The maximum number of records to try to get each time we fetch
records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX =
"flink.shard.getrecords.maxrecordcount";
- /** The maximum number of getRecords attempts if we get
ProvisionedThroughputExceededException. */
+ /** The maximum number of getRecords attempts if we get a recoverable
exception. */
public static final String SHARD_GETRECORDS_RETRIES =
"flink.shard.getrecords.maxretries";
/** The base backoff time between getRecords attempts if we get a
ProvisionedThroughputExceededException. */
@@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
public static final double
DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+ public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7e6a3604414..262181ae3bc 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -91,6 +91,9 @@
/** Exponential backoff power constant for the list shards operation. */
private final double listShardsExpConstant;
+ /** Maximum retry attempts for the list shards operation. */
+ private final int listShardsMaxRetries;
+
//
------------------------------------------------------------------------
// getRecords() related performance settings
//
------------------------------------------------------------------------
@@ -104,8 +107,8 @@
/** Exponential backoff power constant for the get records operation. */
private final double getRecordsExpConstant;
- /** Maximum attempts for the get records operation. */
- private final int getRecordsMaxAttempts;
+ /** Maximum retry attempts for the get records operation. */
+ private final int getRecordsMaxRetries;
//
------------------------------------------------------------------------
// getShardIterator() related performance settings
@@ -120,8 +123,8 @@
/** Exponential backoff power constant for the get shard iterator
operation. */
private final double getShardIteratorExpConstant;
- /** Maximum attempts for the get shard iterator operation. */
- private final int getShardIteratorMaxAttempts;
+ /** Maximum retry attempts for the get shard iterator operation. */
+ private final int getShardIteratorMaxRetries;
/**
* Create a new KinesisProxy based on the supplied configuration
properties.
@@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+ this.listShardsMaxRetries = Integer.valueOf(
+ configProps.getProperty(
+ ConsumerConfigConstants.LIST_SHARDS_RETRIES,
+
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
this.getRecordsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
@@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.getRecordsMaxAttempts = Integer.valueOf(
+ this.getRecordsMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
@@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.getShardIteratorMaxAttempts = Integer.valueOf(
+ this.getShardIteratorMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
@@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator,
int maxRecordsToGet) th
GetRecordsResult getRecordsResult = null;
- int attempt = 0;
- while (attempt <= getRecordsMaxAttempts && getRecordsResult ==
null) {
+ int retryCount = 0;
+ while (retryCount <= getRecordsMaxRetries && getRecordsResult
== null) {
try {
getRecordsResult =
kinesisClient.getRecords(getRecordsRequest);
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
- getRecordsBaseBackoffMillis,
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+ getRecordsBaseBackoffMillis,
getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
LOG.warn("Got recoverable
SdkClientException. Backing off for "
+ backoffMillis + " millis (" +
ex.getMessage() + ")");
Thread.sleep(backoffMillis);
@@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator,
int maxRecordsToGet) th
}
if (getRecordsResult == null) {
- throw new RuntimeException("Rate Exceeded for
getRecords operation - all " + getRecordsMaxAttempts +
+ throw new RuntimeException("Rate Exceeded for
getRecords operation - all " + getRecordsMaxRetries +
" retry attempts returned
ProvisionedThroughputExceededException.");
}
@@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard,
String shardIteratorType
private String getShardIterator(GetShardIteratorRequest
getShardIteratorRequest) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;
- int attempt = 0;
- while (attempt <= getShardIteratorMaxAttempts &&
getShardIteratorResult == null) {
+ int retryCount = 0;
+ while (retryCount <= getShardIteratorMaxRetries &&
getShardIteratorResult == null) {
try {
getShardIteratorResult =
kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
-
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis,
getShardIteratorExpConstant, attempt++);
+
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis,
getShardIteratorExpConstant, retryCount++);
LOG.warn("Got recoverable
AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" +
ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
@@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest
getShardIteratorRequest)
}
if (getShardIteratorResult == null) {
- throw new RuntimeException("Rate Exceeded for
getShardIterator operation - all " + getShardIteratorMaxAttempts +
+ throw new RuntimeException("Rate Exceeded for
getShardIterator operation - all " + getShardIteratorMaxRetries +
" retry attempts returned
ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
@@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName,
@Nullable String startSha
ListShardsResult listShardsResults = null;
// Call ListShards, with full-jitter backoff (if we get
LimitExceededException).
- int attemptCount = 0;
+ int retryCount = 0;
// List Shards returns just the first 1000 shard entries. Make
sure that all entries
// are taken up.
- while (listShardsResults == null) { // retry until we get a
result
+ while (retryCount <= listShardsMaxRetries && listShardsResults
== null) { // retry until we get a result
try {
listShardsResults =
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
- listShardsBaseBackoffMillis,
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+ listShardsBaseBackoffMillis,
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got LimitExceededException
when listing shards from stream " + streamName
+ ".
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
@@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName,
@Nullable String startSha
} catch (ExpiredNextTokenException expiredToken) {
LOG.warn("List Shards has an expired token.
Reusing the previous state.");
break;
+ } catch (SdkClientException ex) {
+ if (retryCount < listShardsMaxRetries &&
isRecoverableSdkClientException(ex)) {
+ long backoffMillis = fullJitterBackoff(
+ listShardsBaseBackoffMillis,
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
+ LOG.warn("Got SdkClientException when
listing shards from stream {}. Backing off for {} millis.",
+ streamName, backoffMillis);
+ Thread.sleep(backoffMillis);
+ } else {
+ // propagate if retries exceeded or not
recoverable
+ // (otherwise would return null result
and keep trying forever)
+ throw ex;
+ }
}
}
// Kinesalite (mock implementation of Kinesis) does not
correctly exclude shards before
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 775ae4b3352..edf6ceb0d57 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -27,16 +27,24 @@
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.AmazonKinesisException;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
@@ -54,6 +62,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() {
assertFalse(KinesisProxy.isRecoverableException(ex));
}
+ @Test
+ public void testGetRecordsRetry() throws Exception {
+ Properties kinesisConsumerConfig = new Properties();
+
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
+ final GetRecordsResult expectedResult = new GetRecordsResult();
+ MutableInt retries = new MutableInt();
+ final Throwable[] retriableExceptions = new Throwable[] {
+ new AmazonKinesisException("mock"),
+ };
+
+ AmazonKinesisClient mockClient =
mock(AmazonKinesisClient.class);
+ Mockito.when(mockClient.getRecords(any())).thenAnswer(new
Answer<GetRecordsResult>() {
+ @Override
+ public GetRecordsResult answer(InvocationOnMock
invocation) throws Throwable{
+ if (retries.intValue() <
retriableExceptions.length) {
+ retries.increment();
+ throw
retriableExceptions[retries.intValue() - 1];
+ }
+ return expectedResult;
+ }
+ });
+
+ KinesisProxy kinesisProxy = new
KinesisProxy(kinesisConsumerConfig);
+ Whitebox.getField(KinesisProxy.class,
"kinesisClient").set(kinesisProxy, mockClient);
+
+ GetRecordsResult result =
kinesisProxy.getRecords("fakeShardIterator", 1);
+ assertEquals(retriableExceptions.length, retries.intValue());
+ assertEquals(expectedResult, result);
+ }
+
@Test
public void testGetShardList() throws Exception {
List<String> shardIds =
@@ -151,6 +191,60 @@ public void testGetShardList() throws Exception {
expectedStreamShard.toArray(new
StreamShardHandle[actualShardList.size()])));
}
+ @Test
+ public void testGetShardListRetry() throws Exception {
+ Properties kinesisConsumerConfig = new Properties();
+
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
+ Shard shard = new Shard();
+ shard.setShardId("fake-shard-000000000000");
+ final ListShardsResult expectedResult = new ListShardsResult();
+ expectedResult.withShards(shard);
+
+ MutableInt exceptionCount = new MutableInt();
+ final Throwable[] retriableExceptions = new Throwable[]{
+ new AmazonKinesisException("attempt1"),
+ new AmazonKinesisException("attempt2"),
+ };
+
+ AmazonKinesisClient mockClient =
mock(AmazonKinesisClient.class);
+ Mockito.when(mockClient.listShards(any())).thenAnswer(new
Answer<ListShardsResult>() {
+
+ @Override
+ public ListShardsResult answer(InvocationOnMock
invocation) throws Throwable {
+ if (exceptionCount.intValue() <
retriableExceptions.length) {
+ exceptionCount.increment();
+ throw
retriableExceptions[exceptionCount.intValue() - 1];
+ }
+ return expectedResult;
+ }
+ });
+
+ KinesisProxy kinesisProxy = new
KinesisProxy(kinesisConsumerConfig);
+ Whitebox.getField(KinesisProxy.class,
"kinesisClient").set(kinesisProxy, mockClient);
+
+ HashMap<String, String> streamNames = new HashMap();
+ streamNames.put("fake-stream", null);
+ GetShardListResult result =
kinesisProxy.getShardList(streamNames);
+ assertEquals(retriableExceptions.length,
exceptionCount.intValue());
+ assertEquals(true, result.hasRetrievedShards());
+ assertEquals(shard.getShardId(),
result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());
+
+ // test max attempt count exceeded
+ int maxRetries = 1;
+ exceptionCount.setValue(0);
+
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES,
String.valueOf(maxRetries));
+ kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+ Whitebox.getField(KinesisProxy.class,
"kinesisClient").set(kinesisProxy, mockClient);
+ try {
+ kinesisProxy.getShardList(streamNames);
+ Assert.fail("exception expected");
+ } catch (SdkClientException ex) {
+ assertEquals(retriableExceptions[maxRetries], ex);
+ }
+ assertEquals(maxRetries + 1, exceptionCount.intValue());
+ }
+
@Test
public void testCustomConfigurationOverride() {
Properties configProps = new Properties();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Kinesis Consumer listShards should support more recoverable exceptions
> ----------------------------------------------------------------------
>
> Key: FLINK-10020
> URL: https://issues.apache.org/jira/browse/FLINK-10020
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0
>
>
> Currently, transient errors in listShards make the consumer fail and cause the
> entire job to reset. That is unnecessary for certain exceptions (such as HTTP
> 503 errors). It should be possible to control which exceptions qualify for a
> retry, similar to getRecords/isRecoverableSdkClientException.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)