This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d96aa15  retry 500 and 503 errors against kinesis (#10059)
d96aa15 is described below

commit d96aa1586aa17433562ebbfbac4182b6975243bb
Author: Harshpreet Singh <[email protected]>
AuthorDate: Tue Jun 23 15:49:34 2020 -0700

    retry 500 and 503 errors against kinesis (#10059)
    
    * retry 500 and 503 errors against kinesis
    
    * add test that exercises retry logic
    
    * more branch coverage
    
    * retry 500 and 503 on getRecords request when fetching sequence number
    
    Co-authored-by: Harshpreet Singh <[email protected]>
---
 .../indexing/kinesis/KinesisRecordSupplier.java    |  7 +-
 .../kinesis/KinesisRecordSupplierTest.java         | 89 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)

diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 65ae1f6..23e5bec 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -110,7 +110,8 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   {
     final boolean isIOException = ex.getCause() instanceof IOException;
     final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
-    return isIOException || isTimeout;
+    final boolean isInternalError = ex.getStatusCode() == 500 || 
ex.getStatusCode() == 503;
+    return isIOException || isTimeout || isInternalError;
   }
 
   /**
@@ -809,6 +810,10 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
                 );
                 return true;
               }
+              if (throwable instanceof AmazonServiceException) {
+                AmazonServiceException ase = (AmazonServiceException) 
throwable;
+                return isServiceExceptionRecoverable(ase);
+              }
               return false;
             },
             GET_SEQUENCE_NUMBER_RETRY_COUNT
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 6b2f32f..881750d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.kinesis;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
@@ -317,6 +318,94 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   }
 
   @Test
+  public void testPollWithKinesisInternalFailure() throws InterruptedException
+  {
+    recordsPerFetch = 100;
+
+    EasyMock.expect(kinesis.getShardIterator(
+            EasyMock.anyObject(),
+            EasyMock.eq(SHARD_ID0),
+            EasyMock.anyString(),
+            EasyMock.anyString()
+    )).andReturn(
+            getShardIteratorResult0).anyTimes();
+
+    EasyMock.expect(kinesis.getShardIterator(
+            EasyMock.anyObject(),
+            EasyMock.eq(SHARD_ID1),
+            EasyMock.anyString(),
+            EasyMock.anyString()
+    )).andReturn(
+            getShardIteratorResult1).anyTimes();
+
+    
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
+    
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
recordsPerFetch)))
+            .andReturn(getRecordsResult0)
+            .anyTimes();
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, 
recordsPerFetch)))
+            .andReturn(getRecordsResult1)
+            .anyTimes();
+    AmazonServiceException getException = new 
AmazonServiceException("InternalFailure");
+    getException.setErrorCode("InternalFailure");
+    getException.setStatusCode(500);
+    getException.setServiceName("AmazonKinesis");
+    
EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once();
+    
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
+    AmazonServiceException getException2 = new 
AmazonServiceException("InternalFailure");
+    getException2.setErrorCode("InternalFailure");
+    getException2.setStatusCode(503);
+    getException2.setServiceName("AmazonKinesis");
+    
EasyMock.expect(getRecordsResult1.getRecords()).andThrow(getException2).once();
+    
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
+    
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
+    
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
+    
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
+    
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
+    
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+    
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+
+    replayAll();
+
+    Set<StreamPartition<String>> partitions = ImmutableSet.of(
+            StreamPartition.of(STREAM, SHARD_ID0),
+            StreamPartition.of(STREAM, SHARD_ID1)
+    );
+
+
+    recordSupplier = new KinesisRecordSupplier(
+            kinesis,
+            recordsPerFetch,
+            0,
+            2,
+            false,
+            100,
+            5000,
+            5000,
+            60000,
+            100,
+            true
+    );
+
+    recordSupplier.assign(partitions);
+    recordSupplier.seekToEarliest(partitions);
+    recordSupplier.start();
+
+    while (recordSupplier.bufferSize() < 14) {
+      Thread.sleep(100);
+    }
+
+    List<OrderedPartitionableRecord<String, String>> polledRecords = 
cleanRecords(recordSupplier.poll(
+            POLL_TIMEOUT_MILLIS));
+
+    verifyAll();
+
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
+    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionResourcesTimeLag());
+  }
+
+  @Test
   public void testSeek()
       throws InterruptedException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to