psolomin commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1157216496


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns{
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>Adjusting runner's settings is recommended - such that it does not (re)start EFO consumer(s)

Review Comment:
   KinesisAsyncClient does not allow a subscription to be closed on the client side and created again without waiting at least 5 seconds. From the SubscribeToShard documentation:
   
   > If you call SubscribeToShard again with the same ConsumerARN and ShardId within 5 seconds of a successful call, you'll get a ResourceInUseException.
   
   If the runner cancels the job for whatever reason and restarts it shortly afterwards, KinesisAsyncClient throws this error, and the runner then restarts the job yet again.
   
   I can try to reproduce this and check whether the behaviour still holds for the Flink runner.
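
One possible way to soften the restart loop described above would be to wait out the 5-second window and retry the subscription instead of failing the job. The sketch below only illustrates that idea and is not what this PR implements: `SubscribeRetry`, `withRetry`, `threadSleep`, and the nested `ResourceInUseException` are hypothetical stand-ins, written so the example compiles without the AWS SDK.

```java
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper; not part of Beam or the AWS SDK. */
final class SubscribeRetry {

  /** Stand-in for the SDK exception of the same name, so the sketch has no dependencies. */
  static final class ResourceInUseException extends RuntimeException {
    ResourceInUseException(String message) {
      super(message);
    }
  }

  /** The 5 second window quoted from the SubscribeToShard documentation. */
  static final Duration RESUBSCRIBE_WAIT = Duration.ofSeconds(5);

  /**
   * Calls {@code subscribe}; on ResourceInUseException waits {@code wait} and tries again,
   * up to {@code maxAttempts} calls in total. Any other exception propagates immediately.
   */
  static <T> T withRetry(
      Supplier<T> subscribe, int maxAttempts, Duration wait, Consumer<Duration> sleeper) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    for (int attempt = 1; ; attempt++) {
      try {
        return subscribe.get();
      } catch (ResourceInUseException e) {
        if (attempt >= maxAttempts) {
          throw e; // Give up and let the caller (or the runner) decide what to do.
        }
        sleeper.accept(wait); // Wait out the window before subscribing again.
      }
    }
  }

  /** Sleeper backed by Thread.sleep; tests can pass a fake instead. */
  static void threadSleep(Duration d) {
    try {
      Thread.sleep(d.toMillis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
      throw new IllegalStateException("Interrupted while waiting to re-subscribe", e);
    }
  }
}
```

A caller would wrap the subscribe call, for example `SubscribeRetry.withRetry(() -> subscribe(shardId), 3, SubscribeRetry.RESUBSCRIBE_WAIT, SubscribeRetry::threadSleep)`, where `subscribe` is a placeholder for the real call. With the actual async client the exception would probably surface wrapped in a `CompletionException`, so the supplier would have to unwrap it first; that part is left out here.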



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
