[https://issues.apache.org/jira/browse/BEAM-13443?focusedWorklogId=698815&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-698815]
ASF GitHub Bot logged work on BEAM-13443:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Dec/21 16:40
Start Date: 20/Dec/21 16:40
Worklog Time Spent: 10m
Work Description: mosche commented on a change in pull request #16286:
URL: https://github.com/apache/beam/pull/16286#discussion_r772513555
##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java
##########
@@ -145,8 +148,15 @@ private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy
List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
try {
for (KinesisRecord kinesisRecord : kinesisRecords) {
- recordsQueue.put(kinesisRecord);
- numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+ while (true) {
+ if (!poolOpened.get()) {
Review comment:
The goal here is to stop as soon as possible so as not to delay shutting down the
reader. The loop that enqueues all new records can take very long, especially with
aggregated records. It's therefore important to check whether the pool is still open
before every new attempt to offer a record to the queue.
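A minimal sketch of the pattern described in the comment, assuming a bounded `BlockingQueue` and a timed `offer` in place of the blocking `put`. The diff only shows the `while (true)` loop and the `poolOpened.get()` check; the class name `EnqueueUntilClosed`, the method `enqueueAll`, and the 100 ms timeout are hypothetical, and this is not the code merged in PR #16286.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class EnqueueUntilClosed {
  // Hypothetical offer timeout; the value used in the actual PR is not shown here.
  static final long OFFER_TIMEOUT_MS = 100;

  /**
   * Offers each record to the bounded queue, re-checking poolOpened before every attempt.
   * Returns how many records were enqueued before the batch finished or the pool closed.
   */
  static <T> int enqueueAll(
      List<T> records, BlockingQueue<T> queue, AtomicBoolean poolOpened, AtomicInteger counter) {
    int enqueued = 0;
    try {
      for (T record : records) {
        while (true) {
          if (!poolOpened.get()) {
            return enqueued; // pool closed: abandon the rest of the batch immediately
          }
          if (queue.offer(record, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            counter.incrementAndGet(); // stands in for numberOfRecordsInAQueueByShard
            enqueued++;
            break; // move on to the next record
          }
          // offer() timed out because the queue is full; loop and re-check poolOpened
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    return enqueued;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // nobody consumes from it
    AtomicBoolean poolOpened = new AtomicBoolean(true);
    AtomicInteger counter = new AtomicInteger();
    // Close the pool while the producer is stuck waiting on the full queue.
    Thread closer = new Thread(() -> {
      try {
        Thread.sleep(300);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      poolOpened.set(false);
    });
    closer.start();
    int n = enqueueAll(List.of("a", "b", "c", "d"), queue, poolOpened, counter);
    closer.join();
    System.out.println("enqueued=" + n + " counter=" + counter.get());
  }
}
```

Running `main` prints `enqueued=2 counter=2`: the first two records fit, the third keeps timing out on the full queue, and the producer returns on the first check after the pool is closed instead of waiting for space. With a plain `put`, the same thread would stay blocked until interrupted.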
Issue Time Tracking
-------------------
Worklog Id: (was: 698815)
Time Spent: 50m (was: 40m)
> Poor handling of aggregated records in KinesisIO.read
> -----------------------------------------------------
>
> Key: BEAM-13443
> URL: https://issues.apache.org/jira/browse/BEAM-13443
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Reporter: Moritz Mack
> Assignee: Moritz Mack
> Priority: P2
> Labels: aws, aws-sdk-v1, aws-sdk-v2, performance
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The way the Kinesis source is implemented, it doesn't play well with
> aggregated records.
> Even using configuration options, it's fairly hard to configure it to be
> sufficiently performant.
> One of the key issues is bundle size and record queue size vs. the number
> of aggregated records per message. In certain situations, the number of
> aggregated records can exceed the internal queue size by far, unnecessarily
> blocking threads and requiring thread pools to be forcefully shut down.
>
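The blocking behavior described in the issue can be reproduced in isolation. This is a hypothetical sketch, not Beam code: `AggregatedBacklogDemo`, its parameters, and the timeouts are invented for illustration. It assumes a reader thread that `put()`s every de-aggregated record into a bounded queue that nobody drains, and shows that a graceful `shutdown()` only completes when the records fit into the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AggregatedBacklogDemo {
  /**
   * Hypothetical reproduction: one aggregated message de-aggregates into subRecords user
   * records, which a reader thread put()s into a bounded queue that nobody drains.
   * Returns true if a graceful shutdown succeeded, false if shutdownNow() was needed.
   */
  static boolean gracefulShutdownSucceeds(int queueCapacity, int subRecords)
      throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      try {
        for (int i = 0; i < subRecords; i++) {
          queue.put(i); // blocks indefinitely once the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // reached only via shutdownNow()
      }
    });
    pool.shutdown(); // graceful: lets the running task finish on its own
    boolean terminated = pool.awaitTermination(200, TimeUnit.MILLISECONDS);
    if (!terminated) {
      pool.shutdownNow(); // forceful: interrupts the thread blocked in put()
      pool.awaitTermination(1, TimeUnit.SECONDS);
    }
    return terminated;
  }

  public static void main(String[] args) throws InterruptedException {
    // All sub-records fit into the queue: the producer finishes on its own.
    System.out.println(gracefulShutdownSucceeds(100, 50));
    // One message expands beyond the queue capacity: the producer blocks in put().
    System.out.println(gracefulShutdownSucceeds(100, 500));
  }
}
```

Running `main` prints `true` and then `false`. In the second case the only way to stop the reader thread is to interrupt it with `shutdownNow()`, which matches the forced thread-pool teardown described in the issue.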
--
This message was sent by Atlassian Jira
(v8.20.1#820001)