[ https://issues.apache.org/jira/browse/BEAM-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799208#comment-16799208 ]

Kenneth Knowles commented on BEAM-6886:
---------------------------------------

I just read the code. I think what you have done is the best workaround for 
now. The runner controls bundle size, so larger bundles are not guaranteed. But 
generally, once you have delayed elements so that they travel from step to step 
"around the same time", you should see them in the same bundle. Somewhat more 
specifically, if they are all output in a single bundle, they are most likely 
buffered together for the next step and will be read as a whole bundle. This is 
not guaranteed; any runner is free to split the bundling any way it chooses.

> Change batch handling in ElasticsearchIO to avoid necessity for 
> GroupIntoBatches
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-6886
>                 URL: https://issues.apache.org/jira/browse/BEAM-6886
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.11.0
>            Reporter: Egbert
>            Priority: Major
>
> I have a streaming job inserting records into an Elasticsearch cluster. I set 
> the batch size appropriately large, but it has no effect at all: all elements 
> are inserted in batches of 1 or 2 elements.
> The reason seems to be that this is a streaming pipeline, which may produce 
> tiny bundles. Since ElasticsearchIO uses `@FinishBundle` to flush a batch, the 
> batches end up equally small.
> The result is a huge number of bulk requests with just one element each, 
> grinding the Elasticsearch cluster to a halt.
> I have now been able to work around this by using a `GroupIntoBatches` 
> operation before the insert, but that requires 3 steps (mapping to a key, 
> applying GroupIntoBatches, stripping the key and outputting all collected 
> elements; see the first sketch below), making the process quite awkward.
> A much better approach would be to internalize this into the ElasticsearchIO 
> write transform: use a timer that flushes the batch at batch size or at the 
> end of the window, not at the end of a bundle (see the second sketch below).
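
The three-step workaround described above might look roughly like the following 
sketch (Java, Beam 2.11-era API). The shard count, batch size, and connection 
settings are illustrative assumptions, not values taken from the issue:

```java
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class GroupIntoBatchesWorkaround {

  /** Applies the three-step workaround before handing documents to ElasticsearchIO. */
  static void writeBatched(PCollection<String> jsonDocs) {
    // Assumed connection settings; replace with real cluster addresses/index/type.
    ElasticsearchIO.ConnectionConfiguration connection =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://localhost:9200"}, "my-index", "my-type");

    jsonDocs
        // Step 1: map each document to an arbitrary key so it can be grouped.
        // 10 shards is an illustrative trade-off between parallelism and batch size.
        .apply("AddShardKey",
            WithKeys.of((String doc) -> Math.floorMod(doc.hashCode(), 10))
                .withKeyType(TypeDescriptors.integers()))
        // Step 2: buffer per key until the batch size (illustrative: 1000) is reached.
        .apply("GroupIntoBatches", GroupIntoBatches.<Integer, String>ofSize(1000))
        // Step 3: strip the key and re-emit the collected documents.
        .apply("StripKey",
            ParDo.of(
                new DoFn<KV<Integer, Iterable<String>>, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    for (String doc : c.element().getValue()) {
                      c.output(doc);
                    }
                  }
                }))
        // The re-emitted elements now tend to travel in larger bundles (not
        // guaranteed, per the comment above), so the @FinishBundle flush inside
        // ElasticsearchIO.write() sees correspondingly larger batches.
        .apply("WriteToEs", ElasticsearchIO.write().withConnectionConfiguration(connection));
  }
}
```

Fewer shard keys generally mean larger batches but less write parallelism; a 
single constant key would maximize batch size at the cost of any parallelism.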
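
For the proposed internalization, one possible shape (purely a sketch; none of 
this is existing ElasticsearchIO API, and the class, threshold, and helper names 
are hypothetical) is a stateful DoFn that buffers documents in BagState and 
flushes either when a size threshold is reached or when an end-of-window timer 
fires, instead of flushing in `@FinishBundle`:

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

/** Sketch of a buffering writer: flush at batch size or end of window, not per bundle. */
class BufferingElasticsearchWriteFn extends DoFn<KV<Integer, String>, Void> {

  private static final int MAX_BATCH_SIZE = 1000; // illustrative threshold

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @TimerId("endOfWindowFlush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("endOfWindowFlush") Timer flushTimer) {
    // Make sure any partial batch is flushed when the window closes.
    flushTimer.set(window.maxTimestamp());

    buffer.add(c.element().getValue());
    Integer current = count.read();
    int newCount = (current == null ? 0 : current) + 1;
    if (newCount >= MAX_BATCH_SIZE) {
      flushBulk(buffer.read());
      buffer.clear();
      count.write(0);
    } else {
      count.write(newCount);
    }
  }

  @OnTimer("endOfWindowFlush")
  public void onEndOfWindow(
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count) {
    // Flush whatever is left in the buffer at the end of the window.
    flushBulk(buffer.read());
    buffer.clear();
    count.write(0);
  }

  /** Hypothetical helper: issue a single bulk request for the whole batch. */
  private void flushBulk(Iterable<String> docs) {
    // Bulk-request plumbing (Elasticsearch REST client) intentionally omitted.
  }
}
```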



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
