[
https://issues.apache.org/jira/browse/BEAM-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801604#comment-16801604
]
Egbert commented on BEAM-6886:
------------------------------
[~echauchot] you are right in saying that `withMaxBatchSize` only specifies a
maximum. However, it wasn't clear to me that this maximum is never reached when
running in streaming mode, at least not on the Dataflow Runner. I understand
the desire to reduce delays, but this in fact increases them: each failure to
ingest into Elasticsearch causes a delay due to retrying, and even under
moderate load an Elasticsearch cluster is easy to break with many individual
indexing requests.
Also, the write doesn't need to emit only at the end of the window; it can
emit whenever the batch size is reached in the meantime. Comparing with
BigQueryIO, for example, I see that it uses a GBK internally, so it will also
wait for the window trigger. I have configured my window to trigger with
elementCountAtLeast(batchSize). Since I regularly see quite high loads, this
results in many triggerings within the same window, giving plenty of
opportunity to index with minimal delay.
Even with thousands of ingested messages per second, the bundle size in
Dataflow remains tiny, at 1 or 2 messages per bundle.
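For reference, the trigger configuration I mean looks roughly like this (a
sketch against the Beam Java SDK; `docs`, `batchSize`, and the one-minute
window are placeholders for my actual values):

```java
// Sketch: fire a pane as soon as at least batchSize elements have arrived,
// repeatedly within the same fixed window.
PCollection<String> windowed = docs.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(batchSize)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
```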
Using GroupIntoBatches could be less awkward, but the keying/unkeying is not
all that's necessary. I now have the following steps:
* Keying: Value -> Key, Value
* Batching (GroupIntoBatches): Key, Value -> Key, Iterable<Value>
* Unbatching: Key, Iterable<Value> -> Key, Value
* Unkeying: Key, Value -> Value
All these steps are necessary because ElasticsearchIO works with neither
KV pairs nor iterables, and GroupIntoBatches does not work without keys. A
variant of GroupIntoBatches that doesn't require a key and that outputs
elements separately rather than as an Iterable would make it much less
awkward. Or, alternatively, a variant of ElasticsearchIO.Write that accepts
KV pairs where the value is an Iterable.
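In plain Java terms, the four steps amount to the following (an illustrative
sketch outside Beam; the method names and the hash-based key are mine, not
Beam APIs):

```java
import java.util.*;
import java.util.stream.Collectors;

// Plain-Java sketch of the four steps listed above. In the real pipeline
// each step is a separate Beam transform; here they are simple methods.
class BatchSteps {
    // Keying: Value -> Key, Value (an arbitrary hash-based shard id as key).
    static <V> Map.Entry<Integer, V> key(V value, int shards) {
        return Map.entry(Math.floorMod(value.hashCode(), shards), value);
    }

    // Batching: Key, Value -> Key, Iterable<Value> (what GroupIntoBatches does).
    static <V> Map<Integer, List<V>> batch(List<Map.Entry<Integer, V>> kvs) {
        return kvs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    // Unbatching + unkeying: Key, Iterable<Value> -> Value, flattened back
    // to plain values so ElasticsearchIO.Write can consume them.
    static <V> List<V> unbatchAndUnkey(Map<Integer, List<V>> grouped) {
        return grouped.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```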
I would prefer a solution that adds a parameter influencing the windowing /
grouping behavior of ElasticsearchIO, so that the developer controls how much
delay is acceptable in exchange for higher throughput through larger batches.
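The behavior I'm after could be sketched like this in plain Java (not Beam
code; inside ElasticsearchIO this would presumably be a stateful DoFn with
timers, and the class and method names here are mine):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the proposed write behavior: buffer documents and flush
// when either the batch size is reached or a deadline has passed, instead of
// flushing on every (possibly tiny) bundle.
class SizeOrTimeBatcher<T> {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;
    // Each inner list stands in for one bulk request to Elasticsearch.
    private final List<List<T>> flushed = new ArrayList<>();

    SizeOrTimeBatcher(int maxBatchSize, long maxDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) firstElementTime = nowMillis;
        buffer.add(element);
        // Flush on size, or when the oldest buffered element is too old.
        if (buffer.size() >= maxBatchSize
                || nowMillis - firstElementTime >= maxDelayMillis) {
            flush();
        }
    }

    // Also called at end of window, so no element is held back indefinitely.
    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<T>> flushedBatches() { return flushed; }
}
```

With a batch size of 3, adding seven documents in quick succession yields two
full batches of 3 and, after the final flush, one batch of 1.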
> Change batch handling in ElasticsearchIO to avoid necessity for
> GroupIntoBatches
> --------------------------------------------------------------------------------
>
> Key: BEAM-6886
> URL: https://issues.apache.org/jira/browse/BEAM-6886
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Affects Versions: 2.11.0
> Reporter: Egbert
> Priority: Major
>
> I have a streaming job inserting records into an Elasticsearch cluster. I set
> the batch size appropriately large, but found that it has no effect at all:
> all elements are inserted in batches of 1 or 2.
> The reason seems to be that this is a streaming pipeline, which may result in
> tiny bundles. Since ElasticsearchIO uses `@FinishBundle` to flush a batch,
> this will result in equally small batches.
> This results in a huge amount of bulk requests with just one element,
> grinding the Elasticsearch cluster to a halt.
> I have now been able to work around this by using a `GroupIntoBatches`
> operation before the insert, but this results in 3 steps (mapping to a key,
> applying GroupIntoBatches, stripping key and outputting all collected
> elements), making the process quite awkward.
> A much better approach would be to internalize this into the ElasticsearchIO
> write transform: use a timer that flushes the batch when the batch size is
> reached or at the end of the window, not at the end of a bundle.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)