Repository: flink
Updated Branches:
  refs/heads/master 78d9ae9ba -> b7d83899a


[FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions

This closes #2432


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d83899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d83899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d83899

Branch: refs/heads/master
Commit: b7d83899abfe8175b1fc9e526b6afb2ca7a056ed
Parents: 78d9ae9
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Aug 29 17:30:39 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Aug 30 23:50:40 2016 +0800

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  7 ++
 .../kinesis/internals/ShardConsumer.java        | 74 +++++++++++++++-----
 .../kinesis/util/KinesisConfigUtil.java         | 30 +++++---
 .../kinesis/internals/ShardConsumerTest.java    | 40 +++++++++++
 .../testutils/FakeKinesisBehavioursFactory.java | 66 +++++++++++++++--
 5 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 28ff3e4..76c20ed 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 
 /**
@@ -128,4 +129,10 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
 
        public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 
10000L;
 
+       /**
+        * To avoid expired shard iterators in {@link ShardConsumer}s, the value 
for the configured
+        * getRecords interval cannot exceed 5 minutes, which is the expiry 
time for retrieved iterators.
+        */
+       public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 6e24e65..612a4a7 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
@@ -29,6 +30,8 @@ import 
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -44,6 +47,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ShardConsumer<T> implements Runnable {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
+
        private final KinesisDeserializationSchema<T> deserializer;
 
        private final KinesisProxyInterface kinesis;
@@ -133,7 +138,7 @@ public class ShardConsumer<T> implements Runnable {
                                                
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
 
                                        // get only the last aggregated record
-                                       GetRecordsResult getRecordsResult = 
kinesis.getRecords(itrForLastAggregatedRecord, 1);
+                                       GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
 
                                        List<UserRecord> fetchedRecords = 
deaggregateRecords(
                                                getRecordsResult.getRecords(),
@@ -168,7 +173,7 @@ public class ShardConsumer<T> implements Runnable {
                                                
Thread.sleep(fetchIntervalMillis);
                                        }
 
-                                       GetRecordsResult getRecordsResult = 
kinesis.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+                                       GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
                                        // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
                                        List<UserRecord> fetchedRecords = 
deaggregateRecords(
@@ -199,11 +204,15 @@ public class ShardConsumer<T> implements Runnable {
        }
 
        /**
-        * Deserializes a record for collection, and accordingly updates the 
shard state in the fetcher.
+        * Deserializes a record for collection, and accordingly updates the 
shard state in the fetcher. The last
+        * successfully collected sequence number in this shard consumer is 
also updated so that
+        * {@link ShardConsumer#getRecords(String, int)} may be able to use the 
correct sequence number to refresh shard
+        * iterators if necessary.
+        *
         * Note that the server-side Kinesis timestamp is attached to the 
record when collected. When the
         * user programs uses {@link TimeCharacteristic#EventTime}, this 
timestamp will be used by default.
         *
-        * @param record
+        * @param record record to deserialize and collect
         * @throws IOException
         */
        private void deserializeRecordForCollectionAndUpdateState(UserRecord 
record)
@@ -223,19 +232,52 @@ public class ShardConsumer<T> implements Runnable {
                        subscribedShard.getStreamName(),
                        subscribedShard.getShard().getShardId());
 
-               if (record.isAggregated()) {
-                       fetcherRef.emitRecordAndUpdateState(
-                               value,
-                               approxArrivalTimestamp,
-                               subscribedShardStateIndex,
-                               new SequenceNumber(record.getSequenceNumber(), 
record.getSubSequenceNumber()));
-               } else {
-                       fetcherRef.emitRecordAndUpdateState(
-                               value,
-                               approxArrivalTimestamp,
-                               subscribedShardStateIndex,
-                               new SequenceNumber(record.getSequenceNumber()));
+               SequenceNumber collectedSequenceNumber = (record.isAggregated())
+                       ? new SequenceNumber(record.getSequenceNumber(), 
record.getSubSequenceNumber())
+                       : new SequenceNumber(record.getSequenceNumber());
+
+               fetcherRef.emitRecordAndUpdateState(
+                       value,
+                       approxArrivalTimestamp,
+                       subscribedShardStateIndex,
+                       collectedSequenceNumber);
+
+               lastSequenceNum = collectedSequenceNumber;
+       }
+
+       /**
+        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while 
also handling unexpected
+        * AWS {@link ExpiredIteratorException}s to ensure that we get results 
and don't just fail on
+        * such occasions. The returned shard iterator within the successful 
{@link GetRecordsResult} should
+        * be used for the next call to this method.
+        *
+        * Note: it is important that this method is not called again before 
all the records from the last result have been
+        * fully collected with {@link 
ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, 
otherwise
+        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in 
the middle of an aggregated record, leading to
+        * incorrect shard iteration if the iterator had to be refreshed.
+        *
+        * @param shardItr shard iterator to use
+        * @param maxNumberOfRecords the maximum number of records to fetch for 
this getRecords attempt
+        * @return get records result
+        * @throws InterruptedException
+        */
+       private GetRecordsResult getRecords(String shardItr, int 
maxNumberOfRecords) throws InterruptedException {
+               GetRecordsResult getRecordsResult = null;
+               while (getRecordsResult == null) {
+                       try {
+                               getRecordsResult = kinesis.getRecords(shardItr, 
maxNumberOfRecords);
+                       } catch (ExpiredIteratorException eiEx) {
+                               LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
+                                       " refreshing the iterator ...", 
shardItr, subscribedShard);
+                               shardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
+
+                               // sleep for the fetch interval before the next 
getRecords attempt with the refreshed iterator
+                               if (fetchIntervalMillis != 0) {
+                                       Thread.sleep(fetchIntervalMillis);
+                               }
+                       }
                }
+               return getRecordsResult;
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d9d553b..9aa14ad 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
 
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -65,13 +66,13 @@ public class KinesisConfigUtil {
                        "Invalid value given for maximum retry attempts for 
getRecords shard operation. Must be a valid non-negative integer value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
-                       "Invalid value given for get records operation base 
backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for get records operation base 
backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
-                       "Invalid value given for get records operation max 
backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for get records operation max 
backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-                       "Invalid value given for get records operation backoff 
exponential constant. Must be a valid non-negative double value");
+                       "Invalid value given for get records operation backoff 
exponential constant. Must be a valid non-negative double value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                        "Invalid value given for getRecords sleep interval in 
milliseconds. Must be a valid non-negative long value.");
@@ -80,25 +81,34 @@ public class KinesisConfigUtil {
                        "Invalid value given for maximum retry attempts for 
getShardIterator shard operation. Must be a valid non-negative integer value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
-                       "Invalid value given for get shard iterator operation 
base backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for get shard iterator operation 
base backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
-                       "Invalid value given for get shard iterator operation 
max backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for get shard iterator operation 
max backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-                       "Invalid value given for get shard iterator operation 
backoff exponential constant. Must be a valid non-negative double value");
+                       "Invalid value given for get shard iterator operation 
backoff exponential constant. Must be a valid non-negative double value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
-                       "Invalid value given for shard discovery sleep interval 
in milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for shard discovery sleep interval 
in milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-                       "Invalid value given for describe stream operation base 
backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for describe stream operation base 
backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveLongProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-                       "Invalid value given for describe stream operation max 
backoff milliseconds. Must be a valid non-negative long value");
+                       "Invalid value given for describe stream operation max 
backoff milliseconds. Must be a valid non-negative long value.");
 
                validateOptionalPositiveDoubleProperty(config, 
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-                       "Invalid value given for describe stream operation 
backoff exponential constant. Must be a valid non-negative double value");
+                       "Invalid value given for describe stream operation 
backoff exponential constant. Must be a valid non-negative double value.");
+
+               if 
(config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+                       checkArgument(
+                               
Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+                                       < 
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+                               "Invalid value given for getRecords sleep 
interval in milliseconds. Must be lower than " +
+                                       
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+                       );
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 5b3e1a5..96764a4 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -79,4 +79,44 @@ public class ShardConsumerTest {
                        
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
        }
 
+       @Test
+       public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+               KinesisStreamShard fakeToBeConsumedShard = new 
KinesisStreamShard(
+                       "fakeStream",
+                       new Shard()
+                               
.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+                               .withHashKeyRange(
+                                       new HashKeyRange()
+                                               .withStartingHashKey("0")
+                                               .withEndingHashKey(new 
BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+
+               LinkedList<KinesisStreamShardState> 
subscribedShardsStateUnderTest = new LinkedList<>();
+               subscribedShardsStateUnderTest.add(
+                       new KinesisStreamShardState(fakeToBeConsumedShard, new 
SequenceNumber("fakeStartingState")));
+
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               Collections.singletonList("fakeStream"),
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               subscribedShardsStateUnderTest,
+                               
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+                               Mockito.mock(KinesisProxyInterface.class));
+
+               new ShardConsumer<>(
+                       fetcher,
+                       0,
+                       
subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+                       
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+                       // Get a total of 1000 records with 9 getRecords() 
calls,
+                       // and the 7th getRecords() call will encounter an 
unexpected expired shard iterator
+                       
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000,
 9, 7)).run();
+
+               assertTrue(fetcher.getNumOfElementsCollected() == 1000);
+               
assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+                       
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index fc98fca..65e6d4e 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -33,13 +34,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Factory for different kinds of fake Kinesis behaviours using the {@link 
KinesisProxyInterface} interface.
  */
 public class FakeKinesisBehavioursFactory {
 
        // 
------------------------------------------------------------------------
-       //  Behaviours related to shard listing and resharding, used in 
ShardDiscovererTest
+       //  Behaviours related to shard listing and resharding, used in 
KinesisDataFetcherTest
        // 
------------------------------------------------------------------------
 
        public static KinesisProxyInterface 
noShardsFoundForRequestedStreamsBehaviour() {
@@ -75,14 +78,69 @@ public class FakeKinesisBehavioursFactory {
        public static KinesisProxyInterface 
totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int 
numOfGetRecordsCalls) {
                return new 
SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
        }
+       
+       public static KinesisProxyInterface 
totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
+               final int numOfRecords, final int numOfGetRecordsCall, final 
int orderOfCallToExpire) {
+               return new 
SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+                       numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
+       }
+
+       public static class 
SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends 
SingleShardEmittingFixNumOfRecordsKinesis {
+
+               private boolean expiredOnceAlready = false;
+               private boolean expiredIteratorRefreshed = false;
+               private final int orderOfCallToExpire;
+
+               public 
SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(final int 
numOfRecords,
+                                                                               
                                                                        final 
int numOfGetRecordsCalls,
+                                                                               
                                                                        final 
int orderOfCallToExpire) {
+                       super(numOfRecords, numOfGetRecordsCalls);
+                       checkArgument(orderOfCallToExpire <= 
numOfGetRecordsCalls,
+                               "can not test unexpected expired iterator if 
orderOfCallToExpire is larger than numOfGetRecordsCalls");
+                       this.orderOfCallToExpire = orderOfCallToExpire;
+               }
+
+               @Override
+               public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+                       if ((Integer.valueOf(shardIterator) == 
orderOfCallToExpire-1) && !expiredOnceAlready) {
+                               // we fake only once the expired iterator 
exception at the specified get records attempt order
+                               expiredOnceAlready = true;
+                               throw new ExpiredIteratorException("Artificial 
expired shard iterator");
+                       } else if (expiredOnceAlready && 
!expiredIteratorRefreshed) {
+                               // if we've thrown the expired iterator 
exception already, but the iterator was not refreshed,
+                               // throw a hard exception to the test that is 
testing this Kinesis behaviour
+                               throw new RuntimeException("expired shard 
iterator was not refreshed on the next getRecords() call");
+                       } else {
+                               // assuming that the maxRecordsToGet is always 
large enough
+                               return new GetRecordsResult()
+                                       
.withRecords(shardItrToRecordBatch.get(shardIterator))
+                                       .withNextShardIterator(
+                                               (Integer.valueOf(shardIterator) 
== totalNumOfGetRecordsCalls - 1)
+                                                       ? null : 
String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard 
iterator is null
+                       }
+               }
+
+               @Override
+               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
+                       if (!expiredOnceAlready) {
+                               // for the first call, just return the iterator 
of the first batch of records
+                               return "0";
+                       } else {
+                               // fake the iterator refresh when this is 
called again after getRecords throws expired iterator
+                               // exception on the orderOfCallToExpire attempt
+                               expiredIteratorRefreshed = true;
+                               return String.valueOf(orderOfCallToExpire-1);
+                       }
+               }
+       }
 
        private static class SingleShardEmittingFixNumOfRecordsKinesis 
implements KinesisProxyInterface {
 
-               private final int totalNumOfGetRecordsCalls;
+               protected final int totalNumOfGetRecordsCalls;
 
-               private final int totalNumOfRecords;
+               protected final int totalNumOfRecords;
 
-               private final Map<String,List<Record>> shardItrToRecordBatch;
+               protected final Map<String,List<Record>> shardItrToRecordBatch;
 
                public SingleShardEmittingFixNumOfRecordsKinesis(final int 
numOfRecords, final int numOfGetRecordsCalls) {
                        this.totalNumOfRecords = numOfRecords;

Reply via email to