This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1f63b44 Mitigate Kinesis stream LimitExceededException by using
listShards API (#12161)
1f63b44 is described below
commit 1f63b447c45cb852924d5d3101a1005b670a054b
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Jan 21 10:15:51 2022 +0530
Mitigate Kinesis stream LimitExceededException by using listShards API
(#12161)
Makes Kinesis ingestion resilient to `LimitExceededException` caused by
resharding.
Replace `describeStream` with `listShards` (the recommended API) to fetch
shard-related info.
`describeStream` returns at most 100 shards per call and has a low default
limit of 10 TPS.
`listShards` returns info for up to 1000 shards per call and also has a
higher limit of 100 TPS.
Key changed/added classes in this PR:
* `KinesisRecordSupplier`
* `KinesisAdminClient`
---
.../indexing/kinesis/KinesisRecordSupplier.java | 55 +++++++++++-----------
.../kinesis/KinesisRecordSupplierTest.java | 52 ++++++++++----------
.../druid/testing/utils/KinesisAdminClient.java | 55 +++++++++++++++-------
3 files changed, 90 insertions(+), 72 deletions(-)
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 64e3bad..c6b1362 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -26,24 +26,22 @@ import
com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamDescription;
import
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSClientUtil;
@@ -70,11 +68,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -663,34 +661,35 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
+ /**
+ * Use the API listShards which is the recommended way instead of
describeStream
+ * listShards can return 1000 shards per call and has a limit of 100TPS
+ * This makes the method resilient to LimitExceeded exceptions (compared to
100 shards, 10 TPS of describeStream)
+ *
+ * @param stream name of stream
+ *
+ * @return Set of Shard ids
+ */
@Override
public Set<String> getPartitionIds(String stream)
{
- return wrapExceptions(
- () -> {
- final Set<String> retVal = new HashSet<>();
- DescribeStreamRequest request = new DescribeStreamRequest();
- request.setStreamName(stream);
-
- while (request != null) {
- final DescribeStreamResult result =
kinesis.describeStream(request);
- final StreamDescription streamDescription =
result.getStreamDescription();
- final List<Shard> shards = streamDescription.getShards();
-
- for (Shard shard : shards) {
- retVal.add(shard.getShardId());
- }
-
- if (streamDescription.isHasMoreShards()) {
-
request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
- } else {
- request = null;
- }
- }
-
+ return wrapExceptions(() -> {
+ final Set<String> retVal = new TreeSet<>();
+ ListShardsRequest request = new
ListShardsRequest().withStreamName(stream);
+ while (true) {
+ ListShardsResult result = kinesis.listShards(request);
+ retVal.addAll(result.getShards()
+ .stream()
+ .map(Shard::getShardId)
+ .collect(Collectors.toList())
+ );
+ String nextToken = result.getNextToken();
+ if (nextToken == null) {
return retVal;
}
- );
+ request = new ListShardsRequest().withNextToken(nextToken);
+ }
+ });
}
/**
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index ec8ab62..dda0e20 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -23,15 +23,14 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.amazonaws.services.kinesis.model.StreamDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -146,14 +145,12 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
private static int recordsPerFetch;
private static AmazonKinesis kinesis;
- private static DescribeStreamResult describeStreamResult0;
- private static DescribeStreamResult describeStreamResult1;
+ private static ListShardsResult listShardsResult0;
+ private static ListShardsResult listShardsResult1;
private static GetShardIteratorResult getShardIteratorResult0;
private static GetShardIteratorResult getShardIteratorResult1;
private static GetRecordsResult getRecordsResult0;
private static GetRecordsResult getRecordsResult1;
- private static StreamDescription streamDescription0;
- private static StreamDescription streamDescription1;
private static Shard shard0;
private static Shard shard1;
private static KinesisRecordSupplier recordSupplier;
@@ -162,14 +159,12 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
public void setupTest()
{
kinesis = createMock(AmazonKinesisClient.class);
- describeStreamResult0 = createMock(DescribeStreamResult.class);
- describeStreamResult1 = createMock(DescribeStreamResult.class);
+ listShardsResult0 = createMock(ListShardsResult.class);
+ listShardsResult1 = createMock(ListShardsResult.class);
getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
getRecordsResult0 = createMock(GetRecordsResult.class);
getRecordsResult1 = createMock(GetRecordsResult.class);
- streamDescription0 = createMock(StreamDescription.class);
- streamDescription1 = createMock(StreamDescription.class);
shard0 = createMock(Shard.class);
shard1 = createMock(Shard.class);
recordsPerFetch = 1;
@@ -187,19 +182,17 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
@Test
public void testSupplierSetup()
{
- final Capture<DescribeStreamRequest> capturedRequest =
Capture.newInstance();
-
-
EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest))).andReturn(describeStreamResult0).once();
-
EasyMock.expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
-
EasyMock.expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
-
EasyMock.expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
- EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).times(2);
-
EasyMock.expect(kinesis.describeStream(EasyMock.anyObject(DescribeStreamRequest.class)))
- .andReturn(describeStreamResult1)
- .once();
-
EasyMock.expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
-
EasyMock.expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
-
EasyMock.expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
+ final Capture<ListShardsRequest> capturedRequest0 = Capture.newInstance();
+ final Capture<ListShardsRequest> capturedRequest1 = Capture.newInstance();
+
+
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest0))).andReturn(listShardsResult0).once();
+
EasyMock.expect(listShardsResult0.getShards()).andReturn(ImmutableList.of(shard0)).once();
+ String nextToken = "nextToken";
+
EasyMock.expect(listShardsResult0.getNextToken()).andReturn(nextToken).once();
+ EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).once();
+
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest1))).andReturn(listShardsResult1).once();
+
EasyMock.expect(listShardsResult1.getShards()).andReturn(ImmutableList.of(shard1)).once();
+ EasyMock.expect(listShardsResult1.getNextToken()).andReturn(null).once();
EasyMock.expect(shard1.getShardId()).andReturn(SHARD_ID1).once();
replayAll();
@@ -236,10 +229,13 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
verifyAll();
- final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
- expectedRequest.setStreamName(STREAM);
- expectedRequest.setExclusiveStartShardId("0");
- Assert.assertEquals(expectedRequest, capturedRequest.getValue());
+ final ListShardsRequest expectedRequest0 = new ListShardsRequest();
+ expectedRequest0.setStreamName(STREAM);
+ Assert.assertEquals(expectedRequest0, capturedRequest0.getValue());
+
+ final ListShardsRequest expectedRequest1 = new ListShardsRequest();
+ expectedRequest1.setNextToken(nextToken);
+ Assert.assertEquals(expectedRequest1, capturedRequest1.getValue());
}
private static GetRecordsRequest generateGetRecordsReq(String shardIterator,
int limit)
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
index 7c8759a..53e3284 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
@@ -30,21 +30,27 @@ import
com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ScalingType;
+import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
import com.amazonaws.util.AwsHostNameUtils;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
import java.io.FileInputStream;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
public class KinesisAdminClient implements StreamAdminClient
{
- private AmazonKinesis amazonKinesis;
+ private final AmazonKinesis amazonKinesis;
public KinesisAdminClient(String endpoint) throws Exception
{
@@ -107,6 +113,9 @@ public class KinesisAdminClient implements StreamAdminClient
public void updatePartitionCount(String streamName, int newShardCount,
boolean blocksUntilStarted)
{
int originalShardCount = getStreamPartitionCount(streamName);
+ if (originalShardCount == newShardCount) {
+ return;
+ }
UpdateShardCountRequest updateShardCountRequest = new
UpdateShardCountRequest();
updateShardCountRequest.setStreamName(streamName);
updateShardCountRequest.setTargetShardCount(newShardCount);
@@ -119,13 +128,13 @@ public class KinesisAdminClient implements
StreamAdminClient
// Wait until the resharding started (or finished)
ITRetryUtil.retryUntil(
() -> {
- StreamDescription streamDescription =
getStreamDescription(streamName);
- int updatedShardCount = getStreamShardCount(streamDescription);
- return verifyStreamStatus(streamDescription,
StreamStatus.UPDATING) ||
- (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) &&
updatedShardCount > originalShardCount);
- },
- true,
- 30,
+ int updatedShardCount = getStreamPartitionCount(streamName);
+ // Stream should be in active or updating state AND
+ // the number of shards must have increased irrespective of the
value of newShardCount
+ return verifyStreamStatus(streamName, StreamStatus.ACTIVE,
StreamStatus.UPDATING)
+ && updatedShardCount > originalShardCount;
+ }, true,
+ 300, // higher value to avoid exceeding kinesis TPS limit
30,
"Kinesis stream resharding to start (or finished)"
);
@@ -135,15 +144,13 @@ public class KinesisAdminClient implements
StreamAdminClient
@Override
public boolean isStreamActive(String streamName)
{
- StreamDescription streamDescription = getStreamDescription(streamName);
- return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE);
+ return verifyStreamStatus(streamName, StreamStatus.ACTIVE);
}
@Override
public int getStreamPartitionCount(String streamName)
{
- StreamDescription streamDescription = getStreamDescription(streamName);
- return getStreamShardCount(streamDescription);
+ return listShards(streamName).size();
}
@Override
@@ -156,15 +163,31 @@ public class KinesisAdminClient implements
StreamAdminClient
return actualShardCount == oldShardCount + newShardCount;
}
+ private Set<Shard> listShards(String streamName)
+ {
+ ListShardsRequest listShardsRequest = new
ListShardsRequest().withStreamName(streamName);
+ ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+ while (true) {
+ ListShardsResult listShardsResult =
amazonKinesis.listShards(listShardsRequest);
+ shards.addAll(listShardsResult.getShards());
+ String nextToken = listShardsResult.getNextToken();
+ if (nextToken == null) {
+ return shards.build();
+ }
+ listShardsRequest = new
ListShardsRequest().withNextToken(listShardsResult.getNextToken());
+ }
+ }
- private boolean verifyStreamStatus(StreamDescription streamDescription,
StreamStatus streamStatusToCheck)
+ private boolean verifyStreamStatus(String streamName, StreamStatus...
streamStatuses)
{
- return
streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
+ return Arrays.stream(streamStatuses)
+ .map(StreamStatus::toString)
+ .anyMatch(getStreamStatus(streamName)::equals);
}
- private int getStreamShardCount(StreamDescription streamDescription)
+ private String getStreamStatus(String streamName)
{
- return streamDescription.getShards().size();
+ return getStreamDescription(streamName).getStreamStatus();
}
private StreamDescription getStreamDescription(String streamName)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org