Repository: flink Updated Branches: refs/heads/master 2029c14eb -> f5f4f7a27
[FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour This closes #2822. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5f4f7a2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5f4f7a2 Branch: refs/heads/master Commit: f5f4f7a27a7beca23915e8c6030c76d820fa0dbf Parents: 2029c14 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Thu Nov 17 14:24:24 2016 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 24 18:51:18 2016 +0800 ---------------------------------------------------------------------- .../connectors/kinesis/proxy/KinesisProxy.java | 22 ++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f5f4f7a2/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 1113fde..9ffc8e6 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -34,7 +34,9 @@ import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.Map; @@ -212,7 +214,7 @@ public class KinesisProxy implements KinesisProxyInterface { * {@inheritDoc} */ @Override - public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException { + public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException { GetShardListResult result = new GetShardListResult(); for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) { @@ -227,7 +229,7 @@ public class KinesisProxy implements KinesisProxyInterface { * {@inheritDoc} */ @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; int attempt = 0; @@ -251,7 +253,7 @@ public class KinesisProxy implements KinesisProxyInterface { return getShardIteratorResult.getShardIterator(); } - private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException { + private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); DescribeStreamResult describeStreamResult; @@ -283,7 +285,7 @@ public class KinesisProxy implements KinesisProxyInterface { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException { + private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); @@ -314,6 +316,18 @@ public class KinesisProxy implements KinesisProxyInterface { } } + // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive + // start shard id in the returned shards list; check if we need to remove these erroneously returned shards + if (startShardId != null) { + List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); + Iterator<Shard> shardItr = shards.iterator(); + while (shardItr.hasNext()) { + if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { + shardItr.remove(); + } + } + } + return describeStreamResult; }
