psolomin commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1165249910
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
* Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
* RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
*
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * <p>KinesisIO supports consumers with dedicated throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ *     href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ *     Dedicated Throughput</a>.
+ *
+ * <p>The primary way to enable EFO is to set {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ * .withStreamName("streamName")
+ * .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ * .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ * "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ * "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ * ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options override the {@link Read#withConsumerArn(String)} setting. See
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on downstream processing performance, the EFO consumer applies back-pressure
+ * internally.
+ *
+ * <p>It is recommended to adjust the runner's settings so that it does not (re)start EFO consumers
+ * more often than roughly once every 10 seconds. Otherwise, internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException, which causes a crash loop.
+ *
+ * <p>When consuming from a stream that is frequently re-sharded, the EFO source may eventually
+ * distribute load unevenly among runner workers: some workers may end up with no active shard
+ * subscriptions at all.
+ *
+ * <h5>Enhanced Fan-Out and KinesisIO state management</h5>
+ *
+ * <p>Different runners may behave differently when a Beam application is started from persisted
+ * state. Examples of persisted state are:
+ *
+ * <ul>
+ * <li><a href="https://cloud.google.com/dataflow/docs/guides/using-snapshots">GCP Dataflow
+ * snapshots</a>
+ * <li><a
+ * href="https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/">Flink
+ * savepoints</a>
+ * <li><a
+ * href="https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-fault-snapshot.html">Kinesis
+ * Data Analytics snapshots</a>
+ * </ul>
+ *
+ * <p>Depending on their internals, runners may persist the <b>entire</b> {@link Read} object in the
+ * state, as the Flink runner does. This means that, on the Flink runner, once EFO has been enabled
+ * via {@link Read#withConsumerArn(String)}, further changes to {@link Read#withConsumerArn(String)}
+ * won't take effect as long as the Beam application is started from a savepoint.
+ *
+ * <p>If your runner persists the {@link Read} object, you can still disable EFO or change the
+ * consumer ARN when restoring from persisted state by using {@link
+ * KinesisIOOptions#setKinesisIOConsumerArns(Map)}:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ * "stream-01": " < new consumer ARN > ", <- updated ARN
+ * "stream-02": null, <- disabling EFO
+ * ...
+ * }}</pre>
+ *
+ * <p>EFO can be enabled or disabled at any time without losing the consumer's positions in shards
+ * that were already checkpointed. The consumer ARN for a given stream can be changed at any time,
+ * too.
+ *
+ * <h5>Enhanced Fan-Out and other KinesisIO settings</h5>
+ *
+ * <p>When EFO is enabled, the following configurations are ignored:
+ *
+ * <ul>
+ * <li>{@link Read#withMaxCapacityPerShard(Integer)}
Review Comment:
Created https://github.com/apache/beam/issues/26257 to address this.
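A minimal sketch of the consumer ARN precedence described in the Javadoc above, in plain Java with no Beam dependency: an entry for the stream in `--kinesisIOConsumerArns` overrides the value set via `Read#withConsumerArn(String)`, and an explicit `null` entry disables EFO. The class and method names (`ConsumerArnResolution`, `resolveConsumerArn`) are hypothetical and this is not KinesisIO's actual code; it also assumes that a stream absent from the options map keeps its `withConsumerArn` value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of consumer ARN precedence; not KinesisIO's implementation. */
public class ConsumerArnResolution {

  /**
   * Returns the consumer ARN to use for {@code streamName}, or null if EFO should be disabled.
   *
   * @param readConsumerArn value set via Read#withConsumerArn(String), or null if never set
   * @param optionArns parsed --kinesisIOConsumerArns map, or null if the option was not passed
   */
  public static String resolveConsumerArn(
      String streamName, String readConsumerArn, Map<String, String> optionArns) {
    if (optionArns != null && optionArns.containsKey(streamName)) {
      // Stream listed in the pipeline options: that entry wins, even when it is null.
      return optionArns.get(streamName);
    }
    // Stream not listed (or option not passed): fall back to the Read transform's own setting.
    return readConsumerArn;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("stream-01", "new-arn"); // updated ARN
    options.put("stream-02", null); // null entry disables EFO (HashMap allows null values)

    System.out.println(resolveConsumerArn("stream-01", "old-arn", options)); // new-arn
    System.out.println(resolveConsumerArn("stream-02", "old-arn", options)); // null
    System.out.println(resolveConsumerArn("stream-03", "old-arn", options)); // old-arn
    System.out.println(resolveConsumerArn("stream-03", null, null)); // null
  }
}
```

Checking `containsKey` rather than `get(...) != null` is what lets an explicit `null` entry ("disable EFO") be told apart from a stream that simply isn't listed.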
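The restart guidance can be illustrated with a hypothetical minimum-interval guard that refuses to start a second subscription for the same consumer ARN and shard within 10 seconds. This is an application-level sketch, not a runner setting and not how KinesisIO manages subscriptions; the class name `SubscriptionThrottle`, the keying by consumer ARN plus shard ID, and the 10-second interval (taken from the Javadoc's "~ 10 seconds") are assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical guard: remembers when each (consumer ARN, shard) subscription was last started and
 * rejects a new start for the same pair until minInterval has elapsed.
 */
public class SubscriptionThrottle {
  private final Duration minInterval;
  private final Map<String, Instant> lastStart = new HashMap<>();

  public SubscriptionThrottle(Duration minInterval) {
    this.minInterval = minInterval;
  }

  /** Returns true and records the start time if a subscription may be (re)started at now. */
  public synchronized boolean tryStart(String consumerArn, String shardId, Instant now) {
    String key = consumerArn + "/" + shardId;
    Instant previous = lastStart.get(key);
    if (previous != null && Duration.between(previous, now).compareTo(minInterval) < 0) {
      // Too soon: subscribing again now could risk ResourceInUseException.
      return false;
    }
    lastStart.put(key, now);
    return true;
  }

  public static void main(String[] args) {
    SubscriptionThrottle throttle = new SubscriptionThrottle(Duration.ofSeconds(10));
    Instant t0 = Instant.parse("2023-01-01T00:00:00Z");
    System.out.println(throttle.tryStart("arn-1", "shard-0", t0)); // true
    System.out.println(throttle.tryStart("arn-1", "shard-0", t0.plusSeconds(3))); // false
    System.out.println(throttle.tryStart("arn-1", "shard-1", t0.plusSeconds(3))); // true
    System.out.println(throttle.tryStart("arn-1", "shard-0", t0.plusSeconds(10))); // true
  }
}
```

Passing `now` explicitly instead of calling `Instant.now()` inside `tryStart` keeps the guard deterministic and easy to test.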