[FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer

This closes #3458.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19322401
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19322401
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19322401

Branch: refs/heads/master
Commit: 1932240179189a88273f52d3d93f277e7bf604de
Parents: a119a30
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Thu Mar 2 18:56:39 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Mar 31 12:33:48 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 36 +++++----
 .../kinesis/internals/KinesisDataFetcher.java   | 77 ++++----------------
 2 files changed, 34 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index ef1afca..1fcc529 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -23,6 +23,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 The Kinesis connector provides access to [Amazon AWS Kinesis 
Streams](http://aws.amazon.com/kinesis/streams/).
 
 To use the connector, add the following Maven dependency to your project:
@@ -53,14 +56,14 @@ mvn clean install -Pinclude-kinesis -DskipTests
 The streaming connectors are not part of the binary distribution. See how to 
link with them for cluster
 execution [here]({{site.baseurl}}/dev/linking.html).
 
-### Using the Amazon Kinesis Streams Service
+## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
 to setup Kinesis streams. Make sure to create the appropriate IAM policy and 
user to read / write to the Kinesis streams.
 
-### Kinesis Consumer
+## Kinesis Consumer
 
 The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source 
that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can handle resharding of 
streams. Each subtask of the consumer is
+streams within the same AWS service region, and can transparently handle 
resharding of streams while the job is running. Each subtask of the consumer is
 responsible for fetching data records from multiple Kinesis shards. The number 
of shards fetched by each subtask will
 change as shards are closed and created by Kinesis.
 
@@ -107,13 +110,16 @@ to `TRIM_HORIZON`, which lets the consumer start reading 
the Kinesis stream from
 
 Other optional configuration keys for the consumer can be found in 
`ConsumerConfigConstants`.
 
-**NOTE:** Currently, resharding can not be handled transparently (i.e., 
without failing and restarting jobs) if there are idle consumer
-subtasks, which occur when the total number of shards is lower than the 
configured consumer parallelism. The job must be
-configured to enable checkpointing, so that the new shards due to resharding 
can be correctly picked up and consumed by the
-Kinesis consumer after the job is restored. This is a temporary limitation 
that will be resolved in future versions.
-Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for 
more detail.
+Note that the configured parallelism of the Flink Kinesis Consumer source
+can be completely independent of the total number of shards in the Kinesis 
streams.
+When the number of shards is larger than the parallelism of the consumer,
+then each consumer subtask can subscribe to multiple shards; otherwise
+if the number of shards is smaller than the parallelism of the consumer,
+then some consumer subtasks will simply be idle and wait until it gets assigned
+new shards (i.e., when the streams are resharded to increase the
+number of shards for higher provisioned Kinesis service throughput).
 
-#### Configuring Starting Position
+### Configuring Starting Position
 
 The Flink Kinesis Consumer currently provides the following options to 
configure where to start reading Kinesis streams, simply by setting 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
 one of the following values in the provided configuration properties (the 
naming of the options identically follows [the namings used by the AWS Kinesis 
Streams 
service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
@@ -127,7 +133,7 @@ properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIME
     If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined 
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
     (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` 
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without 
given a pattern).
 
-#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
 With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume 
records from shards in Kinesis streams and
 periodically checkpoint each shard's progress. In case of a job failure, Flink 
will restore the streaming program to the
@@ -157,7 +163,7 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
-#### Event Time for Consumed Records
+### Event Time for Consumed Records
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -201,7 +207,7 @@ kinesis = kinesis.assignTimestampsAndWatermarks(new 
CustomTimestampAssigner)
 </div>
 </div>
 
-#### Threading Model
+### Threading Model
 
 The Flink Kinesis Consumer uses multiple threads for shard discovery and data 
consumption.
 
@@ -214,7 +220,7 @@ For data consumption, a single thread will be created to 
consume each discovered
 shard it is responsible of consuming is closed as a result of stream 
resharding. In other words, there will always be
 one thread per open shard.
 
-#### Internally Used Kinesis APIs
+### Internally Used Kinesis APIs
 
 The Flink Kinesis Consumer uses the [AWS Java 
SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
 for shard discovery and data consumption. Due to Amazon's [service limits for 
Kinesis 
Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
@@ -248,7 +254,7 @@ adjusts the maximum number of records each consuming thread 
tries to fetch from
 the latter modifies the sleep interval between each fetch (there will be no 
sleep by default). The retry behaviour of the
 consumer when calling this API can also be modified by using the other keys 
prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
 
-### Kinesis Producer
+## Kinesis Producer
 
 The `FlinkKinesisProducer` is used for putting data from a Flink stream into a 
Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees.
@@ -305,7 +311,7 @@ Otherwise, the returned stream name is used.
 Other optional configuration keys for the producer can be found in 
`ProducerConfigConstants`.
 
 
-### Using Non-AWS Kinesis Endpoints for Testing
+## Using Non-AWS Kinesis Endpoints for Testing
 
 It is sometimes desirable to have Flink operate as a consumer or producer 
against a non-AWS Kinesis endpoint such as
 [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful 
when performing functional testing of a Flink

http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index a06fdca..46847b3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
@@ -325,35 +324,10 @@ public class KinesisDataFetcher<T> {
                                
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
                                
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
 
-               // FLINK-4341:
-               // For downstream operators that work on time (ex. window 
operators), we are required to emit a max value watermark
-               // for subtasks that won't continue to have shards to read from 
unless resharding happens in the future, otherwise
-               // the downstream watermarks would not advance, leading to 
unbounded accumulating state.
-               //
-               // The side-effect of this limitation is that on resharding, we 
must fail hard if the newly discovered shard
-               // is to be subscribed by a subtask that has previously emitted 
a max value watermark, otherwise the watermarks
-               // will be messed up.
-               //
-               // There are 2 cases were we need to either emit a max value 
watermark, or deliberately fail hard:
-               //  (a) if this subtask has no more shards to read from unless 
resharding happens in the future, we emit a max
-               //      value watermark. This case is encountered when 1) all 
previously read shards by this subtask were closed
-               //      due to resharding, 2) when this subtask was initially 
only subscribed to closed shards while the consumer
-               //      was told to start from TRIM_HORIZON, or 3) there was 
initially no shards for this subtask to read on startup.
-               //  (b) this subtask has discovered new shards to read from due 
to a reshard; if this subtask has already emitted
-               //      a max value watermark, we must deliberately fail hard 
to avoid messing up the watermarks. The new shards
-               //      will be subscribed by this subtask after restore as 
initial shards on startup.
-               //
-               // TODO: This is a temporary workaround until a min-watermark 
information service is available in the JobManager
-               // Please see FLINK-4341 for more detail
-
-               boolean emittedMaxValueWatermark = false;
-
                if (this.numberOfActiveShards.get() == 0) {
-                       // FLINK-4341 workaround case (a) - please see the 
above for details on this case
-                       LOG.info("Subtask {} has no initial shards to read on 
startup; emitting max value watermark ...",
+                       LOG.info("Subtask {} has no active shards to read on 
startup; marking the subtask as temporarily idle ...",
                                indexOfThisConsumerSubtask);
-                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                       emittedMaxValueWatermark = true;
+                       sourceContext.markAsTemporarilyIdle();
                }
 
                while (running) {
@@ -363,41 +337,6 @@ public class KinesisDataFetcher<T> {
                        }
                        List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
 
-                       // -- NOTE: Potential race condition between 
newShardsDueToResharding and numberOfActiveShards --
-                       // Since numberOfActiveShards is updated by parallel 
shard consuming threads in updateState(), there exists
-                       // a race condition with the currently queried 
newShardsDueToResharding. Therefore, numberOfActiveShards
-                       // may not correctly reflect the discover result in the 
below case determination. This may lead to incorrect
-                       // case determination on the current discovery attempt, 
but can still be correctly handled on future attempts.
-                       //
-                       // Although this can be resolved by wrapping the 
current shard discovery attempt with the below
-                       // case determination within a synchronized block on 
the checkpoint lock for atomicity, there will be
-                       // considerable throughput performance regression as 
shard discovery is a remote call to AWS. Therefore,
-                       // since the case determination is a temporary 
workaround for FLINK-4341, the race condition is tolerable as
-                       // we can still eventually handle max value watermark 
emitting / deliberately failing on successive
-                       // discovery attempts.
-
-                       if (newShardsDueToResharding.size() == 0 && 
this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
-                               // FLINK-4341 workaround case (a) - please see 
the above for details on this case
-                               LOG.info("Subtask {} has completed reading all 
shards; emitting max value watermark ...",
-                                       indexOfThisConsumerSubtask);
-                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                               emittedMaxValueWatermark = true;
-                       } else if (newShardsDueToResharding.size() > 0 && 
emittedMaxValueWatermark) {
-                               // FLINK-4341 workaround case (b) - please see 
the above for details on this case
-                               //
-                               // Note that in the case where on resharding 
this subtask ceased to read all of it's previous shards
-                               // but new shards is also to be subscribed by 
this subtask immediately after, emittedMaxValueWatermark
-                               // will be false; this allows the fetcher to 
continue reading the new shards without failing on such cases.
-                               // However, due to the race condition mentioned 
above, we might still fall into case (a) first, and
-                               // then (b) on the next discovery attempt. 
Although the failure is ideally unnecessary, max value
-                               // watermark emitting still remains to be 
correct.
-
-                               LOG.warn("Subtask {} has discovered {} new 
shards to subscribe, but is failing hard to avoid messing" +
-                                               " up watermarks; the new shards 
will be subscribed by this subtask after restore ...",
-                                       indexOfThisConsumerSubtask, 
newShardsDueToResharding.size());
-                               throw new RuntimeException("Deliberate failure 
to avoid messing up watermarks");
-                       }
-
                        for (KinesisStreamShard shard : 
newShardsDueToResharding) {
                                // since there may be delay in discovering a 
new shard, all new shards due to
                                // resharding should be read starting from the 
earliest record possible
@@ -605,9 +544,19 @@ public class KinesisDataFetcher<T> {
                        // if a shard's state is updated to be 
SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
                        // we've finished reading the shard and should 
determine it to be non-active
                        if 
(lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                               this.numberOfActiveShards.decrementAndGet();
                                LOG.info("Subtask {} has reached the end of 
subscribed shard: {}",
                                        indexOfThisConsumerSubtask, 
subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+
+                               // check if we need to mark the source as idle;
+                               // note that on resharding, if 
registerNewSubscribedShardState was invoked for newly discovered shards
+                               // AFTER the old shards had reached the end, 
the subtask's status will be automatically toggled back to
+                               // be active immediately afterwards as soon as 
we collect records from the new shards
+                               if (this.numberOfActiveShards.decrementAndGet() 
== 0) {
+                                       LOG.info("Subtask {} has reached the 
end of all currently subscribed shards; marking the subtask as temporarily idle 
...",
+                                               indexOfThisConsumerSubtask);
+
+                                       sourceContext.markAsTemporarilyIdle();
+                               }
                        }
                }
        }
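
The last hunk above folds the counter decrement into the idle check. A minimal sketch of why that matters, assuming a hypothetical ShardIdleTracker class (not part of Flink) whose idleMarks counter stands in for calls to sourceContext.markAsTemporarilyIdle(): because decrementAndGet() returns the updated value atomically, only the one consumer thread that finishes the last active shard sees zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the fetcher's idle bookkeeping; not part of Flink. */
final class ShardIdleTracker {
    private final AtomicInteger numberOfActiveShards;
    // Counts how often the subtask would have been marked idle;
    // stands in for sourceContext.markAsTemporarilyIdle().
    private final AtomicInteger idleMarks = new AtomicInteger();

    ShardIdleTracker(int initialActiveShards) {
        this.numberOfActiveShards = new AtomicInteger(initialActiveShards);
    }

    /** Called when a newly discovered shard is registered for this subtask. */
    void onShardRegistered() {
        numberOfActiveShards.incrementAndGet();
    }

    /**
     * Called by a consumer thread when its shard reaches the end.
     * Returns true only for the call that brought the count to zero.
     */
    boolean onShardEnded() {
        // Decrement and test in one atomic step, as in the patched updateState().
        if (numberOfActiveShards.decrementAndGet() == 0) {
            idleMarks.incrementAndGet();
            return true;
        }
        return false;
    }

    int idleMarks() {
        return idleMarks.get();
    }
}
```

Decrementing first and reading the counter with a separate get() afterwards would let two threads both observe zero, or neither, so the sketch keeps the two steps in a single call.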
