rohangarg commented on a change in pull request #12161:
URL: https://github.com/apache/druid/pull/12161#discussion_r788604447
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
+ /**
+ * Use the API listShards which is the recommended way instead of describeStream
+ * listShards can return 1000 shards per call and has a limit of 100TPS
Review comment:
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html says 1000 TPS per stream. Typo?
##########
File path:
integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -119,31 +127,27 @@ public void updatePartitionCount(String streamName, int newShardCount, boolean b
// Wait until the resharding started (or finished)
ITRetryUtil.retryUntil(
() -> {
- StreamDescription streamDescription = getStreamDescription(streamName);
- int updatedShardCount = getStreamShardCount(streamDescription);
- return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) ||
- (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount);
- },
- true,
- 30,
- 30,
- "Kinesis stream resharding to start (or finished)"
+ int updatedShardCount = getStreamPartitionCount(streamName);
+ // Stream should be in active or updating state AND
+ // the number of shards must have increased irrespective
+ return verifyStreamStatus(streamName, StreamStatus.ACTIVE, StreamStatus.UPDATING)
+ && updatedShardCount > originalShardCount;
+ }, true, 300, // higher value to avoid exceeding kinesis TPS limit
Review comment:
Fix the formatting on this line and the one below.
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
+ /**
+ * Use the API listShards which is the recommended way instead of describeStream
+ * listShards can return 1000 shards per call and has a limit of 100TPS
+ * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
+ *
+ * @param stream name of stream
+ *
+ * @return Set of Shard ids
+ */
@Override
public Set<String> getPartitionIds(String stream)
{
- return wrapExceptions(
- () -> {
- final Set<String> retVal = new HashSet<>();
- DescribeStreamRequest request = new DescribeStreamRequest();
- request.setStreamName(stream);
-
- while (request != null) {
- final DescribeStreamResult result = kinesis.describeStream(request);
- final StreamDescription streamDescription = result.getStreamDescription();
- final List<Shard> shards = streamDescription.getShards();
-
- for (Shard shard : shards) {
- retVal.add(shard.getShardId());
- }
-
- if (streamDescription.isHasMoreShards()) {
- request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
- } else {
- request = null;
- }
- }
-
+ return wrapExceptions(() -> {
+ final Set<String> retVal = new HashSet<>();
Review comment:
can make this object immutable via an `ImmutableSet`
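A minimal, JDK-only sketch of this suggestion, assuming a simplified page type in place of the AWS SDK's `ListShardsResult` (`ShardPage`, `ShardIdCollector` and `collectShardIds` are hypothetical names used only for illustration). Ids are accumulated in a local `HashSet` and only a read-only view escapes the method; with Guava already on Druid's classpath, `ImmutableSet.copyOf(retVal)` or an `ImmutableSet.Builder` would play the same role.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for one ListShards response page.
final class ShardPage
{
  final List<String> shardIds;
  final String nextToken; // null on the last page

  ShardPage(List<String> shardIds, String nextToken)
  {
    this.shardIds = shardIds;
    this.nextToken = nextToken;
  }
}

final class ShardIdCollector
{
  // listShards maps a continuation token (null for the first call) to the next page.
  static Set<String> collectShardIds(Function<String, ShardPage> listShards)
  {
    final Set<String> retVal = new HashSet<>();
    String token = null;
    do {
      ShardPage page = listShards.apply(token);
      retVal.addAll(page.shardIds);
      token = page.nextToken;
    } while (token != null);
    // The mutable set never leaves this method; callers get an unmodifiable view.
    return Collections.unmodifiableSet(retVal);
  }
}
```

Any attempt by a caller to `add` or `remove` on the returned set throws `UnsupportedOperationException`.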
##########
File path:
integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -156,15 +160,35 @@ public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount,
return actualShardCount == oldShardCount + newShardCount;
}
+ private List<Shard> listShards(String streamName)
+ {
+ ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName);
+ List<Shard> shards = new ArrayList<>();
+ while (true) {
+ ListShardsResult listShardsResult = amazonKinesis.listShards(listShardsRequest);
+ shards.addAll(listShardsResult.getShards());
+ String nextToken = listShardsResult.getNextToken();
+ if (nextToken == null) {
+ return shards;
+ }
+ listShardsRequest = new ListShardsRequest().withNextToken(listShardsResult.getNextToken());
+ }
+ }
- private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck)
+ private boolean verifyStreamStatus(String streamName, StreamStatus... streamStatuses)
{
- return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
+ String status = getStreamStatus(streamName);
+ for (StreamStatus streamStatus : streamStatuses) {
Review comment:
nit: this loop can be `return Arrays.stream(streamStatuses).map(StreamStatus::toString).anyMatch(status::equals);`
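A self-contained sketch of the stream-based check. The value compared against must be the status string fetched for the stream, not `streamName` itself. The local `StreamStatus` enum only mirrors the AWS SDK constant names, and `StreamStatusCheck` takes the already-fetched status as a parameter; both are stand-ins for this illustration.

```java
import java.util.Arrays;

// Stand-in for com.amazonaws.services.kinesis.model.StreamStatus (same constant names).
enum StreamStatus
{
  CREATING, DELETING, ACTIVE, UPDATING
}

final class StreamStatusCheck
{
  // True if the current status string matches any accepted status.
  // status must be non-null: a bound method reference on null throws NullPointerException.
  static boolean verifyStreamStatus(String status, StreamStatus... streamStatuses)
  {
    return Arrays.stream(streamStatuses)
                 .map(StreamStatus::toString)
                 .anyMatch(status::equals);
  }
}
```

Passing no statuses yields `false`, which matches the behavior of the explicit loop.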
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -663,34 +660,35 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
+ /**
+ * Use the API listShards which is the recommended way instead of describeStream
+ * listShards can return 1000 shards per call and has a limit of 100TPS
+ * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
+ *
+ * @param stream name of stream
+ *
+ * @return Set of Shard ids
+ */
@Override
public Set<String> getPartitionIds(String stream)
{
- return wrapExceptions(
- () -> {
- final Set<String> retVal = new HashSet<>();
- DescribeStreamRequest request = new DescribeStreamRequest();
- request.setStreamName(stream);
-
- while (request != null) {
- final DescribeStreamResult result = kinesis.describeStream(request);
- final StreamDescription streamDescription = result.getStreamDescription();
- final List<Shard> shards = streamDescription.getShards();
-
- for (Shard shard : shards) {
- retVal.add(shard.getShardId());
- }
-
- if (streamDescription.isHasMoreShards()) {
- request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
- } else {
- request = null;
- }
- }
-
+ return wrapExceptions(() -> {
+ final Set<String> retVal = new HashSet<>();
+ ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+ while (true) {
+ ListShardsResult result = kinesis.listShards(request);
+ retVal.addAll(result.getShards()
+ .stream()
+ .map(x -> x.getShardId())
Review comment:
nit: does the IDE convert this to `Shard::getShardId`?
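For reference, `x -> x.getShardId()` and `Shard::getShardId` are interchangeable in this `map` call; IDEs such as IntelliJ IDEA usually flag the lambda and offer the method reference as a quick fix. A tiny sketch, assuming a stand-in `Shard` class that exposes only `getShardId()` (the real AWS SDK class has more fields):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Minimal stand-in for com.amazonaws.services.kinesis.model.Shard.
final class Shard
{
  private final String shardId;

  Shard(String shardId)
  {
    this.shardId = shardId;
  }

  String getShardId()
  {
    return shardId;
  }
}

final class ShardIdMapping
{
  // Lambda form, as written in the diff.
  static Set<String> withLambda(List<Shard> shards)
  {
    return shards.stream().map(x -> x.getShardId()).collect(Collectors.toSet());
  }

  // Method-reference form suggested in the review; identical behavior.
  static Set<String> withMethodReference(List<Shard> shards)
  {
    return shards.stream().map(Shard::getShardId).collect(Collectors.toSet());
  }
}
```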
##########
File path:
integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
##########
@@ -156,15 +160,35 @@ public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount,
return actualShardCount == oldShardCount + newShardCount;
}
+ private List<Shard> listShards(String streamName)
Review comment:
1. Why have this in a separate method instead of inlining it in the only caller?
2. This uses an `ArrayList`, whereas the main method uses a `Set`. Can there be duplicates? If not, then we only need to maintain a counter.
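If shard ids never repeat across ListShards pages (an assumption worth confirming), the helper only needs a running count rather than a materialized list. A sketch, with a hypothetical `ShardPage` type and fetch function standing in for `ListShardsRequest`/`ListShardsResult`:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for one ListShards response page.
final class ShardPage
{
  final List<String> shardIds;
  final String nextToken; // null on the last page

  ShardPage(List<String> shardIds, String nextToken)
  {
    this.shardIds = shardIds;
    this.nextToken = nextToken;
  }
}

final class ShardCounter
{
  // Sums page sizes until no continuation token comes back; no shard list is retained.
  // listShards maps a token (null for the first call, which would carry the stream name) to a page.
  static int countShards(Function<String, ShardPage> listShards)
  {
    int count = 0;
    String token = null;
    do {
      ShardPage page = listShards.apply(token);
      count += page.shardIds.size();
      token = page.nextToken;
    } while (token != null);
    return count;
  }
}
```

If duplicates were possible after all, the count would overstate the shard total, which is the case where keeping a `Set` (as the main method does) is the safer choice.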
--
This is an automated message from the Apache Git Service.