This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.1-incubating by this push:
new a19fc5f [Backport] Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#8174)
a19fc5f is described below
commit a19fc5f1ab40f6dcc87dd788c5c2468f2885afcc
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jul 29 15:41:55 2019 -0700
[Backport] Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#8174)
* Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#7830)
* fix merge
---
.../indexing/kinesis/KinesisRecordSupplier.java | 78 ++++++++++++++--------
.../kinesis/KinesisRecordSupplierTest.java | 35 +++++++---
2 files changed, 77 insertions(+), 36 deletions(-)
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index a05eade..56b7d72 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -26,6 +26,8 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -35,11 +37,13 @@ import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
@@ -61,12 +65,14 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
@@ -579,12 +585,31 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
@Override
public Set<String> getPartitionIds(String stream)
{
- checkIfClosed();
- return kinesis.describeStream(stream)
- .getStreamDescription()
- .getShards()
- .stream()
- .map(Shard::getShardId).collect(Collectors.toSet());
+ return wrapExceptions(
+ () -> {
+ final Set<String> retVal = new HashSet<>();
+ DescribeStreamRequest request = new DescribeStreamRequest();
+ request.setStreamName(stream);
+
+ while (request != null) {
+ final DescribeStreamResult result = kinesis.describeStream(request);
+ final StreamDescription streamDescription = result.getStreamDescription();
+ final List<Shard> shards = streamDescription.getShards();
+
+ for (Shard shard : shards) {
+ retVal.add(shard.getShardId());
+ }
+
+ if (streamDescription.isHasMoreShards()) {
+ request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
+ } else {
+ request = null;
+ }
+ }
+
+ return retVal;
+ }
+ );
}
@Override
@@ -624,12 +649,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
);
- resource.shardIterator = kinesis.getShardIterator(
+ resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
partition.getStream(),
partition.getPartitionId(),
iteratorEnum.toString(),
sequenceNumber
- ).getShardIterator();
+ ).getShardIterator());
checkPartitionsStarted = true;
}
@@ -655,10 +680,10 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
// filter records in buffer and only retain ones whose partition was not seeked
BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
- records
- .stream()
- .filter(x -> !partitions.contains(x.getStreamPartition()))
- .forEachOrdered(newQ::offer);
+
+ records.stream()
+ .filter(x -> !partitions.contains(x.getStreamPartition()))
+ .forEachOrdered(newQ::offer);
records = newQ;
@@ -670,20 +695,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
@Nullable
private String getSequenceNumberInternal(StreamPartition<String> partition, ShardIteratorType iteratorEnum)
{
-
- String shardIterator = null;
- try {
- shardIterator = kinesis.getShardIterator(
- partition.getStream(),
- partition.getPartitionId(),
- iteratorEnum.toString()
- ).getShardIterator();
- }
- catch (ResourceNotFoundException e) {
- log.warn(e, "Caught ResourceNotFoundException while getting shardIterator");
- }
-
- return getSequenceNumberInternal(partition, shardIterator);
+ return wrapExceptions(() -> getSequenceNumberInternal(
+ partition,
+ kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
+ .getShardIterator()
+ ));
}
@Nullable
@@ -774,6 +790,16 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
}
}
+ private static <T> T wrapExceptions(Callable<T> callable)
+ {
+ try {
+ return callable.call();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@VisibleForTesting
public int bufferSize()
{
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index bd1fbe8..6dd7de9 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -65,12 +66,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
private static String shard1Iterator = "1";
private static String shard0Iterator = "0";
private static AmazonKinesis kinesis;
- private static DescribeStreamResult describeStreamResult;
+ private static DescribeStreamResult describeStreamResult0;
+ private static DescribeStreamResult describeStreamResult1;
private static GetShardIteratorResult getShardIteratorResult0;
private static GetShardIteratorResult getShardIteratorResult1;
private static GetRecordsResult getRecordsResult0;
private static GetRecordsResult getRecordsResult1;
- private static StreamDescription streamDescription;
+ private static StreamDescription streamDescription0;
+ private static StreamDescription streamDescription1;
private static Shard shard0;
private static Shard shard1;
private static KinesisRecordSupplier recordSupplier;
@@ -142,12 +145,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
public void setupTest()
{
kinesis = createMock(AmazonKinesisClient.class);
- describeStreamResult = createMock(DescribeStreamResult.class);
+ describeStreamResult0 = createMock(DescribeStreamResult.class);
+ describeStreamResult1 = createMock(DescribeStreamResult.class);
getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
getRecordsResult0 = createMock(GetRecordsResult.class);
getRecordsResult1 = createMock(GetRecordsResult.class);
- streamDescription = createMock(StreamDescription.class);
+ streamDescription0 = createMock(StreamDescription.class);
+ streamDescription1 = createMock(StreamDescription.class);
shard0 = createMock(Shard.class);
shard1 = createMock(Shard.class);
recordsPerFetch = 1;
@@ -163,11 +168,17 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
@Test
public void testSupplierSetup()
{
- Capture<String> captured = Capture.newInstance();
- expect(kinesis.describeStream(capture(captured))).andReturn(describeStreamResult).once();
- expect(describeStreamResult.getStreamDescription()).andReturn(streamDescription).once();
- expect(streamDescription.getShards()).andReturn(ImmutableList.of(shard0, shard1)).once();
- expect(shard0.getShardId()).andReturn(shardId0).once();
+ final Capture<DescribeStreamRequest> capturedRequest = Capture.newInstance();
+
+ expect(kinesis.describeStream(capture(capturedRequest))).andReturn(describeStreamResult0).once();
+ expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
+ expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
+ expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
+ expect(shard0.getShardId()).andReturn(shardId0).times(2);
+ expect(kinesis.describeStream(anyObject(DescribeStreamRequest.class))).andReturn(describeStreamResult1).once();
+ expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
+ expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
+ expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
expect(shard1.getShardId()).andReturn(shardId1).once();
replayAll();
@@ -199,7 +210,11 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
verifyAll();
- Assert.assertEquals(stream, captured.getValue());
+
+ final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
+ expectedRequest.setStreamName(stream);
+ expectedRequest.setExclusiveStartShardId("0");
+ Assert.assertEquals(expectedRequest, capturedRequest.getValue());
}
private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]