asfgit closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard 
watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..3c5e3c7303f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,22 @@
  * A custom assigner implementation can be set via {@link 
#setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * <p>In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto 
watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * <p>Watermarks can only advance when all shards of a subtask continuously 
deliver records. To
+ * avoid an inactive or closed shard blocking the watermark progress, the idle 
timeout should be
+ * configured via configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards 
won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from 
all shards.
+ *
+ * <p>Note that re-sharding of the Kinesis stream while an application (that 
relies on
+ * the Kinesis records for watermarking) is running can lead to incorrect late 
events.
+ * This depends on how shards are assigned to subtasks and applies regardless 
of whether watermarks
+ * are generated in the source or a downstream operator.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +125,8 @@
         */
        private KinesisShardAssigner shardAssigner = 
KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+       private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
        // 
------------------------------------------------------------------------
        //  Runtime state
        // 
------------------------------------------------------------------------
@@ -213,13 +232,28 @@ public KinesisShardAssigner getShardAssigner() {
 
        /**
         * Provide a custom assigner to influence how shards are distributed 
over subtasks.
-        * @param shardAssigner
+        * @param shardAssigner shard assigner
         */
        public void setShardAssigner(KinesisShardAssigner shardAssigner) {
                this.shardAssigner = checkNotNull(shardAssigner, "function can 
not be null");
                ClosureCleaner.clean(shardAssigner, true);
        }
 
+       public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() 
{
+               return periodicWatermarkAssigner;
+       }
+
+       /**
+        * Set the assigner that will extract the timestamp from {@code T} and 
calculate the
+        * watermark.
+        * @param periodicWatermarkAssigner periodic watermark assigner
+        */
+       public void setPeriodicWatermarkAssigner(
+               AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+               this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+               ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+       }
+
        // 
------------------------------------------------------------------------
        //  Source life cycle
        // 
------------------------------------------------------------------------
@@ -414,7 +448,7 @@ public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
                        Properties configProps,
                        KinesisDeserializationSchema<T> deserializationSchema) {
 
-               return new KinesisDataFetcher<>(streams, sourceContext, 
runtimeContext, configProps, deserializationSchema, shardAssigner);
+               return new KinesisDataFetcher<>(streams, sourceContext, 
runtimeContext, configProps, deserializationSchema, shardAssigner, 
periodicWatermarkAssigner);
        }
 
        @VisibleForTesting
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 443b19ec382..42e2173474b 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -140,6 +140,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
        /** The config to turn on adaptive reads from a shard. */
        public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.adaptivereads";
 
+       /** The interval after which to consider a shard idle for purposes of 
watermark generation. */
+       public static final String SHARD_IDLE_INTERVAL_MILLIS = 
"flink.shard.idle.interval";
 
        // 
------------------------------------------------------------------------
        //  Default values for consumer configuration
@@ -190,6 +192,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
        public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
 
+       public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+
        /**
         * To avoid shard iterator expires in {@link ShardConsumer}s, the value 
for the configured
         * getRecords interval can not exceed 5 minutes, which is the expire 
time for retrieved iterators.
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 0981b76ce89..77ca23c9d37 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,7 +21,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
@@ -35,7 +38,10 @@
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -51,6 +57,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -182,6 +189,28 @@
 
        private volatile boolean running = true;
 
+       private final AssignerWithPeriodicWatermarks<T> 
periodicWatermarkAssigner;
+
+       /**
+        * The watermark related state for each shard consumer. Entries in this 
map will be created when shards
+        * are discovered. After recovery, this shard map will be recreated, 
possibly with different shard index keys,
+        * since those are transient and not part of checkpointed state.
+        */
+       private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks 
= new ConcurrentHashMap<>();
+
+       /**
+        * The most recent watermark, calculated from the per shard watermarks. 
The initial value will never be emitted and
+        * also applies after recovery. The first watermark that will be emitted 
is derived from actually consumed records.
+        * In case of recovery and replay, the watermark will rewind, 
consistent with the shard consumer sequence.
+        */
+       private long lastWatermark = Long.MIN_VALUE;
+
+       /**
+        * The time span since last consumed record, after which a shard will 
be considered idle for purpose of watermark
+        * calculation. A positive value will allow the watermark to progress 
even when some shards don't receive new records.
+        */
+       private long shardIdleIntervalMillis = 
ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS;
+
        /**
         * Factory to create Kinesis proxy instances used by a fetcher.
         */
@@ -203,7 +232,8 @@ public KinesisDataFetcher(List<String> streams,
                                                        RuntimeContext 
runtimeContext,
                                                        Properties configProps,
                                                        
KinesisDeserializationSchema<T> deserializationSchema,
-                                                       KinesisShardAssigner 
shardAssigner) {
+                                                       KinesisShardAssigner 
shardAssigner,
+                                                       
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
                this(streams,
                        sourceContext,
                        sourceContext.getCheckpointLock(),
@@ -211,6 +241,7 @@ public KinesisDataFetcher(List<String> streams,
                        configProps,
                        deserializationSchema,
                        shardAssigner,
+                       periodicWatermarkAssigner,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -225,6 +256,7 @@ protected KinesisDataFetcher(List<String> streams,
                                                                Properties 
configProps,
                                                                
KinesisDeserializationSchema<T> deserializationSchema,
                                                                
KinesisShardAssigner shardAssigner,
+                                                               
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
                                                                
AtomicReference<Throwable> error,
                                                                
List<KinesisStreamShardState> subscribedShardsState,
                                                                HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
@@ -238,6 +270,7 @@ protected KinesisDataFetcher(List<String> streams,
                this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
                this.deserializationSchema = 
checkNotNull(deserializationSchema);
                this.shardAssigner = checkNotNull(shardAssigner);
+               this.periodicWatermarkAssigner = periodicWatermarkAssigner;
                this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
                this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -339,6 +372,19 @@ public void runFetcher() throws Exception {
                        }
                }
 
+        // start periodic watermark emitter, if a watermark assigner was 
configured
+               if (periodicWatermarkAssigner != null) {
+                       long periodicWatermarkIntervalMillis = 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+                       if (periodicWatermarkIntervalMillis > 0) {
+                               ProcessingTimeService timerService = 
((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
+                               LOG.info("Starting periodic watermark emitter 
with interval {}", periodicWatermarkIntervalMillis);
+                               new PeriodicWatermarkEmitter(timerService, 
periodicWatermarkIntervalMillis).start();
+                       }
+                       this.shardIdleIntervalMillis = Long.parseLong(
+                               
getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
+                                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+               }
+
                // 
------------------------------------------------------------------------
 
                // finally, start the infinite shard discovery and consumer 
launching loop;
@@ -546,6 +592,18 @@ protected Properties getConsumerConfiguration() {
         * @param lastSequenceNumber the last sequence number value to update
         */
        protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
+               // track per shard watermarks and emit timestamps extracted 
from the record,
+               // when a watermark assigner was configured.
+               if (periodicWatermarkAssigner != null) {
+                       ShardWatermarkState sws = 
shardWatermarks.get(shardStateIndex);
+                       Preconditions.checkNotNull(
+                               sws, "shard watermark state initialized in 
registerNewSubscribedShardState");
+                       recordTimestamp =
+                               
sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
+                       sws.lastRecordTimestamp = recordTimestamp;
+                       sws.lastUpdated = getCurrentTimeMillis();
+               }
+
                synchronized (checkpointLock) {
                        if (record != null) {
                                sourceContext.collectWithTimestamp(record, 
recordTimestamp);
@@ -609,7 +667,110 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                                this.numberOfActiveShards.incrementAndGet();
                        }
 
-                       return subscribedShardsState.size() - 1;
+                       int shardStateIndex = subscribedShardsState.size() - 1;
+
+                       // track all discovered shards for watermark 
determination
+                       ShardWatermarkState sws = 
shardWatermarks.get(shardStateIndex);
+                       if (sws == null) {
+                               sws = new ShardWatermarkState();
+                               try {
+                                       sws.periodicWatermarkAssigner = 
InstantiationUtil.clone(periodicWatermarkAssigner);
+                               } catch (Exception e) {
+                                       throw new RuntimeException("Failed to 
instantiate new WatermarkAssigner", e);
+                               }
+                               sws.lastUpdated = getCurrentTimeMillis();
+                               sws.lastRecordTimestamp = Long.MIN_VALUE;
+                               shardWatermarks.put(shardStateIndex, sws);
+                       }
+
+                       return shardStateIndex;
+               }
+       }
+
+       /**
+        * Return the current system time. Allow tests to override this to 
simulate progress for watermark
+        * logic.
+        *
+        * @return current processing time
+        */
+       @VisibleForTesting
+       protected long getCurrentTimeMillis() {
+               return System.currentTimeMillis();
+       }
+
+       /**
+        * Called periodically to emit a watermark. Checks all shards for the 
current event time
+        * watermark, and possibly emits the next watermark.
+        *
+        * <p>Shards that have not received an update for a certain interval 
are considered inactive so as
+        * to not hold back the watermark indefinitely. When all shards are 
inactive, the subtask will be
+        * marked as temporarily idle to not block downstream operators.
+        */
+       @VisibleForTesting
+       protected void emitWatermark() {
+               LOG.debug("Evaluating watermark for subtask {} time {}", 
indexOfThisConsumerSubtask, getCurrentTimeMillis());
+               long potentialWatermark = Long.MAX_VALUE;
+               long idleTime =
+                       (shardIdleIntervalMillis > 0)
+                               ? getCurrentTimeMillis() - 
shardIdleIntervalMillis
+                               : Long.MAX_VALUE;
+
+               for (Map.Entry<Integer, ShardWatermarkState> e : 
shardWatermarks.entrySet()) {
+                       // consider only active shards, or those that would 
advance the watermark
+                       Watermark w = 
e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+                       if (w != null && (e.getValue().lastUpdated >= idleTime 
|| w.getTimestamp() > lastWatermark)) {
+                               potentialWatermark = 
Math.min(potentialWatermark, w.getTimestamp());
+                       }
+               }
+
+               // advance watermark if possible (watermarks can only be 
ascending)
+               if (potentialWatermark == Long.MAX_VALUE) {
+                       if (shardWatermarks.isEmpty() || 
shardIdleIntervalMillis > 0) {
+                               LOG.debug("No active shard for subtask {}, 
marking the source idle.",
+                                       indexOfThisConsumerSubtask);
+                               // no active shard, signal downstream operators 
to not wait for a watermark
+                               sourceContext.markAsTemporarilyIdle();
+                       }
+               } else if (potentialWatermark > lastWatermark) {
+                       LOG.debug("Emitting watermark {} from subtask {}",
+                               potentialWatermark,
+                               indexOfThisConsumerSubtask);
+                       sourceContext.emitWatermark(new 
Watermark(potentialWatermark));
+                       lastWatermark = potentialWatermark;
+               }
+       }
+
+       /** Per shard tracking of watermark and last activity. */
+       private static class ShardWatermarkState<T> {
+               private AssignerWithPeriodicWatermarks<T> 
periodicWatermarkAssigner;
+               private volatile long lastRecordTimestamp;
+               private volatile long lastUpdated;
+       }
+
+       /**
+        * The periodic watermark emitter. In its given interval, it checks all 
shards for the current
+        * event time watermark, and possibly emits the next watermark.
+        */
+       private class PeriodicWatermarkEmitter implements 
ProcessingTimeCallback {
+
+               private final ProcessingTimeService timerService;
+               private final long interval;
+
+               PeriodicWatermarkEmitter(ProcessingTimeService timerService, 
long autoWatermarkInterval) {
+                       this.timerService = checkNotNull(timerService);
+                       this.interval = autoWatermarkInterval;
+               }
+
+               public void start() {
+                       LOG.debug("registering periodic watermark timer with 
interval {}", interval);
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
+               }
+
+               @Override
+               public void onProcessingTime(long timestamp) {
+                       emitWatermark();
+                       // schedule the next watermark
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index a99e845f249..9a6d2d66a6f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public TestFetcher(
                                HashMap<StreamShardMetadata, SequenceNumber> 
testStateSnapshot,
                                List<StreamShardHandle> 
testInitialDiscoveryShards) {
 
-                       super(streams, sourceContext, runtimeContext, 
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
+                       super(streams, sourceContext, runtimeContext, 
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
 
                        this.testStateSnapshot = testStateSnapshot;
                        this.testInitialDiscoveryShards = 
testInitialDiscoveryShards;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index c18f4309be4..84e18bdc98c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
@@ -32,7 +31,13 @@
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -41,10 +46,16 @@
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -59,16 +70,21 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -243,8 +259,7 @@ public void testListStateChangedAfterSnapshotState() throws 
Exception {
        @Test
        @SuppressWarnings("unchecked")
        public void 
testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws 
Exception {
-               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
-               
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+               KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -286,11 +301,10 @@ public void 
testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws
                // mock fetcher
                // 
----------------------------------------------------------------------
 
-               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
+               KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
                List<StreamShardHandle> shards = new ArrayList<>();
                shards.addAll(fakeRestoredState.keySet());
                
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-               
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -348,11 +362,10 @@ public void 
testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exceptio
                // mock fetcher
                // 
----------------------------------------------------------------------
 
-               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
+               KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
                List<StreamShardHandle> shards = new ArrayList<>();
                shards.addAll(fakeRestoredState.keySet());
                
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-               
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -441,13 +454,12 @@ public void 
testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShar
                // mock fetcher
                // 
----------------------------------------------------------------------
 
-               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
+               KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
                List<StreamShardHandle> shards = new ArrayList<>();
                shards.addAll(fakeRestoredState.keySet());
                shards.add(new StreamShardHandle("fakeStream2",
                        new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
                
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-               
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -553,7 +565,7 @@ public void 
testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
                // mock fetcher
                // 
----------------------------------------------------------------------
 
-               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
+               KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
                List<StreamShardHandle> shards = new ArrayList<>();
 
                // create a fake stream shard handle based on the first entry 
in the restored state
@@ -567,7 +579,6 @@ public void 
testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheSt
                shards.add(closedStreamShardHandle);
 
                
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
-               
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
 
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
@@ -664,34 +675,166 @@ public void addAll(List<T> values) throws Exception {
                return fakeRestoredState;
        }
 
-       /**
-        * A non-serializable {@link KinesisDeserializationSchema} (because it 
is a nested class with reference
-        * to the enclosing class, which is not serializable) used for testing.
-        */
-       private final class NonSerializableDeserializationSchema implements 
KinesisDeserializationSchema<String> {
-               @Override
-               public String deserialize(byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId) throws IOException {
-                       return new String(recordValue);
-               }
+       private static KinesisDataFetcher mockKinesisDataFetcher() throws 
Exception {
+               KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
 
-               @Override
-               public TypeInformation<String> getProducedType() {
-                       return BasicTypeInfo.STRING_TYPE_INFO;
+               java.lang.reflect.Constructor<KinesisDataFetcher> ctor = 
(java.lang.reflect.Constructor<KinesisDataFetcher>) 
KinesisDataFetcher.class.getConstructors()[0];
+               Class<?>[] otherParamTypes = new 
Class<?>[ctor.getParameterTypes().length - 1];
+               System.arraycopy(ctor.getParameterTypes(), 1, otherParamTypes, 
0, ctor.getParameterTypes().length - 1);
+
+               Supplier<Object[]> argumentSupplier = () -> {
+                       Object[] otherParamArgs = new 
Object[otherParamTypes.length];
+                       for (int i = 0; i < otherParamTypes.length; i++) {
+                               otherParamArgs[i] = 
Mockito.nullable(otherParamTypes[i]);
+                       }
+                       return otherParamArgs;
+               };
+               
PowerMockito.whenNew(ctor).withArguments(Mockito.any(ctor.getParameterTypes()[0]),
+                       argumentSupplier.get()).thenReturn(mockedFetcher);
+               return mockedFetcher;
+       }
+
+       @Test
+       public void testPeriodicWatermark() throws Exception {
+
+               String streamName = "fakeStreamName";
+               Time maxOutOfOrderness = Time.milliseconds(5);
+               long autoWatermarkInterval = 1_000;
+
+               HashMap<String, String> 
subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+               subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+               KinesisDeserializationSchema<String> deserializationSchema = 
new KinesisDeserializationSchemaWrapper<>(
+                       new SimpleStringSchema());
+               Properties props = new Properties();
+               props.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, 
Long.toString(10L));
+
+               BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+               BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+               Map<String, List<BlockingQueue<String>>> streamToQueueMap = new 
HashMap<>();
+               streamToQueueMap.put(streamName, Lists.newArrayList(shard1, 
shard2));
+
+               // override createFetcher to mock Kinesis
+               FlinkKinesisConsumer<String> sourceFunc =
+                       new FlinkKinesisConsumer<String>(streamName, 
deserializationSchema, props) {
+                               @Override
+                               protected KinesisDataFetcher<String> 
createFetcher(
+                                       List<String> streams,
+                                       SourceContext<String> sourceContext,
+                                       RuntimeContext runtimeContext,
+                                       Properties configProps,
+                                       KinesisDeserializationSchema<String> 
deserializationSchema) {
+
+                                       KinesisDataFetcher<String> fetcher =
+                                               new KinesisDataFetcher<String>(
+                                                       streams,
+                                                       sourceContext,
+                                                       
sourceContext.getCheckpointLock(),
+                                                       runtimeContext,
+                                                       configProps,
+                                                       deserializationSchema,
+                                                       getShardAssigner(),
+                                                       
getPeriodicWatermarkAssigner(),
+                                                       new AtomicReference<>(),
+                                                       new ArrayList<>(),
+                                                       
subscribedStreamsToLastDiscoveredShardIds,
+                                                       (props) -> 
FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap)
+                                                       ) {};
+                                       return fetcher;
+                               }
+                       };
+
+               sourceFunc.setShardAssigner(
+                       (streamShardHandle, i) -> {
+                               // shardId-000000000000
+                               return Integer.parseInt(
+                                       
streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+                       });
+
+               sourceFunc.setPeriodicWatermarkAssigner(new 
TestTimestampExtractor(maxOutOfOrderness));
+
+               // there is currently no test harness specifically for sources,
+               // so we overlay the source thread here
+               AbstractStreamOperatorTestHarness<Object> testHarness =
+                       new AbstractStreamOperatorTestHarness<Object>(
+                               new StreamSource(sourceFunc), 1, 1, 0);
+               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               
testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+               testHarness.initializeState(null);
+               testHarness.open();
+
+               ConcurrentLinkedQueue<Watermark> watermarks = new 
ConcurrentLinkedQueue<>();
+
+               @SuppressWarnings("unchecked")
+               SourceFunction.SourceContext<String> sourceContext = new 
CollectingSourceContext(
+                       testHarness.getCheckpointLock(), 
testHarness.getOutput()) {
+                       @Override
+                       public void emitWatermark(Watermark mark) {
+                               watermarks.add(mark);
+                       }
+               };
+
+               new Thread(
+                       () -> {
+                               try {
+                                       sourceFunc.run(sourceContext);
+                               } catch (InterruptedException e) {
+                                       // expected on cancel
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       })
+                       .start();
+
+               shard1.put("1");
+               shard1.put("2");
+               shard2.put("10");
+               int recordCount = 3;
+               int watermarkCount = 0;
+               awaitRecordCount(testHarness.getOutput(), recordCount);
+
+               // trigger watermark emit
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
autoWatermarkInterval);
+               watermarkCount++;
+
+               // advance watermark
+               shard1.put("10");
+               recordCount++;
+               awaitRecordCount(testHarness.getOutput(), recordCount);
+
+               // trigger watermark emit
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
autoWatermarkInterval);
+               watermarkCount++;
+
+               sourceFunc.cancel();
+               testHarness.close();
+
+               assertEquals("record count", recordCount, 
testHarness.getOutput().size());
+               assertEquals("watermark count", watermarkCount, 
watermarks.size());
+               assertThat(watermarks, org.hamcrest.Matchers.contains(new 
Watermark(-3), new Watermark(5)));
+       }
+
+       private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> 
queue, int count) throws Exception {
+               long timeoutMillis = System.currentTimeMillis() + 10_000;
+               while (System.currentTimeMillis() < timeoutMillis && 
queue.size() < count) {
+                       Thread.sleep(10);
                }
        }
 
-       /**
-        * A static, serializable {@link KinesisDeserializationSchema}.
-        */
-       private static final class SerializableDeserializationSchema implements 
KinesisDeserializationSchema<String> {
-               @Override
-               public String deserialize(byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId) throws IOException {
-                       return new String(recordValue);
+       private static class TestTimestampExtractor extends 
BoundedOutOfOrdernessTimestampExtractor<String> {
+               private static final long serialVersionUID = 1L;
+
+               public TestTimestampExtractor(Time maxAllowedLateness) {
+                       super(maxAllowedLateness);
                }
 
                @Override
-               public TypeInformation<String> getProducedType() {
-                       return BasicTypeInfo.STRING_TYPE_INFO;
+               public long extractTimestamp(String element) {
+                       return Long.parseLong(element);
                }
        }
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index ccf39d0e19b..e3e7287f24a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,9 @@
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -40,9 +43,13 @@
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -710,4 +717,98 @@ public void testIsThisSubtaskShouldSubscribeTo() {
                
assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
        }
 
+       private static BoundedOutOfOrdernessTimestampExtractor<String> 
watermarkAssigner =
+               new 
BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
+                       @Override
+                       public long extractTimestamp(String element) {
+                               return Long.parseLong(element);
+                       }
+               };
+
+       @Test
+       public void testPeriodicWatermark() {
+               final MutableLong clock = new MutableLong();
+               final MutableBoolean isTemporaryIdle = new MutableBoolean();
+               final List<Watermark> watermarks = new ArrayList<>();
+
+               String fakeStream1 = "fakeStream1";
+               StreamShardHandle shardHandle =
+                       new StreamShardHandle(
+                               fakeStream1,
+                               new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
+
+               TestSourceContext<String> sourceContext =
+                       new TestSourceContext<String>() {
+                               @Override
+                               public void emitWatermark(Watermark mark) {
+                                       watermarks.add(mark);
+                               }
+
+                               @Override
+                               public void markAsTemporarilyIdle() {
+                                       isTemporaryIdle.setTrue();
+                               }
+                       };
+
+               HashMap<String, String> 
subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
+
+               final KinesisDataFetcher<String> fetcher =
+                       new TestableKinesisDataFetcher<String>(
+                               Collections.singletonList(fakeStream1),
+                               sourceContext,
+                               new java.util.Properties(),
+                               new KinesisDeserializationSchemaWrapper<>(new 
org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
+                               1,
+                               1,
+                               new AtomicReference<>(),
+                               new LinkedList<>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
+
+                               @Override
+                               protected long getCurrentTimeMillis() {
+                                       return clock.getValue();
+                               }
+                       };
+               Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", 
watermarkAssigner);
+
+               SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
+               // register shards to subsequently emit records
+               int shardIndex =
+                       fetcher.registerNewSubscribedShardState(
+                               new KinesisStreamShardState(
+                                       
KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, 
seq));
+
+               StreamRecord<String> record1 =
+                       new StreamRecord<>(String.valueOf(Long.MIN_VALUE), 
Long.MIN_VALUE);
+               fetcher.emitRecordAndUpdateState(record1.getValue(), 
record1.getTimestamp(), shardIndex, seq);
+               Assert.assertEquals(record1, 
sourceContext.getCollectedOutputs().poll());
+
+               fetcher.emitWatermark();
+               Assert.assertTrue("potential watermark equals previous 
watermark", watermarks.isEmpty());
+
+               StreamRecord<String> record2 = new 
StreamRecord<>(String.valueOf(1), 1);
+               fetcher.emitRecordAndUpdateState(record2.getValue(), 
record2.getTimestamp(), shardIndex, seq);
+               Assert.assertEquals(record2, 
sourceContext.getCollectedOutputs().poll());
+
+               fetcher.emitWatermark();
+               Assert.assertFalse("watermark advanced", watermarks.isEmpty());
+               Assert.assertEquals(new Watermark(record2.getTimestamp()), 
watermarks.remove(0));
+               Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+
+               // test idle timeout
+               long idleTimeout = 10;
+               // advance clock idleTimeout
+               clock.add(idleTimeout + 1);
+               fetcher.emitWatermark();
+               Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+               Assert.assertTrue("not idle, no new watermark", 
watermarks.isEmpty());
+
+               // activate idle timeout
+               Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", 
idleTimeout);
+               fetcher.emitWatermark();
+               Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
+               Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
+       }
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index eb3415572c0..c8e1f0919dc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,20 +22,25 @@
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -102,6 +107,10 @@ public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCalls
                                millisBehindLatest);
        }
 
+       public static KinesisProxyInterface blockingQueueGetRecords(Map<String, 
List<BlockingQueue<String>>> streamsToShardQueues) {
+               return new BlockingQueueKinesis(streamsToShardQueues);
+       }
+
        private static class 
SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends 
SingleShardEmittingFixNumOfRecordsKinesis {
 
                private final long millisBehindLatest;
@@ -387,4 +396,85 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
                        return null;
                }
        }
+
+       private static class BlockingQueueKinesis implements 
KinesisProxyInterface {
+
+               private Map<String, List<StreamShardHandle>> 
streamsWithListOfShards = new HashMap<>();
+               private Map<String, BlockingQueue<String>> 
shardIteratorToQueueMap = new HashMap<>();
+
+               private static String getShardIterator(StreamShardHandle 
shardHandle) {
+                       return shardHandle.getStreamName() + "-" + 
shardHandle.getShard().getShardId();
+               }
+
+               public BlockingQueueKinesis(Map<String, 
List<BlockingQueue<String>>> streamsToShardCount) {
+                       for (Map.Entry<String, List<BlockingQueue<String>>> 
streamToShardQueues : streamsToShardCount.entrySet()) {
+                               String streamName = 
streamToShardQueues.getKey();
+                               int shardCount = 
streamToShardQueues.getValue().size();
+
+                               if (shardCount == 0) {
+                                       // don't do anything
+                               } else {
+                                       List<StreamShardHandle> shardsOfStream 
= new ArrayList<>(shardCount);
+                                       for (int i = 0; i < shardCount; i++) {
+                                               StreamShardHandle shardHandle = 
new StreamShardHandle(
+                                                       streamName,
+                                                       new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))
+                                                               
.withSequenceNumberRange(new 
SequenceNumberRange().withStartingSequenceNumber("0"))
+                                                               
.withHashKeyRange(new 
HashKeyRange().withStartingHashKey("0").withEndingHashKey("0")));
+                                               shardsOfStream.add(shardHandle);
+                                               
shardIteratorToQueueMap.put(getShardIterator(shardHandle), 
streamToShardQueues.getValue().get(i));
+                                       }
+                                       streamsWithListOfShards.put(streamName, 
shardsOfStream);
+                               }
+                       }
+               }
+
+               @Override
+               public GetShardListResult getShardList(Map<String, String> 
streamNamesWithLastSeenShardIds) {
+                       GetShardListResult result = new GetShardListResult();
+                       for (Map.Entry<String, List<StreamShardHandle>> 
streamsWithShards : streamsWithListOfShards.entrySet()) {
+                               String streamName = streamsWithShards.getKey();
+                               for (StreamShardHandle shard : 
streamsWithShards.getValue()) {
+                                       if 
(streamNamesWithLastSeenShardIds.get(streamName) == null) {
+                                               
result.addRetrievedShardToStream(streamName, shard);
+                                       } else {
+                                               if 
(StreamShardHandle.compareShardIds(
+                                                       
shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) 
> 0) {
+                                                       
result.addRetrievedShardToStream(streamName, shard);
+                                               }
+                                       }
+                               }
+                       }
+                       return result;
+               }
+
+               @Override
+               public String getShardIterator(StreamShardHandle shard, String 
shardIteratorType, Object startingMarker) {
+                       return getShardIterator(shard);
+               }
+
+               @Override
+               public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+                       BlockingQueue<String> queue = 
Preconditions.checkNotNull(this.shardIteratorToQueueMap.get(shardIterator),
+                       "no queue for iterator %s", shardIterator);
+                       List<Record> records = Collections.emptyList();
+                       try {
+                               String data = queue.take();
+                               Record record = new Record()
+                                       .withData(
+                                               
ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
+                                       
.withPartitionKey(UUID.randomUUID().toString())
+                                       .withApproximateArrivalTimestamp(new 
Date(System.currentTimeMillis()))
+                                       .withSequenceNumber(String.valueOf(0));
+                               records = Collections.singletonList(record);
+                       } catch (InterruptedException e) {
+                               shardIterator = null;
+                       }
+                       return new GetRecordsResult()
+                               .withRecords(records)
+                               .withMillisBehindLatest(0L)
+                               .withNextShardIterator(shardIterator);
+               }
+       }
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 21588c9a7a7..a44028766e1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -69,6 +69,7 @@ public TestableKinesisDataFetcher(
                        fakeConfiguration,
                        deserializationSchema,
                        DEFAULT_SHARD_ASSIGNER,
+                       null,
                        thrownErrorUnderTest,
                        subscribedShardsStateUnderTest,
                        subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git a/pom.xml b/pom.xml
index 2a5dfced140..e6446094d07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@ under the License.
                <avro.version>1.8.2</avro.version>
                <junit.version>4.12</junit.version>
                <mockito.version>2.21.0</mockito.version>
-               <powermock.version>2.0.0-beta.5</powermock.version>
+               <powermock.version>2.0.0-RC.4</powermock.version>
                <hamcrest.version>1.3</hamcrest.version>
                <japicmp.skip>false</japicmp.skip>
                <codebase>new</codebase>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to