Repository: flink
Updated Branches:
  refs/heads/master 2029c14eb -> f5f4f7a27


[FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API 
behaviour

This closes #2822.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5f4f7a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5f4f7a2

Branch: refs/heads/master
Commit: f5f4f7a27a7beca23915e8c6030c76d820fa0dbf
Parents: 2029c14
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Thu Nov 17 14:24:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 24 18:51:18 2016 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  | 22 ++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5f4f7a2/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 1113fde..9ffc8e6 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -34,7 +34,9 @@ import 
org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Map;
@@ -212,7 +214,7 @@ public class KinesisProxy implements KinesisProxyInterface {
         * {@inheritDoc}
         */
        @Override
-       public GetShardListResult getShardList(Map<String,String> 
streamNamesWithLastSeenShardIds) throws InterruptedException {
+       public GetShardListResult getShardList(Map<String, String> 
streamNamesWithLastSeenShardIds) throws InterruptedException {
                GetShardListResult result = new GetShardListResult();
 
                for (Map.Entry<String,String> streamNameWithLastSeenShardId : 
streamNamesWithLastSeenShardIds.entrySet()) {
@@ -227,7 +229,7 @@ public class KinesisProxy implements KinesisProxyInterface {
         * {@inheritDoc}
         */
        @Override
-       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) throws InterruptedException {
+       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException 
{
                GetShardIteratorResult getShardIteratorResult = null;
 
                int attempt = 0;
@@ -251,7 +253,7 @@ public class KinesisProxy implements KinesisProxyInterface {
                return getShardIteratorResult.getShardIterator();
        }
 
-       private List<KinesisStreamShard> getShardsOfStream(String streamName, 
String lastSeenShardId) throws InterruptedException {
+       private List<KinesisStreamShard> getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
                List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
 
                DescribeStreamResult describeStreamResult;
@@ -283,7 +285,7 @@ public class KinesisProxy implements KinesisProxyInterface {
         * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
         * @return the result of the describe stream operation
         */
-       private DescribeStreamResult describeStream(String streamName, String 
startShardId) throws InterruptedException {
+       private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
                final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
                describeStreamRequest.setStreamName(streamName);
                describeStreamRequest.setExclusiveStartShardId(startShardId);
@@ -314,6 +316,18 @@ public class KinesisProxy implements KinesisProxyInterface 
{
                        }
                }
 
+               // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before the exclusive
+               // start shard id in the returned shards list; check if we need 
to remove these erroneously returned shards
+               if (startShardId != null) {
+                       List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
+                       Iterator<Shard> shardItr = shards.iterator();
+                       while (shardItr.hasNext()) {
+                               if 
(KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) 
<= 0) {
+                                       shardItr.remove();
+                               }
+                       }
+               }
+
                return describeStreamResult;
        }
 

Reply via email to