psolomin opened a new issue, #26293: URL: https://github.com/apache/beam/issues/26293
### What needs to happen?

AWS `KinesisAsyncClient` has the following behaviour when a subscription is gracefully closed by the client and then created again:

```
If you call SubscribeToShard again with the same ConsumerARN and ShardId within 5 seconds of a successful call, you'll get a ResourceInUseException.
```

When a Beam application runs, it has N subscriptions (one per shard) that it receives data from.

- If user code in a pipeline throws an error, or some other failure occurs, the runner will take the Beam application down, closing KinesisSource, which closes the existing subscriptions. After some time, the runner will try to restart the app and the Source will try to establish the subscriptions again.
- If the restart happens within 5 seconds, creating the new shard subscriptions will fail with `ResourceInUseException`, and the runner will take the app down and then restart it again.
- If the runner is not configured to increase the restart back-off interval, the app will stay in a crash loop.

## Possible solution

Since `ResourceInUseException` is not retry-able, setting a retry policy on `KinesisAsyncClient` itself doesn't help. Instead, this part of the code could handle it as retry-able, with, say, a maximum of 5 attempts:

https://github.com/apache/beam/blob/9c00dca093f6ddfa18a879e0b9fef76871f0902a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java#L102

This has a downside though: hiding `ResourceInUseException` may hide issues with the IO, for example if it erroneously creates splits with the same shard IDs and multiple nodes of the app try to subscribe to the same shard. Logging at WARN is mandatory, but the implementation should also guarantee that the number of attempts is not reset and that `ResourceInUseException` eventually bubbles up.
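
The bounded retry proposed above could look roughly like the sketch below. It is not the actual `EFOShardSubscriber` code: the real subscription is asynchronous, whereas this sketch is synchronous for brevity. `SubscribeRetrySketch`, `callWithRetry`, and the fixed back-off are hypothetical names and choices, and `ResourceInUseException` is declared locally as a stand-in for `software.amazon.awssdk.services.kinesis.model.ResourceInUseException` so that the example compiles without the AWS SDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in for the AWS SDK exception of the same name (assumption:
// declared here only so the sketch compiles without the SDK on the classpath).
class ResourceInUseException extends RuntimeException {
  ResourceInUseException(String message) {
    super(message);
  }
}

final class SubscribeRetrySketch {

  // Hypothetical helper: calls `subscribe` up to `maxAttempts` times in total,
  // retrying only on ResourceInUseException. The attempt counter is local to
  // this call and is never reset, so the last failure always bubbles up.
  static <T> T callWithRetry(Callable<T> subscribe, int maxAttempts, long backoffMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        return subscribe.call();
      } catch (ResourceInUseException e) {
        if (attempt >= maxAttempts) {
          throw e; // attempts exhausted: surface the original exception
        }
        // A real implementation would log at WARN through the project's logger.
        System.err.printf(
            "WARN: subscribe attempt %d/%d failed: %s%n", attempt, maxAttempts, e.getMessage());
        Thread.sleep(backoffMillis);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger calls = new AtomicInteger();
    // Simulated subscription that fails twice before succeeding.
    String result =
        callWithRetry(
            () -> {
              if (calls.incrementAndGet() < 3) {
                throw new ResourceInUseException("shard still in use");
              }
              return "subscribed";
            },
            5,
            10L);
    System.out.println(result + " after " + calls.get() + " calls"); // prints "subscribed after 3 calls"
  }
}
```

Keeping the counter inside a single call is one way to meet the requirement that attempts are not reset: any other exception type passes through untouched, and a persistent `ResourceInUseException` (for example from duplicate splits) still fails the pipeline after the last attempt.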
## References

- https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
- https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java#L184

### Issue Priority

Priority: 3 (nice-to-have improvement)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
