[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672248#comment-16672248 ]

ASF GitHub Bot commented on FLINK-5697:
---------------------------------------

tweise closed pull request #6980: [FLINK-5697] [kinesis] Add periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 407a5a95524..f0852584ade 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
@@ -78,6 +79,17 @@
  * A custom assigner implementation can be set via {@link #setShardAssigner(KinesisShardAssigner)} to optimize the
  * hash function or use static overrides to limit skew.
  *
+ * <p>In order for the consumer to emit watermarks, a timestamp assigner needs to be set via {@link
+ * #setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks)} and the auto watermark emit
+ * interval configured via {@link
+ * org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * <p>Watermarks can only advance when all shards of a subtask continuously deliver records. To
+ * prevent an inactive or closed shard from blocking the watermark progress, the idle timeout should be
+ * configured via the configuration property {@link
+ * ConsumerConfigConstants#SHARD_IDLE_INTERVAL_MILLIS}. By default, shards won't be considered
+ * idle and watermark calculation will wait for newer records to arrive from all shards.
+ *
  * @param <T> the type of data emitted
  */
 @PublicEvolving
@@ -108,6 +120,8 @@
         */
        private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
+       private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
        // ------------------------------------------------------------------------
        //  Runtime state
        // ------------------------------------------------------------------------
@@ -220,6 +234,22 @@ public void setShardAssigner(KinesisShardAssigner shardAssigner) {
                ClosureCleaner.clean(shardAssigner, true);
        }
 
+       public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
+               return periodicWatermarkAssigner;
+       }
+
+       /**
+        * Set the assigner that will extract the timestamp from {@link T} and calculate the
+        * watermark.
+        *
+        * @param periodicWatermarkAssigner the periodic watermark assigner
+        */
+       public void setPeriodicWatermarkAssigner(
+               AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+               this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+               ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
+       }
+
        // ------------------------------------------------------------------------
        //  Source life cycle
        // ------------------------------------------------------------------------
@@ -414,7 +444,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
                        Properties configProps,
                        KinesisDeserializationSchema<T> deserializationSchema) {
 
-               return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner);
+               return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner);
        }
 
        @VisibleForTesting
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 443b19ec382..42e2173474b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -140,6 +140,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
        /** The config to turn on adaptive reads from a shard. */
        public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
 
+       /** The interval after which to consider a shard idle for purposes of watermark generation. */
+       public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
        // ------------------------------------------------------------------------
        //  Default values for consumer configuration
@@ -190,6 +192,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
        public static final boolean DEFAULT_SHARD_USE_ADAPTIVE_READS = false;
 
+       public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
+
        /**
         * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
         * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 0981b76ce89..60e7201e913 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -21,7 +21,10 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
@@ -35,7 +38,10 @@
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -51,6 +57,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -182,6 +189,30 @@
 
        private volatile boolean running = true;
 
+       private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+       private PeriodicWatermarkEmitter periodicWatermarkEmitter;
+
+       /**
+        * The watermark related state for each shard consumer. Entries in this map will be created when shards
+        * are discovered. After recovery, this shard map will be recreated, possibly with different shard index keys,
+        * since those are transient and not part of checkpointed state.
+        */
+       private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks = new ConcurrentHashMap<>();
+
+       /**
+        * The most recent watermark, calculated from the per shard watermarks. The initial value will never be emitted and
+        * also applies after recovery. The first watermark that will be emitted is derived from actually consumed records.
+        * In case of recovery and replay, the watermark will rewind, consistent with the shard consumer sequence.
+        */
+       private long lastWatermark = Long.MIN_VALUE;
+
+       /**
+        * The time span since the last consumed record, after which a shard will be considered idle for purposes of watermark
+        * calculation. A positive value will allow the watermark to progress even when some shards don't receive new records.
+        */
+       private long shardIdleIntervalMillis = ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS;
+
        /**
         * Factory to create Kinesis proxy instances used by a fetcher.
         */
@@ -203,7 +234,8 @@ public KinesisDataFetcher(List<String> streams,
                                                        RuntimeContext runtimeContext,
                                                        Properties configProps,
                                                        KinesisDeserializationSchema<T> deserializationSchema,
-                                                       KinesisShardAssigner shardAssigner) {
+                                                       KinesisShardAssigner shardAssigner,
+                                                       AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
                this(streams,
                        sourceContext,
                        sourceContext.getCheckpointLock(),
@@ -211,6 +243,7 @@ public KinesisDataFetcher(List<String> streams,
                        configProps,
                        deserializationSchema,
                        shardAssigner,
+                       periodicWatermarkAssigner,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -225,6 +258,7 @@ protected KinesisDataFetcher(List<String> streams,
                                                                Properties configProps,
                                                                KinesisDeserializationSchema<T> deserializationSchema,
                                                                KinesisShardAssigner shardAssigner,
+                                                               AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
                                                                AtomicReference<Throwable> error,
                                                                List<KinesisStreamShardState> subscribedShardsState,
                                                                HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
@@ -238,6 +272,7 @@ protected KinesisDataFetcher(List<String> streams,
                this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
                this.deserializationSchema = checkNotNull(deserializationSchema);
                this.shardAssigner = checkNotNull(shardAssigner);
+               this.periodicWatermarkAssigner = periodicWatermarkAssigner;
                this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
                this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -339,6 +374,20 @@ public void runFetcher() throws Exception {
                        }
                }
 
+               // start periodic watermark emitter, if a watermark assigner was configured
+               if (periodicWatermarkAssigner != null) {
+                       long periodicWatermarkIntervalMillis = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+                       if (periodicWatermarkIntervalMillis > 0) {
+                               ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
+                               LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
+                               this.periodicWatermarkEmitter = new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis);
+                               this.periodicWatermarkEmitter.start();
+                       }
+                       this.shardIdleIntervalMillis = Long.parseLong(
+                               getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
+                                       Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+               }
+
                // ------------------------------------------------------------------------
 
                // finally, start the infinite shard discovery and consumer launching loop;
@@ -546,6 +595,18 @@ protected Properties getConsumerConfiguration() {
         * @param lastSequenceNumber the last sequence number value to update
         */
        protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+               // track per shard watermarks and emit timestamps extracted from the record,
+               // when a watermark assigner was configured.
+               if (periodicWatermarkAssigner != null) {
+                       ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+                       Preconditions.checkNotNull(
+                               sws, "shard watermark state initialized in registerNewSubscribedShardState");
+                       recordTimestamp =
+                               sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
+                       sws.lastRecordTimestamp = recordTimestamp;
+                       sws.lastUpdated = getCurrentTimeMillis();
+               }
+
                synchronized (checkpointLock) {
                        if (record != null) {
                                sourceContext.collectWithTimestamp(record, recordTimestamp);
@@ -609,7 +670,119 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
                                this.numberOfActiveShards.incrementAndGet();
                        }
 
-                       return subscribedShardsState.size() - 1;
+                       int shardStateIndex = subscribedShardsState.size() - 1;
+
+                       // track all discovered shards for watermark determination
+                       ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+                       if (sws == null) {
+                               sws = new ShardWatermarkState();
+                               try {
+                                       sws.periodicWatermarkAssigner = InstantiationUtil.clone(periodicWatermarkAssigner);
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                               sws.lastUpdated = getCurrentTimeMillis();
+                               sws.lastRecordTimestamp = Long.MIN_VALUE;
+                               shardWatermarks.put(shardStateIndex, sws);
+                       }
+
+                       return shardStateIndex;
+               }
+       }
+
+       /**
+        * Return the current system time. Allow tests to override this to simulate progress for watermark
+        * logic.
+        *
+        * @return current processing time in milliseconds
+        */
+       @VisibleForTesting
+       protected long getCurrentTimeMillis() {
+               return System.currentTimeMillis();
+       }
+
+       /**
+        * Called periodically to emit a watermark. Checks all shards for the current event time
+        * watermark, and possibly emits the next watermark.
+        *
+        * <p>Shards that have not received an update for a certain interval are considered inactive so as
+        * to not hold back the watermark indefinitely. When all shards are inactive, the subtask will be
+        * marked as temporarily idle to not block downstream operators.
+        */
+       @VisibleForTesting
+       protected void emitWatermark() {
+               LOG.debug(
+                       "###evaluating watermark for subtask {} time {}",
+                       indexOfThisConsumerSubtask,
+                       getCurrentTimeMillis());
+               long potentialWatermark = Long.MAX_VALUE;
+               long idleTime =
+                       (shardIdleIntervalMillis > 0)
+                               ? getCurrentTimeMillis() - shardIdleIntervalMillis
+                               : Long.MAX_VALUE;
+
+               for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) {
+                       // consider only active shards, or those that would advance the watermark
+                       Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
+                       if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) {
+                               potentialWatermark = Math.min(potentialWatermark, w.getTimestamp());
+                       }
+               }
+
+               // advance watermark if possible (watermarks can only be ascending)
+               if (potentialWatermark == Long.MAX_VALUE) {
+                       if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) {
+                               LOG.debug(
+                                       "###No active shard for subtask {}, marking the source idle.",
+                                       indexOfThisConsumerSubtask);
+                               // no active shard, signal downstream operators to not wait for a watermark
+                               sourceContext.markAsTemporarilyIdle();
+                       }
+               } else if (potentialWatermark > lastWatermark) {
+                       LOG.debug(
+                               "###emitting watermark {} from subtask {}",
+                               potentialWatermark,
+                               indexOfThisConsumerSubtask);
+                       sourceContext.emitWatermark(new Watermark(potentialWatermark));
+                       lastWatermark = potentialWatermark;
+               }
+       }
+
+       /** Per shard tracking of watermark and last activity. */
+       private static class ShardWatermarkState<T> {
+               private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+               private volatile long lastRecordTimestamp;
+               private volatile long lastUpdated;
+       }
+
+       /**
+        * The periodic watermark emitter. In its given interval, it checks all shards for the current
+        * event time watermark, and possibly emits the next watermark.
+        */
+       private class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
+
+               private final ProcessingTimeService timerService;
+               private final long interval;
+
+               // -------------------------------------------------
+
+               PeriodicWatermarkEmitter(ProcessingTimeService timerService, long autoWatermarkInterval) {
+                       this.timerService = checkNotNull(timerService);
+                       this.interval = autoWatermarkInterval;
+               }
+
+               // -------------------------------------------------
+
+               public void start() {
+                       LOG.debug("registering periodic watermark timer with interval {}", interval);
+                       timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
+               }
+
+               @Override
+               public void onProcessingTime(long timestamp) {
+                       emitWatermark();
+                       // schedule the next watermark
+                       timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
                }
        }
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index a99e845f249..9a6d2d66a6f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public TestFetcher(
                                HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot,
                                List<StreamShardHandle> testInitialDiscoveryShards) {
 
-                       super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER);
+                       super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
 
                        this.testStateSnapshot = testStateSnapshot;
                        this.testInitialDiscoveryShards = testInitialDiscoveryShards;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index ccf39d0e19b..e3e7287f24a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -21,6 +21,9 @@
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -40,9 +43,13 @@
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -710,4 +717,98 @@ public void testIsThisSubtaskShouldSubscribeTo() {
                assertFalse(KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(2, 2, 1));
        }
 
+       private static BoundedOutOfOrdernessTimestampExtractor<String> watermarkAssigner =
+               new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
+                       @Override
+                       public long extractTimestamp(String element) {
+                               return Long.parseLong(element);
+                       }
+               };
+
+       @Test
+       public void testPeriodicWatermark() {
+               final MutableLong clock = new MutableLong();
+               final MutableBoolean isTemporaryIdle = new MutableBoolean();
+               final List<Watermark> watermarks = new ArrayList<>();
+
+               String fakeStream1 = "fakeStream1";
+               StreamShardHandle shardHandle =
+                       new StreamShardHandle(
+                               fakeStream1,
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)));
+
+               TestSourceContext<String> sourceContext =
+                       new TestSourceContext<String>() {
+                               @Override
+                               public void emitWatermark(Watermark mark) {
+                                       watermarks.add(mark);
+                               }
+
+                               @Override
+                               public void markAsTemporarilyIdle() {
+                                       isTemporaryIdle.setTrue();
+                               }
+                       };
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = new HashMap<>();
+
+               final KinesisDataFetcher<String> fetcher =
+                       new TestableKinesisDataFetcher<String>(
+                               Collections.singletonList(fakeStream1),
+                               sourceContext,
+                               new java.util.Properties(),
+                               new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
+                               1,
+                               1,
+                               new AtomicReference<>(),
+                               new LinkedList<>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(new HashMap<>())) {
+
+                               @Override
+                               protected long getCurrentTimeMillis() {
+                                       return clock.getValue();
+                               }
+                       };
+               Whitebox.setInternalState(fetcher, "periodicWatermarkAssigner", watermarkAssigner);
+
+               SequenceNumber seq = new SequenceNumber("fakeSequenceNumber");
+               // register shards to subsequently emit records
+               int shardIndex =
+                       fetcher.registerNewSubscribedShardState(
+                               new KinesisStreamShardState(
+                                       KinesisDataFetcher.convertToStreamShardMetadata(shardHandle), shardHandle, seq));
+
+               StreamRecord<String> record1 =
+                       new StreamRecord<>(String.valueOf(Long.MIN_VALUE), Long.MIN_VALUE);
+               fetcher.emitRecordAndUpdateState(record1.getValue(), record1.getTimestamp(), shardIndex, seq);
+               Assert.assertEquals(record1, sourceContext.getCollectedOutputs().poll());
+
+               fetcher.emitWatermark();
+               Assert.assertTrue("potential watermark equals previous watermark", watermarks.isEmpty());
+
+               StreamRecord<String> record2 = new StreamRecord<>(String.valueOf(1), 1);
+               fetcher.emitRecordAndUpdateState(record2.getValue(), record2.getTimestamp(), shardIndex, seq);
+               Assert.assertEquals(record2, sourceContext.getCollectedOutputs().poll());
+
+               fetcher.emitWatermark();
+               Assert.assertFalse("watermark advanced", watermarks.isEmpty());
+               Assert.assertEquals(new Watermark(record2.getTimestamp()), watermarks.remove(0));
+               Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+
+               // test idle timeout
+               long idleTimeout = 10;
+               // advance clock idleTimeout
+               clock.add(idleTimeout + 1);
+               fetcher.emitWatermark();
+               Assert.assertFalse("not idle", isTemporaryIdle.booleanValue());
+               Assert.assertTrue("not idle, no new watermark", watermarks.isEmpty());
+
+               // activate idle timeout
+               Whitebox.setInternalState(fetcher, "shardIdleIntervalMillis", idleTimeout);
+               fetcher.emitWatermark();
+               Assert.assertTrue("idle", isTemporaryIdle.booleanValue());
+               Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
+       }
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 21588c9a7a7..a44028766e1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -69,6 +69,7 @@ public TestableKinesisDataFetcher(
                        fakeConfiguration,
                        deserializationSchema,
                        DEFAULT_SHARD_ASSIGNER,
+                       null,
                        thrownErrorUnderTest,
                        subscribedShardsStateUnderTest,
                        subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
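
For context, here is a minimal usage sketch of the feature added by this PR (it is not part of
the diff above): it registers a BoundedOutOfOrdernessTimestampExtractor via
setPeriodicWatermarkAssigner(), enables periodic watermarks through the auto watermark interval,
and sets the shard idle interval so inactive shards cannot hold back the watermark. The stream
name, region, and interval values are illustrative only, and credential setup is omitted (the
default provider chain is assumed).

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisWatermarkExample {
        public static void main(String[] args) throws Exception {
            Properties config = new Properties();
            // illustrative region; credentials are resolved by the default provider chain here
            config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
            // consider a shard idle after 10s without records so it cannot hold back the watermark
            config.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "10000");

            FlinkKinesisConsumer<String> consumer =
                new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config);
            // per-shard timestamp extraction; this sample assumes each record is a numeric timestamp
            consumer.setPeriodicWatermarkAssigner(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.parseLong(element);
                    }
                });

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // periodic watermarks are only emitted when the auto watermark interval is > 0
            env.getConfig().setAutoWatermarkInterval(1000L);
            env.addSource(consumer).print();
            env.execute("kinesis-per-shard-watermarks");
        }
    }

With this in place, records carry the timestamps extracted per shard, and each source subtask
emits the minimum watermark across its shards, skipping shards that have been idle longer than
the configured interval (or marking the subtask temporarily idle when all of them are).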


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add per-shard watermarks for FlinkKinesisConsumer
> -------------------------------------------------
>
>                 Key: FLINK-5697
>                 URL: https://issues.apache.org/jira/browse/FLINK-5697
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: pull-request-available
>
> It would be nice to let the Kinesis consumer be on par in functionality with
> the Kafka consumer, since they share very similar abstractions. Per-partition /
> per-shard watermarks are something we can also add to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
