[FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer
This closes #3458. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19322401 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19322401 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19322401 Branch: refs/heads/master Commit: 1932240179189a88273f52d3d93f277e7bf604de Parents: a119a30 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Thu Mar 2 18:56:39 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Mar 31 12:33:48 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kinesis.md | 36 +++++---- .../kinesis/internals/KinesisDataFetcher.java | 77 ++++---------------- 2 files changed, 34 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/docs/dev/connectors/kinesis.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index ef1afca..1fcc529 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -23,6 +23,9 @@ specific language governing permissions and limitations under the License. --> +* This will be replaced by the TOC +{:toc} + The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/). To use the connector, add the following Maven dependency to your project: @@ -53,14 +56,14 @@ mvn clean install -Pinclude-kinesis -DskipTests The streaming connectors are not part of the binary distribution. See how to link with them for cluster execution [here]({{site.baseurl}}/dev/linking.html). -### Using the Amazon Kinesis Streams Service +## Using the Amazon Kinesis Streams Service Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams. -### Kinesis Consumer +## Kinesis Consumer The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis -streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is +streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will change as shards are closed and created by Kinesis. @@ -107,13 +110,16 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`. -**NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer -subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be -configured to enable checkpointing, so that the new shards due to resharding can be correctly picked up and consumed by the -Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions. -Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail. +Note that the configured parallelism of the Flink Kinesis Consumer source +can be completely independent of the total number of shards in the Kinesis streams. +When the number of shards is larger than the parallelism of the consumer, +then each consumer subtask can subscribe to multiple shards; otherwise +if the number of shards is smaller than the parallelism of the consumer, +then some consumer subtasks will simply be idle and wait until it gets assigned +new shards (i.e., when the streams are resharded to increase the +number of shards for higher provisioned Kinesis service throughput). -#### Configuring Starting Position +### Configuring Starting Position The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)): @@ -127,7 +133,7 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). -#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics +### Fault Tolerance for Exactly-Once User-Defined State Update Semantics With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the @@ -157,7 +163,7 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. -#### Event Time for Consumed Records +### Event Time for Consumed Records <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -201,7 +207,7 @@ kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner) </div> </div> -#### Threading Model +### Threading Model The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption. @@ -214,7 +220,7 @@ For data consumption, a single thread will be created to consume each discovered shard it is responsible of consuming is closed as a result of stream resharding. In other words, there will always be one thread per open shard. -#### Internally Used Kinesis APIs +### Internally Used Kinesis APIs The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) @@ -248,7 +254,7 @@ adjusts the maximum number of records each consuming thread tries to fetch from the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`. -### Kinesis Producer +## Kinesis Producer The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. @@ -305,7 +311,7 @@ Otherwise, the returned stream name is used. Other optional configuration keys for the producer can be found in `ProducerConfigConstants`. -### Using Non-AWS Kinesis Endpoints for Testing +## Using Non-AWS Kinesis Endpoints for Testing It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink http://git-wip-us.apache.org/repos/asf/flink/blob/19322401/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index a06fdca..46847b3 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; @@ -325,35 +324,10 @@ public class KinesisDataFetcher<T> { ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); - // FLINK-4341: - // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark - // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise - // the downstream watermarks would not advance, leading to unbounded accumulating state. - // - // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard - // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks - // will be messed up. - // - // There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard: - // (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max - // value watermark. This case is encountered when 1) all previously read shards by this subtask were closed - // due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer - // was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup. - // (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted - // a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards - // will be subscribed by this subtask after restore as initial shards on startup. - // - // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager - // Please see FLINK-4341 for more detail - - boolean emittedMaxValueWatermark = false; - if (this.numberOfActiveShards.get() == 0) { - // FLINK-4341 workaround case (a) - please see the above for details on this case - LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...", + LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...", indexOfThisConsumerSubtask); - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - emittedMaxValueWatermark = true; + sourceContext.markAsTemporarilyIdle(); } while (running) { @@ -363,41 +337,6 @@ public class KinesisDataFetcher<T> { } List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe(); - // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards -- - // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists - // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards - // may not correctly reflect the discover result in the below case determination. This may lead to incorrect - // case determination on the current discovery attempt, but can still be correctly handled on future attempts. - // - // Although this can be resolved by wrapping the current shard discovery attempt with the below - // case determination within a synchronized block on the checkpoint lock for atomicity, there will be - // considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore, - // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as - // we can still eventually handle max value watermark emitting / deliberately failing on successive - // discovery attempts. - - if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) { - // FLINK-4341 workaround case (a) - please see the above for details on this case - LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...", - indexOfThisConsumerSubtask); - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - emittedMaxValueWatermark = true; - } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) { - // FLINK-4341 workaround case (b) - please see the above for details on this case - // - // Note that in the case where on resharding this subtask ceased to read all of it's previous shards - // but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark - // will be false; this allows the fetcher to continue reading the new shards without failing on such cases. - // However, due to the race condition mentioned above, we might still fall into case (a) first, and - // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value - // watermark emitting still remains to be correct. - - LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" + - " up watermarks; the new shards will be subscribed by this subtask after restore ...", - indexOfThisConsumerSubtask, newShardsDueToResharding.size()); - throw new RuntimeException("Deliberate failure to avoid messing up watermarks"); - } - for (KinesisStreamShard shard : newShardsDueToResharding) { // since there may be delay in discovering a new shard, all new shards due to // resharding should be read starting from the earliest record possible @@ -605,9 +544,19 @@ public class KinesisDataFetcher<T> { // if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread, // we've finished reading the shard and should determine it to be non-active if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { - this.numberOfActiveShards.decrementAndGet(); LOG.info("Subtask {} has reached the end of subscribed shard: {}", indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard()); + + // check if we need to mark the source as idle; + // note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards + // AFTER the old shards had reached the end, the subtask's status will be automatically toggled back to + // be active immediately afterwards as soon as we collect records from the new shards + if (this.numberOfActiveShards.decrementAndGet() == 0) { + LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...", + indexOfThisConsumerSubtask); + + sourceContext.markAsTemporarilyIdle(); + } } } }
