rmetzger commented on a change in pull request #13261:
URL: https://github.com/apache/flink/pull/13261#discussion_r490903573



##########
File path: docs/dev/connectors/kinesis.md
##########
@@ -129,6 +129,181 @@ shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis).
 For cases where skew in the assignment leads to significant imbalanced consumption,
 a custom implementation of `KinesisShardAssigner` can be set on the consumer.
 
+### Using Enhanced Fan Out

Review comment:
       I'm not sure it makes sense to place EFO at the top of the "advanced usage" sections. This topic seems relevant to users later in the adoption process rather than to first-time users.
   Maybe it would make sense to move it after "Fault Tolerance for Exactly-Once User-Defined State Update Semantics", so that first-time users have a better experience.

##########
File path: docs/dev/connectors/kinesis.md
##########
@@ -129,6 +129,181 @@ shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis).
 For cases where skew in the assignment leads to significant imbalanced consumption,
 a custom implementation of `KinesisShardAssigner` can be set on the consumer.
 
+### Using Enhanced Fan Out
+
+[Enhanced Fan Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum 
+number of concurrent consumers per Kinesis stream.
+Without EFO, all concurrent consumers share a single read quota per shard. 
+Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. 
+Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
+ 
+In order to enable EFO two additional configuration parameters are required:
+
+- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `RecordPublisher` is `POLLING`.
+- `EFO_CONSUMER_NAME`: A name to identify the consumer. 
+For a given Kinesis data stream, each consumer must have a unique name. 
+However, consumer names do not have to be unique across data streams. 
+Reusing a consumer name will result in existing subscriptions being terminated.
+
+The code snippet below shows a simple example configurating an EFO consumer.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+{% endhighlight %}
+</div>
+</div>
+
+#### EFO Stream Consumer Registration/Deregistration
+
+In order to use EFO, a stream consumer must be registered against each stream you wish to consume.
+By default, the `FlinkKinesisConsumer` will register the stream consumer automatically when the Flink job starts.
+The stream consumer will be registered using the name provided by the `EFO_CONSUMER_NAME` configuration.
+`FlinkKinesisConsumer` provides three registration strategies:
+
+- Registration
+  - `LAZY` (default): Stream consumers are registered when the Flink job starts running.
+    If the stream consumer already exists, it will be reused.
+    This is the preferred strategy for the majority of applications.
+    However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN.
+    For jobs with very large parallelism this can result in an increased start-up time.
+    The describe operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
+    this means application startup time will increase by roughly `parallelism/20 seconds`.
+  - `EAGER`: Stream consumers are registered in the `FlinkKinesisConstructor`.
+    If the stream consumer already exists, it will be reused. 
+    This will result in registration occurring when the job is constructed, 
+    either on the Flink Job Manager or client environment submitting the job.
+    Using this strategy results in a single thread registering and retrieving the stream consumer ARN, 
+    reducing startup time over `LAZY` (with large parallelism).
+    However, consider that the client environment will require access to the AWS services.

Review comment:
       Ha! This was a question I had when reading the text.
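
       As a rough illustration of the `parallelism/20 seconds` estimate quoted in the `LAZY` description above, here is a minimal sketch. The class and method names (`EfoStartupEstimate`, `estimateExtraStartupSeconds`) are hypothetical and not part of Flink. The only figure taken from the text is the limit of 20 describe transactions per second, and the model assumes each parallel task issues exactly one describe call.

```java
public final class EfoStartupEstimate {
    // Limit quoted in the docs text for the describe operation (transactions per second).
    public static final double DESCRIBE_TPS_LIMIT = 20.0;

    private EfoStartupEstimate() {}

    // Assumption: one describe call per parallel task, throttled at DESCRIBE_TPS_LIMIT.
    public static double estimateExtraStartupSeconds(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        return parallelism / DESCRIBE_TPS_LIMIT;
    }

    public static void main(String[] args) {
        // Print the estimate for a few example parallelism values.
        for (int p : new int[] {1, 20, 200, 1000}) {
            System.out.printf("parallelism=%d -> ~%.2f s%n", p, estimateExtraStartupSeconds(p));
        }
    }
}
```

       Under these assumptions, a job with parallelism 200 would see roughly 10 seconds of added start-up time with `LAZY`, which is the cost that `EAGER` is described as avoiding.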




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

