[ https://issues.apache.org/jira/browse/BEAM-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798965#comment-16798965 ]
Egbert commented on BEAM-6886:
------------------------------
I didn't mention this before and I don't know if it matters, but I'm using the
DataflowRunner for this job.
I added some metrics to the ElasticsearchIO transform in my own job, and it
seems that the step that takes the Iterable produced by GroupIntoBatches and
outputs each grouped element separately causes all of those elements to land
in the same bundle in ElasticsearchIO. I don't know anything about the retry
configuration.
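For reference, this is roughly how I instrumented it; the class and metric
names below are illustrative, not the actual ElasticsearchIO internals:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: records how many documents each bundle flushes,
// which is what exposed the tiny per-bundle batches on Dataflow.
static class InstrumentedWriteFn extends DoFn<String, Void> {
  private final Distribution batchSize =
      Metrics.distribution(InstrumentedWriteFn.class, "batchSize");
  private transient List<String> batch;

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String doc) {
    batch.add(doc);
  }

  @FinishBundle
  public void finishBundle() {
    batchSize.update(batch.size()); // shows up per-step in the runner UI
    // ... issue the bulk request for `batch` here ...
  }
}
{code}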
If there is no proper way to implement this, I think the documentation
(javadocs) should be updated to tell users that withMaxBatchSize accomplishes
little by default in streaming pipelines. It cost me quite some time to find
the root cause of all the Elasticsearch connection exceptions I was getting,
even under a fairly small load: I couldn't get above an effective 50 documents
indexed per second. Adding the GroupIntoBatches step easily pushed this to
well over 1000 documents per second.
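For anyone hitting the same problem, here is a minimal sketch of the
workaround, assuming a PCollection<String> of JSON documents named docs and an
existing ElasticsearchIO.ConnectionConfiguration named connConfig (both
placeholder names):

{code:java}
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

docs
    // 1. Attach an arbitrary key so GroupIntoBatches can be applied.
    .apply("AttachKey",
        WithKeys.of((String doc) -> 0).withKeyType(TypeDescriptors.integers()))
    // 2. Group elements into batches of the desired size.
    .apply("Batch", GroupIntoBatches.ofSize(1000))
    // 3. Strip the key and re-emit the buffered elements, so that a whole
    //    batch now arrives in ElasticsearchIO within a single bundle.
    .apply("StripKey",
        ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (String doc : c.element().getValue()) {
              c.output(doc);
            }
          }
        }))
    .apply("WriteToEs",
        ElasticsearchIO.write()
            .withConnectionConfiguration(connConfig)
            .withMaxBatchSize(1000));
{code}

Note that the single constant key funnels every element through one key; in
practice you would key by something like a random shard number to keep
parallelism.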
> Change batch handling in ElasticsearchIO to avoid necessity for
> GroupIntoBatches
> --------------------------------------------------------------------------------
>
> Key: BEAM-6886
> URL: https://issues.apache.org/jira/browse/BEAM-6886
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Affects Versions: 2.11.0
> Reporter: Egbert
> Priority: Major
>
> I have a streaming job inserting records into an Elasticsearch cluster. I set
> the batch size appropriately large, but found that it has no effect at all:
> all elements are inserted in batches of 1 or 2 elements.
> The reason seems to be that this is a streaming pipeline, which may produce
> tiny bundles. Since ElasticsearchIO uses `@FinishBundle` to flush a batch,
> this leads to equally small batches.
> This results in a huge amount of bulk requests with just one element,
> grinding the Elasticsearch cluster to a halt.
> I have now been able to work around this by using a `GroupIntoBatches`
> operation before the insert, but this requires 3 steps (mapping to a key,
> applying GroupIntoBatches, stripping the key and outputting all collected
> elements), which makes the process quite awkward.
> A much better approach would be to internalize this into the ElasticsearchIO
> write transform: use a timer that flushes the batch when it reaches the batch
> size or at the end of the window, not at the end of a bundle.
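To make the quoted proposal concrete, here is a rough sketch of how such
batching could look as a stateful DoFn that buffers elements in BagState and
flushes either on a size threshold or when an end-of-window timer fires. This
is an illustration, not the actual ElasticsearchIO code; all names are
hypothetical:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch: buffer documents per key and flush either when the
// buffer reaches MAX_BATCH or when the end-of-window timer fires.
static class BatchingFn extends DoFn<KV<Integer, String>, List<String>> {
  private static final int MAX_BATCH = 1000;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec =
      StateSpecs.value(VarIntCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("flush") Timer flushTimer) {
    // Always (re)arm the timer so a partially filled batch is still
    // flushed when the window closes.
    flushTimer.set(window.maxTimestamp());
    buffer.add(c.element().getValue());
    Integer stored = count.read();
    int n = (stored == null ? 0 : stored) + 1;
    if (n >= MAX_BATCH) {
      c.output(readAndClear(buffer, count));
    } else {
      count.write(n);
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count) {
    List<String> remaining = readAndClear(buffer, count);
    if (!remaining.isEmpty()) {
      c.output(remaining);
    }
  }

  private static List<String> readAndClear(
      BagState<String> buffer, ValueState<Integer> count) {
    List<String> out = new ArrayList<>();
    for (String doc : buffer.read()) {
      out.add(doc);
    }
    buffer.clear();
    count.write(0);
    return out;
  }
}
{code}

Since state and timers require keyed input, this still needs a key-attachment
step, but it collapses grouping and flushing into a single DoFn and flushes at
the end of the window instead of the end of each bundle.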
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)