rohangarg commented on a change in pull request #12161:
URL: https://github.com/apache/druid/pull/12161#discussion_r788604447



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
     return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
   }
 
+  /**
+   * Use the API listShards which is the recommended way instead of describeStream
+   * listShards can return 1000 shards per call and has a limit of 100TPS

Review comment:
       
       The ListShards docs (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)
       say 1000 TPS per stream, not 100. Typo?

##########
File path: integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -119,31 +127,27 @@ public void updatePartitionCount(String streamName, int newShardCount, boolean b
       // Wait until the resharding started (or finished)
       ITRetryUtil.retryUntil(
           () -> {
-            StreamDescription streamDescription = getStreamDescription(streamName);
-            int updatedShardCount = getStreamShardCount(streamDescription);
-            return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) ||
-                (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount);
-          },
-          true,
-          30,
-          30,
-          "Kinesis stream resharding to start (or finished)"
+            int updatedShardCount = getStreamPartitionCount(streamName);
+            // Stream should be in active or updating state AND
+            // the number of shards must have increased irrespective
+            return verifyStreamStatus(streamName, StreamStatus.ACTIVE, StreamStatus.UPDATING)
+                   && updatedShardCount > originalShardCount;
+          }, true, 300, // higher value to avoid exceeding kinesis TPS limit

Review comment:
       Please fix the formatting on this line and the one below.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
     return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
   }
 
+  /**
+   * Use the API listShards which is the recommended way instead of describeStream
+   * listShards can return 1000 shards per call and has a limit of 100TPS
+   * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
+   *
+   * @param stream name of stream
+   *
+   * @return Set of Shard ids
+   */
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    return wrapExceptions(
-        () -> {
-          final Set<String> retVal = new HashSet<>();
-          DescribeStreamRequest request = new DescribeStreamRequest();
-          request.setStreamName(stream);
-
-          while (request != null) {
-            final DescribeStreamResult result = kinesis.describeStream(request);
-            final StreamDescription streamDescription = result.getStreamDescription();
-            final List<Shard> shards = streamDescription.getShards();
-
-            for (Shard shard : shards) {
-              retVal.add(shard.getShardId());
-            }
-
-            if (streamDescription.isHasMoreShards()) {
-              request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
-            } else {
-              request = null;
-            }
-          }
-
+    return wrapExceptions(() -> {
+      final Set<String> retVal = new HashSet<>();

Review comment:
       This set can be made immutable by using an `ImmutableSet`.
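
A minimal sketch of the idea in the comment above, under stated assumptions. The comment names Guava's `ImmutableSet`; to keep the example free of extra dependencies, the JDK's `Collections.unmodifiableSet` is swapped in here. The class `PartitionIds` and its `collect` method are hypothetical and are not part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartitionIds
{
  // Accumulates ids from every page into a private mutable set, then hands
  // callers a read-only view so they cannot modify the result.
  static Set<String> collect(List<List<String>> pages)
  {
    final Set<String> ids = new HashSet<>();
    for (List<String> page : pages) {
      ids.addAll(page);
    }
    return Collections.unmodifiableSet(ids);
  }

  public static void main(String[] args)
  {
    Set<String> ids = collect(Arrays.asList(
        Arrays.asList("shardId-000", "shardId-001"),
        Arrays.asList("shardId-002")
    ));
    System.out.println(ids.size());
    try {
      ids.add("shardId-003");
    }
    catch (UnsupportedOperationException e) {
      // The returned view rejects modification.
      System.out.println("read-only");
    }
  }
}
```

With Guava on the classpath, the last line of `collect` could instead return `ImmutableSet.copyOf(ids)`, or the loop could feed an `ImmutableSet.builder()` directly.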

##########
File path: integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -156,15 +160,35 @@ public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount,
     return actualShardCount == oldShardCount + newShardCount;
   }
 
+  private List<Shard> listShards(String streamName)
+  {
+    ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName);
+    List<Shard> shards = new ArrayList<>();
+    while (true) {
+      ListShardsResult listShardsResult = amazonKinesis.listShards(listShardsRequest);
+      shards.addAll(listShardsResult.getShards());
+      String nextToken = listShardsResult.getNextToken();
+      if (nextToken == null) {
+        return shards;
+      }
+      listShardsRequest = new ListShardsRequest().withNextToken(listShardsResult.getNextToken());
+    }
+  }
 
-  private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck)
+  private boolean verifyStreamStatus(String streamName, StreamStatus... streamStatuses)
   {
-    return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
+    String status = getStreamStatus(streamName);
+    for (StreamStatus streamStatus : streamStatuses) {

Review comment:
       nit: this loop can be `return Arrays.stream(streamStatuses).map(StreamStatus::toString).anyMatch(status::equals);`
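
A sketch of the stream-based form of this check, assuming the comparison is meant to be against the fetched status string rather than the stream name. The `StreamStatus` enum below is a stand-in for the AWS SDK type so the example is self-contained, and `StatusMatch.matchesAny` is a hypothetical name.

```java
import java.util.Arrays;

// Stand-in for the AWS SDK's StreamStatus enum (an assumption, for self-containment).
enum StreamStatus { CREATING, DELETING, ACTIVE, UPDATING }

class StatusMatch
{
  // Returns true if the current status string equals any of the expected statuses.
  static boolean matchesAny(String status, StreamStatus... expected)
  {
    return Arrays.stream(expected)
                 .map(StreamStatus::toString)
                 .anyMatch(status::equals);
  }

  public static void main(String[] args)
  {
    // prints "true": UPDATING is one of the accepted statuses
    System.out.println(matchesAny("UPDATING", StreamStatus.ACTIVE, StreamStatus.UPDATING));
    // prints "false": DELETING is not accepted
    System.out.println(matchesAny("DELETING", StreamStatus.ACTIVE, StreamStatus.UPDATING));
  }
}
```

Note that `status::equals` throws a `NullPointerException` if `status` is null, so a caller that may see a missing status would need a null check first.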
 

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
     return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
   }
 
+  /**
+   * Use the API listShards which is the recommended way instead of describeStream
+   * listShards can return 1000 shards per call and has a limit of 100TPS
+   * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
+   *
+   * @param stream name of stream
+   *
+   * @return Set of Shard ids
+   */
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    return wrapExceptions(
-        () -> {
-          final Set<String> retVal = new HashSet<>();
-          DescribeStreamRequest request = new DescribeStreamRequest();
-          request.setStreamName(stream);
-
-          while (request != null) {
-            final DescribeStreamResult result = kinesis.describeStream(request);
-            final StreamDescription streamDescription = result.getStreamDescription();
-            final List<Shard> shards = streamDescription.getShards();
-
-            for (Shard shard : shards) {
-              retVal.add(shard.getShardId());
-            }
-
-            if (streamDescription.isHasMoreShards()) {
-              request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
-            } else {
-              request = null;
-            }
-          }
-
+    return wrapExceptions(() -> {
+      final Set<String> retVal = new HashSet<>();
+      ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+      while (true) {
+        ListShardsResult result = kinesis.listShards(request);
+        retVal.addAll(result.getShards()
+                            .stream()
+                            .map(x -> x.getShardId())

Review comment:
       nit: does the IDE offer to convert this to `Shard::getShardId`?
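
For illustration, a minimal sketch of the method-reference form. The `Shard` class here is a hypothetical stand-in exposing only `getShardId()`, not the AWS SDK model, and `ShardIds.idsOf` is an invented helper name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical minimal stand-in for the SDK's Shard model.
class Shard
{
  private final String shardId;

  Shard(String shardId)
  {
    this.shardId = shardId;
  }

  String getShardId()
  {
    return shardId;
  }
}

class ShardIds
{
  // Equivalent to .map(x -> x.getShardId()), written as a method reference.
  static Set<String> idsOf(List<Shard> shards)
  {
    return shards.stream()
                 .map(Shard::getShardId)
                 .collect(Collectors.toSet());
  }

  public static void main(String[] args)
  {
    List<Shard> shards = Arrays.asList(new Shard("shardId-000"), new Shard("shardId-001"));
    System.out.println(idsOf(shards).size()); // prints 2
  }
}
```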

##########
File path: integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -156,15 +160,35 @@ public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount,
     return actualShardCount == oldShardCount + newShardCount;
   }
 
+  private List<Shard> listShards(String streamName)

Review comment:
       1. Why have this in a separate method instead of inlining it in the only caller?
       2. This uses an `ArrayList`, whereas the main method uses a `Set`. Can there be
          duplicates? If not, we only need to maintain a counter.
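
A sketch of the counter-only variant raised in point 2, assuming shards are not repeated across pages. `ShardPage` and `ShardCounter` are hypothetical stand-ins: the page fetcher is modelled as a function from a continuation token (null for the first page) to a page of results, rather than a real `listShards` call.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical page of results: some shard ids plus an optional continuation token.
class ShardPage
{
  final List<String> shardIds;
  final String nextToken; // null when there are no more pages

  ShardPage(List<String> shardIds, String nextToken)
  {
    this.shardIds = shardIds;
    this.nextToken = nextToken;
  }
}

class ShardCounter
{
  // Follows continuation tokens until none is returned and sums the page sizes,
  // without retaining the shards themselves.
  static int countShards(Function<String, ShardPage> fetchPage)
  {
    int count = 0;
    String token = null; // null requests the first page
    do {
      ShardPage page = fetchPage.apply(token);
      count += page.shardIds.size();
      token = page.nextToken;
    } while (token != null);
    return count;
  }

  public static void main(String[] args)
  {
    // Two fake pages keyed by the token that requests them (HashMap permits a null key).
    Map<String, ShardPage> pages = new HashMap<>();
    pages.put(null, new ShardPage(Arrays.asList("shardId-000", "shardId-001"), "token-1"));
    pages.put("token-1", new ShardPage(Arrays.asList("shardId-002"), null));
    System.out.println(countShards(pages::get)); // prints 3
  }
}
```

If duplicates across pages were possible, the count would overstate the shard total, and collecting ids into a `Set` would be the safer choice.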




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


