mosche commented on a change in pull request #16286:
URL: https://github.com/apache/beam/pull/16286#discussion_r772513555
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java
##########
@@ -145,8 +148,15 @@ private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy
         List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
         try {
           for (KinesisRecord kinesisRecord : kinesisRecords) {
-            recordsQueue.put(kinesisRecord);
-            numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+            while (true) {
+              if (!poolOpened.get()) {
Review comment:
The goal here is to stop as soon as possible so that shutting down the
reader is not delayed. The loop that enqueues all new records can take a very
long time, especially with aggregated records. It is therefore important to
check whether the pool is still open before every new attempt to offer a
record to the queue.
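
To illustrate the idea, here is a minimal, self-contained sketch. It is not the code from this PR: the class and method names (`InterruptibleEnqueue`, `enqueueWhileOpen`) and the timeout parameter are hypothetical, and only the pattern of re-checking an open flag before each timed `offer` is taken from the comment above.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, for illustration only; not part of ShardReadersPool.
final class InterruptibleEnqueue {

  /**
   * Offers each record to the queue, re-checking the open flag before every
   * attempt. Returns the number of records that were actually enqueued.
   */
  static <T> int enqueueWhileOpen(
      List<T> records, BlockingQueue<T> queue, AtomicBoolean open, long timeoutMillis) {
    int enqueued = 0;
    for (T record : records) {
      while (true) {
        if (!open.get()) {
          // Pool was closed: give up immediately instead of draining the batch.
          return enqueued;
        }
        try {
          // A timed offer (assumed here) bounds how long we wait before the
          // open flag is checked again; put() would block indefinitely.
          if (queue.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
            enqueued++;
            break;
          }
        } catch (InterruptedException e) {
          // Restore the interrupt status and stop enqueuing.
          Thread.currentThread().interrupt();
          return enqueued;
        }
      }
    }
    return enqueued;
  }
}
```

With a blocking `put`, a full queue would keep the reader thread stuck until a consumer makes room. In this sketch the worst-case shutdown delay is roughly one timeout interval, whatever the batch size.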