psolomin commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1165249910


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>.
+ *
+ * <p>The primary way to enable EFO is to set {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options override the {@link Read#withConsumerArn(String)} setting. See
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>The EFO consumer applies back-pressure internally, depending on the downstream processing
+ * performance.
+ *
+ * <p>It is recommended to adjust the runner's settings so that it does not (re)start EFO consumers
+ * more often than about once every 10 seconds. Otherwise, internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw {@code ResourceInUseException}, which causes a crash loop.
+ *
+ * <p>When consuming from a stream that is frequently re-sharded, the EFO source may eventually
+ * distribute load unevenly among runner workers: some workers may end up with no active shard
+ * subscriptions at all.
+ *
+ * <h5>Enhanced Fan-Out and KinesisIO state management</h5>
+ *
+ * <p>Different runners may behave differently when a Beam application is started from a persisted
+ * state. Examples of persisted state are:
+ *
+ * <ul>
+ *   <li><a href="https://cloud.google.com/dataflow/docs/guides/using-snapshots">GCP Dataflow
+ *       snapshots</a>
+ *   <li><a
+ *       href="https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/">Flink
+ *       savepoints</a>
+ *   <li><a
+ *       href="https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-fault-snapshot.html">Kinesis
+ *       Data Analytics snapshots</a>
+ * </ul>
+ *
+ * <p>Depending on their internals, runners may persist the <b>entire</b> {@link Read} object in
+ * the state, as the Flink runner does. This means that once EFO is enabled via {@link
+ * Read#withConsumerArn(String)} on the Flink runner, further changes to {@link
+ * Read#withConsumerArn(String)} won't take effect as long as the Beam application starts from a
+ * savepoint.
+ *
+ * <p>If your runner persists the {@link Read} object, you can disable or change the consumer ARN
+ * when restoring from persisted state via {@link KinesisIOOptions#setKinesisIOConsumerArns(Map)}:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": " < new consumer ARN > ",  <- updated ARN
+ *   "stream-02": null,  <- disabling EFO
+ *   ...
+ * }}</pre>
+ *
+ * <p>EFO can be enabled or disabled at any time without losing the consumer's positions in shards
+ * that were already checkpointed. The consumer ARN for a given stream can also be changed at any
+ * time.
+ *
+ * <h5>Enhanced Fan-Out and other KinesisIO settings</h5>
+ *
+ * <p>When EFO is enabled, the following configurations are ignored:
+ *
+ * <ul>
+ *   <li>{@link Read#withMaxCapacityPerShard(Integer)}

Review Comment:
   Created https://github.com/apache/beam/issues/26257 to address this.
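For reference, the consumer-ARN precedence described in the quoted Javadoc can be modelled with the minimal Java sketch below. It is hypothetical and is not Beam's actual implementation. An entry for a stream in `--kinesisIOConsumerArns` overrides `Read#withConsumerArn(String)`, and an explicit `null` entry disables EFO for that stream. The sketch additionally assumes that a stream with no entry in the map falls back to the `Read` setting. The class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model (not Beam code) of how the effective consumer ARN could be chosen
 * from the pipeline-option map and the value given to Read#withConsumerArn(String).
 */
class ConsumerArnResolution {

  /**
   * @param optionArns map parsed from --kinesisIOConsumerArns, or null if the option is unset
   * @param streamName the stream consumed by this Read
   * @param readArn the value passed to Read#withConsumerArn(String), or null if not set
   * @return the effective consumer ARN, or null when EFO is disabled
   */
  static String effectiveConsumerArn(
      Map<String, String> optionArns, String streamName, String readArn) {
    if (optionArns != null && optionArns.containsKey(streamName)) {
      // The option has an entry for this stream: it wins over the Read setting,
      // including an explicit null, which disables EFO.
      return optionArns.get(streamName);
    }
    // Assumption: no entry for this stream means the Read setting is kept.
    return readArn;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>(); // HashMap permits null values
    options.put("stream-01", "arn:new");
    options.put("stream-02", null);
    System.out.println(effectiveConsumerArn(options, "stream-01", "arn:old")); // prints arn:new
    System.out.println(effectiveConsumerArn(options, "stream-02", "arn:old")); // prints null
    System.out.println(effectiveConsumerArn(options, "stream-03", "arn:old")); // prints arn:old
  }
}
```

The check uses `containsKey` rather than a null test on `get`. That choice keeps "entry present with `null`" (EFO disabled) distinct from "no entry" (keep the `Read` setting).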


