This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f63b44  Mitigate Kinesis stream LimitExceededException by using 
listShards API (#12161)
1f63b44 is described below

commit 1f63b447c45cb852924d5d3101a1005b670a054b
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Jan 21 10:15:51 2022 +0530

    Mitigate Kinesis stream LimitExceededException by using listShards API 
(#12161)
    
    Makes Kinesis ingestion resilient to `LimitExceededException` caused by 
resharding.
    Replaces `describeStream` with `listShards` (the recommended API) to get 
shard-related info.
    `describeStream` returns at most 100 shards per call and has a low default 
limit of 10 TPS.
    `listShards` returns info for at most 1000 shards per call and has a 
higher limit of 100 TPS.
    
    Key changed/added classes in this PR
     * `KinesisRecordSupplier`
     * `KinesisAdminClient`
---
 .../indexing/kinesis/KinesisRecordSupplier.java    | 55 +++++++++++-----------
 .../kinesis/KinesisRecordSupplierTest.java         | 52 ++++++++++----------
 .../druid/testing/utils/KinesisAdminClient.java    | 55 +++++++++++++++-------
 3 files changed, 90 insertions(+), 72 deletions(-)

diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 64e3bad..c6b1362 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -26,24 +26,22 @@ import 
com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.InvalidArgumentException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamDescription;
 import 
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.amazonaws.util.AwsHostNameUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.druid.common.aws.AWSClientUtil;
@@ -70,11 +68,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -663,34 +661,35 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Byt
     return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
   }
 
+  /**
+   * Use the API listShards which is the recommended way instead of 
describeStream
+   * listShards can return 1000 shards per call and has a limit of 100TPS
+   * This makes the method resilient to LimitExceeded exceptions (compared to 
100 shards, 10 TPS of describeStream)
+   *
+   * @param stream name of stream
+   *
+   * @return Set of Shard ids
+   */
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    return wrapExceptions(
-        () -> {
-          final Set<String> retVal = new HashSet<>();
-          DescribeStreamRequest request = new DescribeStreamRequest();
-          request.setStreamName(stream);
-
-          while (request != null) {
-            final DescribeStreamResult result = 
kinesis.describeStream(request);
-            final StreamDescription streamDescription = 
result.getStreamDescription();
-            final List<Shard> shards = streamDescription.getShards();
-
-            for (Shard shard : shards) {
-              retVal.add(shard.getShardId());
-            }
-
-            if (streamDescription.isHasMoreShards()) {
-              
request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
-            } else {
-              request = null;
-            }
-          }
-
+    return wrapExceptions(() -> {
+      final Set<String> retVal = new TreeSet<>();
+      ListShardsRequest request = new 
ListShardsRequest().withStreamName(stream);
+      while (true) {
+        ListShardsResult result = kinesis.listShards(request);
+        retVal.addAll(result.getShards()
+                            .stream()
+                            .map(Shard::getShardId)
+                            .collect(Collectors.toList())
+        );
+        String nextToken = result.getNextToken();
+        if (nextToken == null) {
           return retVal;
         }
-    );
+        request = new ListShardsRequest().withNextToken(nextToken);
+      }
+    });
   }
 
   /**
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index ec8ab62..dda0e20 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -23,15 +23,14 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -146,14 +145,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
   private static int recordsPerFetch;
   private static AmazonKinesis kinesis;
-  private static DescribeStreamResult describeStreamResult0;
-  private static DescribeStreamResult describeStreamResult1;
+  private static ListShardsResult listShardsResult0;
+  private static ListShardsResult listShardsResult1;
   private static GetShardIteratorResult getShardIteratorResult0;
   private static GetShardIteratorResult getShardIteratorResult1;
   private static GetRecordsResult getRecordsResult0;
   private static GetRecordsResult getRecordsResult1;
-  private static StreamDescription streamDescription0;
-  private static StreamDescription streamDescription1;
   private static Shard shard0;
   private static Shard shard1;
   private static KinesisRecordSupplier recordSupplier;
@@ -162,14 +159,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   public void setupTest()
   {
     kinesis = createMock(AmazonKinesisClient.class);
-    describeStreamResult0 = createMock(DescribeStreamResult.class);
-    describeStreamResult1 = createMock(DescribeStreamResult.class);
+    listShardsResult0 = createMock(ListShardsResult.class);
+    listShardsResult1 = createMock(ListShardsResult.class);
     getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
     getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
     getRecordsResult0 = createMock(GetRecordsResult.class);
     getRecordsResult1 = createMock(GetRecordsResult.class);
-    streamDescription0 = createMock(StreamDescription.class);
-    streamDescription1 = createMock(StreamDescription.class);
     shard0 = createMock(Shard.class);
     shard1 = createMock(Shard.class);
     recordsPerFetch = 1;
@@ -187,19 +182,17 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   @Test
   public void testSupplierSetup()
   {
-    final Capture<DescribeStreamRequest> capturedRequest = 
Capture.newInstance();
-
-    
EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest))).andReturn(describeStreamResult0).once();
-    
EasyMock.expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
-    
EasyMock.expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
-    
EasyMock.expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
-    EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).times(2);
-    
EasyMock.expect(kinesis.describeStream(EasyMock.anyObject(DescribeStreamRequest.class)))
-            .andReturn(describeStreamResult1)
-            .once();
-    
EasyMock.expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
-    
EasyMock.expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
-    
EasyMock.expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
+    final Capture<ListShardsRequest> capturedRequest0 = Capture.newInstance();
+    final Capture<ListShardsRequest> capturedRequest1 = Capture.newInstance();
+
+    
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest0))).andReturn(listShardsResult0).once();
+    
EasyMock.expect(listShardsResult0.getShards()).andReturn(ImmutableList.of(shard0)).once();
+    String nextToken = "nextToken";
+    
EasyMock.expect(listShardsResult0.getNextToken()).andReturn(nextToken).once();
+    EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).once();
+    
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest1))).andReturn(listShardsResult1).once();
+    
EasyMock.expect(listShardsResult1.getShards()).andReturn(ImmutableList.of(shard1)).once();
+    EasyMock.expect(listShardsResult1.getNextToken()).andReturn(null).once();
     EasyMock.expect(shard1.getShardId()).andReturn(SHARD_ID1).once();
 
     replayAll();
@@ -236,10 +229,13 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     verifyAll();
 
-    final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
-    expectedRequest.setStreamName(STREAM);
-    expectedRequest.setExclusiveStartShardId("0");
-    Assert.assertEquals(expectedRequest, capturedRequest.getValue());
+    final ListShardsRequest expectedRequest0 = new ListShardsRequest();
+    expectedRequest0.setStreamName(STREAM);
+    Assert.assertEquals(expectedRequest0, capturedRequest0.getValue());
+
+    final ListShardsRequest expectedRequest1 = new ListShardsRequest();
+    expectedRequest1.setNextToken(nextToken);
+    Assert.assertEquals(expectedRequest1, capturedRequest1.getValue());
   }
 
   private static GetRecordsRequest generateGetRecordsReq(String shardIterator, 
int limit)
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
index 7c8759a..53e3284 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
@@ -30,21 +30,27 @@ import 
com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
 import com.amazonaws.services.kinesis.model.CreateStreamResult;
 import com.amazonaws.services.kinesis.model.DeleteStreamResult;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ScalingType;
+import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
 import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
 import com.amazonaws.util.AwsHostNameUtils;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.ISE;
 
 import java.io.FileInputStream;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 public class KinesisAdminClient implements StreamAdminClient
 {
-  private AmazonKinesis amazonKinesis;
+  private final AmazonKinesis amazonKinesis;
 
   public KinesisAdminClient(String endpoint) throws Exception
   {
@@ -107,6 +113,9 @@ public class KinesisAdminClient implements StreamAdminClient
   public void updatePartitionCount(String streamName, int newShardCount, 
boolean blocksUntilStarted)
   {
     int originalShardCount = getStreamPartitionCount(streamName);
+    if (originalShardCount == newShardCount) {
+      return;
+    }
     UpdateShardCountRequest updateShardCountRequest = new 
UpdateShardCountRequest();
     updateShardCountRequest.setStreamName(streamName);
     updateShardCountRequest.setTargetShardCount(newShardCount);
@@ -119,13 +128,13 @@ public class KinesisAdminClient implements 
StreamAdminClient
       // Wait until the resharding started (or finished)
       ITRetryUtil.retryUntil(
           () -> {
-            StreamDescription streamDescription = 
getStreamDescription(streamName);
-            int updatedShardCount = getStreamShardCount(streamDescription);
-            return verifyStreamStatus(streamDescription, 
StreamStatus.UPDATING) ||
-                (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && 
updatedShardCount > originalShardCount);
-          },
-          true,
-          30,
+            int updatedShardCount = getStreamPartitionCount(streamName);
+            // Stream should be in active or updating state AND
+            // the number of shards must have increased irrespective of the 
value of newShardCount
+            return verifyStreamStatus(streamName, StreamStatus.ACTIVE, 
StreamStatus.UPDATING)
+                   && updatedShardCount > originalShardCount;
+          }, true,
+          300, // higher value to avoid exceeding kinesis TPS limit
           30,
           "Kinesis stream resharding to start (or finished)"
       );
@@ -135,15 +144,13 @@ public class KinesisAdminClient implements 
StreamAdminClient
   @Override
   public boolean isStreamActive(String streamName)
   {
-    StreamDescription streamDescription = getStreamDescription(streamName);
-    return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE);
+    return verifyStreamStatus(streamName, StreamStatus.ACTIVE);
   }
 
   @Override
   public int getStreamPartitionCount(String streamName)
   {
-    StreamDescription streamDescription = getStreamDescription(streamName);
-    return getStreamShardCount(streamDescription);
+    return listShards(streamName).size();
   }
 
   @Override
@@ -156,15 +163,31 @@ public class KinesisAdminClient implements 
StreamAdminClient
     return actualShardCount == oldShardCount + newShardCount;
   }
 
+  private Set<Shard> listShards(String streamName)
+  {
+    ListShardsRequest listShardsRequest = new 
ListShardsRequest().withStreamName(streamName);
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    while (true) {
+      ListShardsResult listShardsResult = 
amazonKinesis.listShards(listShardsRequest);
+      shards.addAll(listShardsResult.getShards());
+      String nextToken = listShardsResult.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      listShardsRequest = new 
ListShardsRequest().withNextToken(listShardsResult.getNextToken());
+    }
+  }
 
-  private boolean verifyStreamStatus(StreamDescription streamDescription, 
StreamStatus streamStatusToCheck)
+  private boolean verifyStreamStatus(String streamName, StreamStatus... 
streamStatuses)
   {
-    return 
streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
+    return Arrays.stream(streamStatuses)
+                 .map(StreamStatus::toString)
+                 .anyMatch(getStreamStatus(streamName)::equals);
   }
 
-  private int getStreamShardCount(StreamDescription streamDescription)
+  private String getStreamStatus(String streamName)
   {
-    return streamDescription.getShards().size();
+    return getStreamDescription(streamName).getStreamStatus();
   }
 
   private StreamDescription getStreamDescription(String streamName)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to