This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.1-incubating by this push:
     new a19fc5f  [Backport] Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#8174)
a19fc5f is described below

commit a19fc5f1ab40f6dcc87dd788c5c2468f2885afcc
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jul 29 15:41:55 2019 -0700

    [Backport] Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#8174)
    
    * Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#7830)
    
    * fix merge
---
 .../indexing/kinesis/KinesisRecordSupplier.java    | 78 ++++++++++++++--------
 .../kinesis/KinesisRecordSupplierTest.java         | 35 +++++++---
 2 files changed, 77 insertions(+), 36 deletions(-)

diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index a05eade..56b7d72 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -26,6 +26,8 @@ import 
com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -35,11 +37,13 @@ import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
 import 
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.amazonaws.util.AwsHostNameUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Queues;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.aws.AWSCredentialsUtils;
@@ -61,12 +65,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -579,12 +585,31 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    checkIfClosed();
-    return kinesis.describeStream(stream)
-                  .getStreamDescription()
-                  .getShards()
-                  .stream()
-                  .map(Shard::getShardId).collect(Collectors.toSet());
+    return wrapExceptions(
+        () -> {
+          final Set<String> retVal = new HashSet<>();
+          DescribeStreamRequest request = new DescribeStreamRequest();
+          request.setStreamName(stream);
+
+          while (request != null) {
+            final DescribeStreamResult result = 
kinesis.describeStream(request);
+            final StreamDescription streamDescription = 
result.getStreamDescription();
+            final List<Shard> shards = streamDescription.getShards();
+
+            for (Shard shard : shards) {
+              retVal.add(shard.getShardId());
+            }
+
+            if (streamDescription.isHasMoreShards()) {
+              
request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
+            } else {
+              request = null;
+            }
+          }
+
+          return retVal;
+        }
+    );
   }
 
   @Override
@@ -624,12 +649,12 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
         sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
     );
 
-    resource.shardIterator = kinesis.getShardIterator(
+    resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
         partition.getStream(),
         partition.getPartitionId(),
         iteratorEnum.toString(),
         sequenceNumber
-    ).getShardIterator();
+    ).getShardIterator());
 
     checkPartitionsStarted = true;
   }
@@ -655,10 +680,10 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
 
     // filter records in buffer and only retain ones whose partition was not 
seeked
     BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new 
LinkedBlockingQueue<>(recordBufferSize);
-    records
-        .stream()
-        .filter(x -> !partitions.contains(x.getStreamPartition()))
-        .forEachOrdered(newQ::offer);
+
+    records.stream()
+           .filter(x -> !partitions.contains(x.getStreamPartition()))
+           .forEachOrdered(newQ::offer);
 
     records = newQ;
 
@@ -670,20 +695,11 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   @Nullable
   private String getSequenceNumberInternal(StreamPartition<String> partition, 
ShardIteratorType iteratorEnum)
   {
-
-    String shardIterator = null;
-    try {
-      shardIterator = kinesis.getShardIterator(
-          partition.getStream(),
-          partition.getPartitionId(),
-          iteratorEnum.toString()
-      ).getShardIterator();
-    }
-    catch (ResourceNotFoundException e) {
-      log.warn(e, "Caught ResourceNotFoundException while getting 
shardIterator");
-    }
-
-    return getSequenceNumberInternal(partition, shardIterator);
+    return wrapExceptions(() -> getSequenceNumberInternal(
+        partition,
+        kinesis.getShardIterator(partition.getStream(), 
partition.getPartitionId(), iteratorEnum.toString())
+               .getShardIterator()
+    ));
   }
 
   @Nullable
@@ -774,6 +790,16 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
     }
   }
 
+  private static <T> T wrapExceptions(Callable<T> callable)
+  {
+    try {
+      return callable.call();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @VisibleForTesting
   public int bufferSize()
   {
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index bd1fbe8..6dd7de9 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -65,12 +66,14 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   private static String shard1Iterator = "1";
   private static String shard0Iterator = "0";
   private static AmazonKinesis kinesis;
-  private static DescribeStreamResult describeStreamResult;
+  private static DescribeStreamResult describeStreamResult0;
+  private static DescribeStreamResult describeStreamResult1;
   private static GetShardIteratorResult getShardIteratorResult0;
   private static GetShardIteratorResult getShardIteratorResult1;
   private static GetRecordsResult getRecordsResult0;
   private static GetRecordsResult getRecordsResult1;
-  private static StreamDescription streamDescription;
+  private static StreamDescription streamDescription0;
+  private static StreamDescription streamDescription1;
   private static Shard shard0;
   private static Shard shard1;
   private static KinesisRecordSupplier recordSupplier;
@@ -142,12 +145,14 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   public void setupTest()
   {
     kinesis = createMock(AmazonKinesisClient.class);
-    describeStreamResult = createMock(DescribeStreamResult.class);
+    describeStreamResult0 = createMock(DescribeStreamResult.class);
+    describeStreamResult1 = createMock(DescribeStreamResult.class);
     getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
     getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
     getRecordsResult0 = createMock(GetRecordsResult.class);
     getRecordsResult1 = createMock(GetRecordsResult.class);
-    streamDescription = createMock(StreamDescription.class);
+    streamDescription0 = createMock(StreamDescription.class);
+    streamDescription1 = createMock(StreamDescription.class);
     shard0 = createMock(Shard.class);
     shard1 = createMock(Shard.class);
     recordsPerFetch = 1;
@@ -163,11 +168,17 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   @Test
   public void testSupplierSetup()
   {
-    Capture<String> captured = Capture.newInstance();
-    
expect(kinesis.describeStream(capture(captured))).andReturn(describeStreamResult).once();
-    
expect(describeStreamResult.getStreamDescription()).andReturn(streamDescription).once();
-    expect(streamDescription.getShards()).andReturn(ImmutableList.of(shard0, 
shard1)).once();
-    expect(shard0.getShardId()).andReturn(shardId0).once();
+    final Capture<DescribeStreamRequest> capturedRequest = 
Capture.newInstance();
+
+    
expect(kinesis.describeStream(capture(capturedRequest))).andReturn(describeStreamResult0).once();
+    
expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
+    
expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
+    expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
+    expect(shard0.getShardId()).andReturn(shardId0).times(2);
+    
expect(kinesis.describeStream(anyObject(DescribeStreamRequest.class))).andReturn(describeStreamResult1).once();
+    
expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
+    
expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
+    expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
     expect(shard1.getShardId()).andReturn(shardId1).once();
 
     replayAll();
@@ -199,7 +210,11 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
     Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
 
     verifyAll();
-    Assert.assertEquals(stream, captured.getValue());
+
+    final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
+    expectedRequest.setStreamName(stream);
+    expectedRequest.setExclusiveStartShardId("0");
+    Assert.assertEquals(expectedRequest, capturedRequest.getValue());
   }
 
   private static GetRecordsRequest generateGetRecordsReq(String shardIterator, 
int limit)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to