[ 
https://issues.apache.org/jira/browse/BEAM-13443?focusedWorklogId=698815&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-698815
 ]

ASF GitHub Bot logged work on BEAM-13443:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Dec/21 16:40
            Start Date: 20/Dec/21 16:40
    Worklog Time Spent: 10m 
      Work Description: mosche commented on a change in pull request #16286:
URL: https://github.com/apache/beam/pull/16286#discussion_r772513555



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java
##########
@@ -145,8 +148,15 @@ private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy
           List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
           try {
             for (KinesisRecord kinesisRecord : kinesisRecords) {
-              recordsQueue.put(kinesisRecord);
-              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+              while (true) {
+                if (!poolOpened.get()) {

Review comment:
       The goal here is to stop as soon as possible so that shutting down the 
reader is not delayed. The loop that enqueues all new records can take a very 
long time, especially with aggregated records. It is therefore important to 
check whether the pool is still open before every attempt to offer a record to 
the queue.
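
A minimal sketch of the pattern described in the comment above, not the actual
code from the pull request: each record is offered to a bounded queue with a
short timeout, and the open flag is re-checked before every attempt. The class
name, method name, timeout parameter and return value are hypothetical and only
serve the illustration; only `poolOpened` and the idea of offering instead of a
blocking `put` come from the diff and the comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration class; not part of Apache Beam.
class InterruptibleEnqueueSketch {

  /**
   * Tries to enqueue every record, but re-checks the open flag before each
   * offer so that a shutdown is noticed within roughly one timeout period.
   * Returns the number of records that were actually enqueued.
   */
  static <T> int enqueueWhileOpen(
      List<T> records, BlockingQueue<T> queue, AtomicBoolean poolOpened, long timeoutMillis)
      throws InterruptedException {
    int enqueued = 0;
    for (T record : records) {
      while (true) {
        if (!poolOpened.get()) {
          return enqueued; // pool was closed: drop the rest of the batch
        }
        // Bounded wait instead of a blocking put(), so the flag is re-checked.
        if (queue.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
          enqueued++;
          break;
        }
        // Queue still full: loop again and re-check the flag.
      }
    }
    return enqueued;
  }

  public static void main(String[] args) throws Exception {
    // Queue capacity (2) is smaller than the batch (5) and nothing consumes,
    // which mimics a large batch of de-aggregated records.
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    AtomicBoolean poolOpened = new AtomicBoolean(true);

    // Simulate a shutdown request arriving while the producer is blocked.
    Thread closer =
        new Thread(
            () -> {
              try {
                Thread.sleep(200);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              poolOpened.set(false);
            });
    closer.start();

    int enqueued = enqueueWhileOpen(Arrays.asList(1, 2, 3, 4, 5), queue, poolOpened, 50);
    closer.join();
    System.out.println("enqueued before shutdown: " + enqueued);
  }
}
```

With a plain `put`, the producer in this scenario would stay blocked on the
third record until a consumer drained the queue or the thread was interrupted.
With the bounded offer it returns shortly after the flag is cleared.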






Issue Time Tracking
-------------------

    Worklog Id:     (was: 698815)
    Time Spent: 50m  (was: 40m)

> Poor handling of aggregated records in KinesisIO.read
> -----------------------------------------------------
>
>                 Key: BEAM-13443
>                 URL: https://issues.apache.org/jira/browse/BEAM-13443
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>              Labels: aws, aws-sdk-v1, aws-sdk-v2, performance
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The way the Kinesis source is implemented, it doesn't play well with 
> aggregated records.
> Even with the available configuration options, it's fairly hard to configure 
> it so that it becomes sufficiently performant.
> One of the key issues is bundle size and record queue size versus the number 
> of aggregated records per message. In certain situations, the latter can 
> exceed the internal queue size by far, unnecessarily blocking threads and 
> requiring thread pools to be forcefully taken down.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
