psolomin commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1157285401
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
* Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
* RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
*
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ * .withStreamName("streamName")
+ * .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ * .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns{
+ * "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ * "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ * ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>Adjusting runner's settings is recommended - such that it does not (re)start EFO consumer(s)
+ * faster than once per ~ 10 seconds. Internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException otherwise, which will cause a crash loop.
Review Comment:
I am not sure it will. For example, this test passes:
```
@Test
public void a() {
  ResourceInUseException e = ResourceInUseException.builder().build();
  assertThat(e.retryable()).isFalse();
}
```
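
For reference, a rough sketch of what "not faster than once per ~ 10 seconds" from the Javadoc above could look like if it were enforced on the caller side instead of through runner settings. `ResubscribeThrottle` and its methods are hypothetical names used only for illustration; they are not part of KinesisIO or the AWS SDK, and this is not what the PR implements.

```java
import java.time.Duration;

/** Hypothetical helper (not part of Beam or the AWS SDK) that spaces out consumer (re)starts. */
final class ResubscribeThrottle {
  private final long minIntervalNanos;
  private long lastStartNanos;
  private boolean started;

  ResubscribeThrottle(Duration minInterval) {
    this.minIntervalNanos = minInterval.toNanos();
  }

  /** Returns how long the caller should still wait before (re)starting at time nowNanos. */
  synchronized Duration delayBeforeStart(long nowNanos) {
    if (!started) {
      return Duration.ZERO; // first start is never delayed
    }
    long elapsed = nowNanos - lastStartNanos;
    return elapsed >= minIntervalNanos
        ? Duration.ZERO
        : Duration.ofNanos(minIntervalNanos - elapsed);
  }

  /** Records that a (re)start happened at time nowNanos. */
  synchronized void recordStart(long nowNanos) {
    lastStartNanos = nowNanos;
    started = true;
  }
}
```

A caller would pass `System.nanoTime()`, sleep for the returned delay, call `recordStart`, and only then issue the subscribe call. Passing the clock value in explicitly keeps the helper easy to test.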