This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d96aa15 retry 500 and 503 errors against kinesis (#10059)
d96aa15 is described below
commit d96aa1586aa17433562ebbfbac4182b6975243bb
Author: Harshpreet Singh <[email protected]>
AuthorDate: Tue Jun 23 15:49:34 2020 -0700
retry 500 and 503 errors against kinesis (#10059)
* retry 500 and 503 errors against kinesis
* add test that exercises retry logic
* more branch coverage
* retry 500 and 503 on getRecords request when fetching sequence number
Co-authored-by: Harshpreet Singh <[email protected]>
---
.../indexing/kinesis/KinesisRecordSupplier.java | 7 +-
.../kinesis/KinesisRecordSupplierTest.java | 89 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 1 deletion(-)
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 65ae1f6..23e5bec 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -110,7 +110,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
- return isIOException || isTimeout;
+ final boolean isInternalError = ex.getStatusCode() == 500 ||
ex.getStatusCode() == 503;
+ return isIOException || isTimeout || isInternalError;
}
/**
@@ -809,6 +810,10 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
);
return true;
}
+ if (throwable instanceof AmazonServiceException) {
+ AmazonServiceException ase = (AmazonServiceException)
throwable;
+ return isServiceExceptionRecoverable(ase);
+ }
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 6b2f32f..881750d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.kinesis;
+import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
@@ -317,6 +318,94 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
}
@Test
+ public void testPollWithKinesisInternalFailure() throws InterruptedException
+ {
+ recordsPerFetch = 100;
+
+ EasyMock.expect(kinesis.getShardIterator(
+ EasyMock.anyObject(),
+ EasyMock.eq(SHARD_ID0),
+ EasyMock.anyString(),
+ EasyMock.anyString()
+ )).andReturn(
+ getShardIteratorResult0).anyTimes();
+
+ EasyMock.expect(kinesis.getShardIterator(
+ EasyMock.anyObject(),
+ EasyMock.eq(SHARD_ID1),
+ EasyMock.anyString(),
+ EasyMock.anyString()
+ )).andReturn(
+ getShardIteratorResult1).anyTimes();
+
+
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
+
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR,
recordsPerFetch)))
+ .andReturn(getRecordsResult0)
+ .anyTimes();
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR,
recordsPerFetch)))
+ .andReturn(getRecordsResult1)
+ .anyTimes();
+ AmazonServiceException getException = new
AmazonServiceException("InternalFailure");
+ getException.setErrorCode("InternalFailure");
+ getException.setStatusCode(500);
+ getException.setServiceName("AmazonKinesis");
+
EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once();
+
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
+ AmazonServiceException getException2 = new
AmazonServiceException("InternalFailure");
+ getException2.setErrorCode("InternalFailure");
+ getException2.setStatusCode(503);
+ getException2.setServiceName("AmazonKinesis");
+
EasyMock.expect(getRecordsResult1.getRecords()).andThrow(getException2).once();
+
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
+
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
+
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
+
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
+
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
+
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+
+ replayAll();
+
+ Set<StreamPartition<String>> partitions = ImmutableSet.of(
+ StreamPartition.of(STREAM, SHARD_ID0),
+ StreamPartition.of(STREAM, SHARD_ID1)
+ );
+
+
+ recordSupplier = new KinesisRecordSupplier(
+ kinesis,
+ recordsPerFetch,
+ 0,
+ 2,
+ false,
+ 100,
+ 5000,
+ 5000,
+ 60000,
+ 100,
+ true
+ );
+
+ recordSupplier.assign(partitions);
+ recordSupplier.seekToEarliest(partitions);
+ recordSupplier.start();
+
+ while (recordSupplier.bufferSize() < 14) {
+ Thread.sleep(100);
+ }
+
+ List<OrderedPartitionableRecord<String, String>> polledRecords =
cleanRecords(recordSupplier.poll(
+ POLL_TIMEOUT_MILLIS));
+
+ verifyAll();
+
+ Assert.assertEquals(partitions, recordSupplier.getAssignment());
+ Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
+ Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionResourcesTimeLag());
+ }
+
+ @Test
public void testSeek()
throws InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]