[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8a5abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8a5abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8a5abf

Branch: refs/heads/master
Commit: 8d8a5abfcc4d2452a3ae46f18a3223b66588c191
Parents: a8e85a2
Author: Tony Wei <[email protected]>
Authored: Thu Dec 1 11:40:46 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jan 24 14:20:07 2017 +0800

----------------------------------------------------------------------
 .../kinesis/config/ConsumerConfigConstants.java |  8 ++++-
 .../kinesis/internals/ShardConsumer.java        | 18 ++++++++++
 .../kinesis/model/SentinelSequenceNumber.java   |  4 +++
 .../connectors/kinesis/proxy/KinesisProxy.java  | 36 ++++++++++++++++++--
 .../kinesis/proxy/KinesisProxyInterface.java    |  8 +++--
 .../kinesis/util/KinesisConfigUtil.java         | 30 +++++++++++++++-
 .../kinesis/FlinkKinesisConsumerTest.java       | 32 +++++++++++++++++
 .../testutils/FakeKinesisBehavioursFactory.java |  8 ++---
 8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 76c20ed..4ffe0ad 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -37,7 +37,10 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
                
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
 
                /** Start reading from the latest incoming record */
-               LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+               LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
+
+               /** Start reading from the record at the specified timestamp */
+               
AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
 
                private SentinelSequenceNumber sentinelSequenceNumber;
 
@@ -53,6 +56,9 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
        /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
        public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
 
+       /** The initial timestamp to start reading Kinesis stream from (when 
AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
+       public static final String STREAM_INITIAL_TIMESTAMP = 
"flink.stream.initpos.timestamp";
+
        /** The base backoff time between each describeStream attempt */
        public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 612a4a7..f6c53ce 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,12 +30,15 @@ import 
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 
@@ -64,6 +67,8 @@ public class ShardConsumer<T> implements Runnable {
 
        private SequenceNumber lastSequenceNum;
 
+       private Date initTimestamp;
+
        /**
         * Creates a shard consumer.
         *
@@ -107,6 +112,17 @@ public class ShardConsumer<T> implements Runnable {
                this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
                        
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
                        
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+               if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+                       String timestamp = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+                       try {
+                               this.initTimestamp = 
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
+                       } catch (ParseException e) {
+                               this.initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
+                       }
+               } else {
+                       this.initTimestamp = null;
+               }
        }
 
        @SuppressWarnings("unchecked")
@@ -128,6 +144,8 @@ public class ShardConsumer<T> implements Runnable {
                                nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
                        } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
                                nextShardItr = null;
+                       } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
+                               nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
                        } else {
                                // we will be starting from an actual sequence 
number (due to restore from failure).
                                // if the last sequence number refers to an 
aggregated record, we need to clean up any dangling sub-records

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 8182201..7f9dbbb 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -35,6 +35,10 @@ public enum SentinelSequenceNumber {
         * start to be read from the earliest records that haven't expired yet 
*/
        SENTINEL_EARLIEST_SEQUENCE_NUM( new 
SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
 
+       /** Flag value for shard's sequence numbers to indicate that the shard 
should
+        * start to be read from the specified timestamp */
+       SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new 
SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+
        /** Flag value to indicate that we have already read the last record of 
this shard
         * (Note: Kinesis shards that have been closed due to a split or merge 
will have an ending data record) */
        SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new 
SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 0b0fccf..580555f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -29,6 +29,8 @@ import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -42,6 +44,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Map;
 import java.util.Random;
+import java.util.Date;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -234,14 +237,41 @@ public class KinesisProxy implements 
KinesisProxyInterface {
         * {@inheritDoc}
         */
        @Override
-       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException 
{
+       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable Object startingMarker) throws InterruptedException 
{
+               GetShardIteratorRequest getShardIteratorRequest = new 
GetShardIteratorRequest()
+                       .withStreamName(shard.getStreamName())
+                       .withShardId(shard.getShard().getShardId())
+                       .withShardIteratorType(shardIteratorType);
+
+               switch (ShardIteratorType.fromValue(shardIteratorType)) {
+                       case TRIM_HORIZON:
+                       case LATEST:
+                               break;
+                       case AT_TIMESTAMP:
+                               if (startingMarker instanceof Date) {
+                                       
getShardIteratorRequest.setTimestamp((Date) startingMarker);
+                               } else {
+                                       throw new 
IllegalArgumentException("Invalid object given for GetShardIteratorRequest() 
when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
+                               }
+                               break;
+                       case AT_SEQUENCE_NUMBER:
+                       case AFTER_SEQUENCE_NUMBER:
+                               if (startingMarker instanceof String) {
+                                       
getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
+                               } else {
+                                       throw new 
IllegalArgumentException("Invalid object given for GetShardIteratorRequest() 
when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be 
a String.");
+                               }
+               }
+               return getShardIterator(getShardIteratorRequest);
+       }
+
+       private String getShardIterator(GetShardIteratorRequest 
getShardIteratorRequest) throws InterruptedException {
                GetShardIteratorResult getShardIteratorResult = null;
 
                int attempt = 0;
                while (attempt <= getShardIteratorMaxAttempts && 
getShardIteratorResult == null) {
                        try {
-                               getShardIteratorResult =
-                                       
kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+                                       getShardIteratorResult = 
kinesisClient.getShardIterator(getShardIteratorRequest);
                        } catch (AmazonServiceException ex) {
                                if (isRecoverableException(ex)) {
                                        long backoffMillis = fullJitterBackoff(

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 39ddc52..9f6d594 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -34,14 +34,16 @@ public interface KinesisProxyInterface {
         *
         * @param shard the shard to get the iterator
         * @param shardIteratorType the iterator type, defining how the shard 
is to be iterated
-        *                          (one of: TRIM_HORIZON, LATEST, 
AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-        * @param startingSeqNum sequence number, must be null if 
shardIteratorType is TRIM_HORIZON or LATEST
+        *                          (one of: TRIM_HORIZON, LATEST, 
AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+        * @param startingMarker should be {@code null} if shardIteratorType is 
TRIM_HORIZON or LATEST,
+        *                       should be a {@code Date} value if 
shardIteratorType is AT_TIMESTAMP,
+        *                       should be a {@code String} representing the 
sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, 
AFTER_SEQUENCE_NUMBER
         * @return shard iterator which can be used to read data from Kinesis
         * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
         *                              operation has exceeded the rate limit; 
this exception will be thrown
         *                              if the backoff is interrupted.
         */
-       String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) throws InterruptedException;
+       String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, Object startingMarker) throws InterruptedException;
 
        /**
         * Get the next batch of data records using a specific shard iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d8ea0a2..eb29d78 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -35,6 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+       public static SimpleDateFormat initTimestampDateFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
 
        /**
         * Validate configuration properties for {@link FlinkKinesisConsumer}.
@@ -47,7 +50,7 @@ public class KinesisConfigUtil {
                if 
(config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
                        String initPosType = 
config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
 
-                       // specified initial position in stream must be either 
LATEST or TRIM_HORIZON
+                       // specified initial position in stream must be either 
LATEST, TRIM_HORIZON or AT_TIMESTAMP
                        try {
                                InitialPosition.valueOf(initPosType);
                        } catch (IllegalArgumentException e) {
@@ -57,6 +60,17 @@ public class KinesisConfigUtil {
                                }
                                throw new IllegalArgumentException("Invalid 
initial position in stream set in config. Valid values are: " + sb.toString());
                        }
+
+                       // specified initial timestamp in stream when using 
AT_TIMESTAMP
+                       if (InitialPosition.valueOf(initPosType) == 
InitialPosition.AT_TIMESTAMP) {
+                               if 
(!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+                                       throw new 
IllegalArgumentException("Please set value for initial timestamp ('"
+                                               + 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP 
initial position.");
+                               }
+                               validateOptionalDateProperty(config, 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+                                       "Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
+                                               + "Must be a valid format: 
yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 
2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
+                       }
                }
 
                validateOptionalPositiveIntProperty(config, 
ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
@@ -207,4 +221,18 @@ public class KinesisConfigUtil {
                        }
                }
        }
+
+       private static void validateOptionalDateProperty(Properties config, 
String key, String message) {
+               if (config.containsKey(key)) {
+                       try {
+                               
initTimestampDateFormat.parse(config.getProperty(key));
+                               double value = 
Double.parseDouble(config.getProperty(key));
+                               if (value < 0) {
+                                       throw new NumberFormatException();
+                               }
+                       } catch (ParseException | NumberFormatException e) {
+                               throw new IllegalArgumentException(message);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index a72d8df..2cc0270 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -133,6 +133,38 @@ public class FlinkKinesisConsumerTest {
        }
 
        @Test
+       public void 
testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Please set value for initial timestamp 
('"
+                       + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + 
"') when using AT_TIMESTAMP initial position.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableDateForInitialTimestampInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"unparsableDate");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
        public void 
testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("Invalid value given for describe 
stream operation base backoff milliseconds");

http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 65e6d4e..964ee76 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -54,7 +54,7 @@ public class FakeKinesisBehavioursFactory {
                        }
 
                        @Override
-                       public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorType, String startingSeqNum) {
+                       public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorType, Object startingMarker) {
                                return null;
                        }
 
@@ -121,7 +121,7 @@ public class FakeKinesisBehavioursFactory {
                }
 
                @Override
-               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
+               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, Object startingMarker) {
                        if (!expiredOnceAlready) {
                                // for the first call, just return the iterator 
of the first batch of records
                                return "0";
@@ -180,7 +180,7 @@ public class FakeKinesisBehavioursFactory {
                }
 
                @Override
-               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
+               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, Object startingMarker) {
                        // this will be called only one time per ShardConsumer;
                        // so, simply return the iterator of the first batch of 
records
                        return "0";
@@ -250,7 +250,7 @@ public class FakeKinesisBehavioursFactory {
                }
 
                @Override
-               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
+               public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, Object startingMarker) {
                        return null;
                }
 

Reply via email to