psolomin opened a new issue, #26293:
URL: https://github.com/apache/beam/issues/26293

   ### What needs to happen?
   
   The AWS `KinesisAsyncClient` behaves as follows when a subscription is gracefully closed by the client and then created again:
   
   ```
   If you call SubscribeToShard again with the same ConsumerARN and ShardId 
within 5 seconds of a successful call, you'll get a ResourceInUseException.
   ```
   
   While a Beam application runs, it receives data through N subscriptions, one per shard.
   
   - If user code in the pipeline throws an error, or some other failure occurs, the runner takes the Beam application down and closes `KinesisSource`, which closes the existing subscriptions. After some time, the runner restarts the app and the source tries to re-establish the subscriptions.
   - If the restart happens within 5 seconds, creating the new shard subscriptions fails with `ResourceInUseException`, so the runner takes the app down and restarts it again.
   - If the runner is not configured to increase its restart back-off interval, the app stays in a crash loop.
   
   ## Possible solution
   
   Since `ResourceInUseException` is not retryable, setting a retry policy on the `KinesisAsyncClient` itself doesn't help.
   
   Instead, the following part of the code could treat it as retryable, with, say, a maximum of 5 attempts:
   
   
https://github.com/apache/beam/blob/9c00dca093f6ddfa18a879e0b9fef76871f0902a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java#L102
   
   This has a downside, though: swallowing `ResourceInUseException` may hide issues with the IO, for example, if it erroneously creates splits with the same shard IDs and multiple nodes of the app try to subscribe to the same shard. Logging a WARN is mandatory, but the implementation should also guarantee that the attempt counter is never reset, so that `ResourceInUseException` eventually bubbles up.
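   A minimal sketch of the proposed behaviour, assuming a synchronous subscribe call (the real `EFOShardSubscriber` is asynchronous, so an actual fix would hook into its error callback instead). `BoundedSubscribeRetry`, its parameters and the local `ResourceInUseException` stand-in are hypothetical and not part of Beam or the AWS SDK. Only `ResourceInUseException` failures are counted, and that counter is never reset, so a persistent conflict still surfaces after the budget is spent. With 5 attempts and a 2 s back-off, the 4 waits add up to 8 s, which outlasts the 5-second window.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.logging.Logger;

// Hypothetical helper, not part of Beam: retries a shard subscription that fails
// with ResourceInUseException, using a bounded budget that is never reset.
class BoundedSubscribeRetry {

  // Stand-in for software.amazon.awssdk.services.kinesis.model.ResourceInUseException,
  // so the sketch compiles without the AWS SDK on the classpath.
  static class ResourceInUseException extends RuntimeException {
    ResourceInUseException(String message) {
      super(message);
    }
  }

  private static final Logger LOG = Logger.getLogger(BoundedSubscribeRetry.class.getName());

  private final int maxAttempts;
  private final long backoffMillis;
  private final LongConsumer sleeper; // injected so tests can skip real sleeps

  // Counts ResourceInUseException failures over the lifetime of this object.
  // Deliberately never reset: a persistent conflict (e.g. two workers reading
  // the same shard) eventually bubbles up instead of being retried forever.
  private int inUseFailures = 0;

  BoundedSubscribeRetry(int maxAttempts, long backoffMillis, LongConsumer sleeper) {
    this.maxAttempts = maxAttempts;
    this.backoffMillis = backoffMillis;
    this.sleeper = sleeper;
  }

  <T> T subscribe(String shardId, Supplier<T> subscribeCall) {
    while (true) {
      try {
        return subscribeCall.get();
      } catch (ResourceInUseException e) {
        inUseFailures++;
        if (inUseFailures >= maxAttempts) {
          throw e; // budget exhausted: surface the original error
        }
        LOG.warning(String.format(
            "SubscribeToShard for %s failed with ResourceInUseException (failure %d of max %d), retrying in %d ms",
            shardId, inUseFailures, maxAttempts, backoffMillis));
        sleeper.accept(backoffMillis);
      }
    }
  }

  int inUseFailures() {
    return inUseFailures;
  }
}
```

   In a real runner the injected sleeper would wrap `Thread.sleep` (handling `InterruptedException`), or the retry would be scheduled asynchronously so no thread is blocked while waiting out the window.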
   
   ## References
   
   - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
   - https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java#L184
   
   
   ### Issue Priority
   
   Priority: 3 (nice-to-have improvement)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

