[FLINK-4523] Allow Kinesis Consumer to start from specific timestamp / Date
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d8a5abf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d8a5abf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d8a5abf Branch: refs/heads/master Commit: 8d8a5abfcc4d2452a3ae46f18a3223b66588c191 Parents: a8e85a2 Author: Tony Wei <[email protected]> Authored: Thu Dec 1 11:40:46 2016 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Jan 24 14:20:07 2017 +0800 ---------------------------------------------------------------------- .../kinesis/config/ConsumerConfigConstants.java | 8 ++++- .../kinesis/internals/ShardConsumer.java | 18 ++++++++++ .../kinesis/model/SentinelSequenceNumber.java | 4 +++ .../connectors/kinesis/proxy/KinesisProxy.java | 36 ++++++++++++++++++-- .../kinesis/proxy/KinesisProxyInterface.java | 8 +++-- .../kinesis/util/KinesisConfigUtil.java | 30 +++++++++++++++- .../kinesis/FlinkKinesisConsumerTest.java | 32 +++++++++++++++++ .../testutils/FakeKinesisBehavioursFactory.java | 8 ++--- 8 files changed, 132 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 76c20ed..4ffe0ad 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -37,7 +37,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants { TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), /** Start reading from the latest incoming record */ - LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM), + + /** Start reading from the record at the specified timestamp */ + AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM); private SentinelSequenceNumber sentinelSequenceNumber; @@ -53,6 +56,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants { /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; + /** The base backoff time between each describeStream attempt */ public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 612a4a7..f6c53ce 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -30,12 +30,15 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.text.ParseException; +import java.util.Date; import java.util.List; import java.util.Properties; @@ -64,6 +67,8 @@ public class ShardConsumer<T> implements Runnable { private SequenceNumber lastSequenceNum; + private Date initTimestamp; + /** * Creates a shard consumer. * @@ -107,6 +112,17 @@ public class ShardConsumer<T> implements Runnable { this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { + String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP); + try { + this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); + } catch (ParseException e) { + this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); + } + } else { + this.initTimestamp = null; + } } @SuppressWarnings("unchecked") @@ -128,6 +144,8 @@ public class ShardConsumer<T> implements Runnable { nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { nextShardItr = null; + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp); } else { // we will be starting from an actual sequence number (due to restore from failure). // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java index 8182201..7f9dbbb 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java @@ -35,6 +35,10 @@ public enum SentinelSequenceNumber { * start to be read from the earliest records that haven't expired yet */ SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ), + /** Flag value for shard's sequence numbers to indicate that the shard should + * start to be read from the specified timestamp */ + SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ), + /** Flag value to indicate that we have already read the last record of this shard * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */ SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") ); http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 0b0fccf..580555f 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -29,6 +29,8 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; @@ -42,6 +44,7 @@ import java.util.List; import java.util.Properties; import java.util.Map; import java.util.Random; +import java.util.Date; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -234,14 +237,41 @@ public class KinesisProxy implements KinesisProxyInterface { * {@inheritDoc} */ @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException { + GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() + .withStreamName(shard.getStreamName()) + .withShardId(shard.getShard().getShardId()) + .withShardIteratorType(shardIteratorType); + + switch (ShardIteratorType.fromValue(shardIteratorType)) { + case TRIM_HORIZON: + case LATEST: + break; + case AT_TIMESTAMP: + if (startingMarker instanceof Date) { + getShardIteratorRequest.setTimestamp((Date) startingMarker); + } else { + throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object."); + } + break; + case AT_SEQUENCE_NUMBER: + case AFTER_SEQUENCE_NUMBER: + if (startingMarker instanceof String) { + getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker); + } else { + throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String."); + } + } + return getShardIterator(getShardIteratorRequest); + } + + private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; int attempt = 0; while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { try { - getShardIteratorResult = - kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); + getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); } catch (AmazonServiceException ex) { if (isRecoverableException(ex)) { long backoffMillis = fullJitterBackoff( http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java index 39ddc52..9f6d594 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java @@ -34,14 +34,16 @@ public interface KinesisProxyInterface { * * @param shard the shard to get the iterator * @param shardIteratorType the iterator type, defining how the shard is to be iterated - * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER) - * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST + * (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER) + * @param startingMarker should be {@code null} if shardIteratorType is TRIM_HORIZON or LATEST, + * should be a {@code Date} value if shardIteratorType is AT_TIMESTAMP, + * should be a {@code String} representing the sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER * @return shard iterator which can be used to read data from Kinesis * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the * operation has exceeded the rate limit; this exception will be thrown * if the backoff is interrupted. */ - String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException; + String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException; /** * Get the next batch of data records using a specific shard iterator http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index d8ea0a2..eb29d78 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -26,6 +26,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkArgument; @@ -35,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Utilities for Flink Kinesis connector configuration. */ public class KinesisConfigUtil { + public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); /** * Validate configuration properties for {@link FlinkKinesisConsumer}. @@ -47,7 +50,7 @@ public class KinesisConfigUtil { if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) { String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION); - // specified initial position in stream must be either LATEST or TRIM_HORIZON + // specified initial position in stream must be either LATEST, TRIM_HORIZON or AT_TIMESTAMP try { InitialPosition.valueOf(initPosType); } catch (IllegalArgumentException e) { @@ -57,6 +60,17 @@ public class KinesisConfigUtil { } throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString()); } + + // specified initial timestamp in stream when using AT_TIMESTAMP + if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) { + if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) { + throw new IllegalArgumentException("Please set value for initial timestamp ('" + + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); + } + validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); + } } validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX, @@ -207,4 +221,18 @@ public class KinesisConfigUtil { } } } + + private static void validateOptionalDateProperty(Properties config, String key, String message) { + if (config.containsKey(key)) { + try { + initTimestampDateFormat.parse(config.getProperty(key)); + double value = Double.parseDouble(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (ParseException | NumberFormatException e) { + throw new IllegalArgumentException(message); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index a72d8df..2cc0270 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -133,6 +133,38 @@ public class FlinkKinesisConsumerTest { } @Test + public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Please set value for initial timestamp ('" + + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testUnparsableDateForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); http://git-wip-us.apache.org/repos/asf/flink/blob/8d8a5abf/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java index 65e6d4e..964ee76 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java @@ -54,7 +54,7 @@ public class FakeKinesisBehavioursFactory { } @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) { return null; } @@ -121,7 +121,7 @@ public class FakeKinesisBehavioursFactory { } @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) { if (!expiredOnceAlready) { // for the first call, just return the iterator of the first batch of records return "0"; @@ -180,7 +180,7 @@ public class FakeKinesisBehavioursFactory { } @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) { // this will be called only one time per ShardConsumer; // so, simply return the iterator of the first batch of records return "0"; @@ -250,7 +250,7 @@ public class FakeKinesisBehavioursFactory { } @Override - public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) { return null; }
