[
https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=584973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-584973
]
ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/21 08:51
Start Date: 19/Apr/21 08:51
Worklog Time Spent: 10m
Work Description: echauchot commented on pull request #14347:
URL: https://github.com/apache/beam/pull/14347#issuecomment-822295067
> This change set represents a rather large (and backward-compatible) change
to the way ElasticsearchIO.Write operates. Presently, the Write transform has
2 responsibilities:
>
> 1. Convert input documents into Bulk API entities, serializing based on
user settings (partial update, delete, upsert, etc) -> `DocToBulk`
> 2. Batch the converted Bulk API entities together and interface with the
target ES cluster -> `BulkIO`
>
> This PR aims to separate these 2 responsibilities into discrete
PTransforms to allow for greater flexibility while also maintaining the
convenience of the Write transform to perform both document conversion and IO
serially. Examples of how the flexibility of separate transforms could be used:
>
> 1. Unit testing. It becomes trivial for pipeline developers to ensure that
output Bulk API entities for a given set of inputs will produce an expected
result, without the need for an available Elasticsearch cluster.
> 2. Flexible options for data backup. Serialized Bulk API entities can be
forked and sent to both Elasticsearch and a data lake.
> 3. Mirroring data to multiple clusters. Presently, mirroring data to
multiple clusters would require duplicate computation.
> 4. Better batching with input streams in one job. A job may produce
multiple "shapes" of Bulk API entities based on multiple input types, and then
"fan-in" all serialized Bulk entities into a single BulkIO transform to improve
batching semantics.
> 5. Decoupled jobs. Corollary to (4) above. Job(s) could be made to produce
Bulk entities and then publish them to a message bus. A distinct job could
consume from that message bus and solely be responsible for IO with the target
cluster(s).
> 6. Easier support for multiple BulkIO semantics.
>
=> Reading the overall design goals, this looks very promising and a good
analysis of the missing properties of the current architecture! Thanks!
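To make the proposed split concrete, here is a minimal Java sketch of composing the two discrete transforms, mirroring what `Write` does serially today. The `docToBulk()`/`bulkIO()` entry points and their settings are taken from the PR description and are assumptions, not an established API:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class SplitWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    ConnectionConfiguration conn =
        ConnectionConfiguration.create(new String[] {"http://localhost:9200"}, "my-index", "_doc");

    PCollection<String> docs =
        pipeline.apply("ReadDocs", TextIO.read().from("gs://my-bucket/docs/*.json"));

    // Step 1: serialization only. Input documents become Bulk API entities
    // according to the write settings; no Elasticsearch IO happens here.
    PCollection<String> bulkEntities =
        docs.apply("DocToBulk", ElasticsearchIO.docToBulk().withConnectionConfiguration(conn));

    // The serialized entities are a plain PCollection<String>, so they can be
    // forked, e.g. backed up to a data lake (point 2 in the list above).
    bulkEntities.apply("Backup", TextIO.write().to("gs://my-bucket/bulk-backup"));

    // Step 2: batching and IO against the target cluster.
    bulkEntities.apply(
        "BulkIO",
        ElasticsearchIO.bulkIO().withConnectionConfiguration(conn).withMaxBatchSize(1000));

    pipeline.run();
  }
}
```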
> Expanding on point (6), this PR also introduces a new (optional) way to
batch entities for bulk requests: Stateful Processing. Presently, Bulk request
size is limited by the lesser of Runner bundle size and `maxBatchSize` user
setting. In my experience, bundle sizes are often very small, and can be as
small as 1 or 2. When that’s the case, it means Bulk requests contain only 1 or
2 documents, and it’s effectively the same as not using the Bulk API at all.
`BulkIOStatefulFn` is made to be compatible with `GroupIntoBatches` which will
use entity count and (optionally) elapsed time to create batches much closer to
the `maxBatchSize` setting to improve throughput.
=> True, very small batches can occur: for example, since Flink is a
streaming-oriented platform, the Flink runner tends to create very small Beam
bundles. So, when a bundle finishes processing (finishBundle is called), the
ES bulk request is sent, leading to small ES bulk requests. Leveraging
_GroupIntoBatches_, which creates trans-bundle groups while still respecting
Beam semantics (windowing, bundle retries, etc.), is a very good idea.
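Continuing from the `bulkEntities` collection in the sketch above, here is a minimal illustration of that trans-bundle batching pattern using `GroupIntoBatches` directly; the keying scheme, batch size, and buffering duration are illustrative assumptions, not values from the PR:

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Spread entities over a few keys: state in GroupIntoBatches is kept per key
// and window, so the key count bounds the parallelism of the batching step.
PCollection<KV<Integer, String>> keyed =
    bulkEntities.apply(
        "AssignShard",
        WithKeys.of((String e) -> Math.floorMod(e.hashCode(), 10))
            .withKeyType(TypeDescriptors.integers()));

// Buffer elements in state across bundles and emit a batch once 1000 elements
// have accumulated, or after 5 seconds, whichever comes first. Batches then
// stay close to maxBatchSize even when runner bundles hold only 1-2 elements.
PCollection<KV<Integer, Iterable<String>>> batches =
    keyed.apply(
        "GroupIntoBatches",
        GroupIntoBatches.<Integer, String>ofSize(1000)
            .withMaxBufferingDuration(Duration.standardSeconds(5)));
```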
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 584973)
Time Spent: 2.5h (was: 2h 20m)
> Overhaul ElasticsearchIO#Write
> ------------------------------
>
> Key: BEAM-12093
> URL: https://issues.apache.org/jira/browse/BEAM-12093
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Evan Galpin
> Priority: P2
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas
> which could be improved:
> # Separation of concerns
> # Bulk API batch size optimization
>
> Presently, the Write transform has 2 responsibilities which are coupled and
> inseparable by users:
> # Convert input documents into Bulk API entities, serializing based on user
> settings (partial update, delete, upsert, etc)
> # Batch the converted Bulk API entities together and interface with the
> target ES cluster
>
> Having these 2 roles tightly coupled means testing requires an available
> Elasticsearch cluster, making unit testing almost impossible. Allowing access
> to the serialized documents would make unit testing much easier for pipeline
> developers, among numerous other benefits to having separation between
> serialization and IO.
> Relatedly, the batching of entities when creating Bulk API payloads is
> currently limited by the lesser of the Beam runner's bundle size and the
> `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for
> portability between runners, but it also means most Bulk payloads only have a
> few (1-5) entities. By using Stateful Processing to better adhere to the
> `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to drop the
> number of indexing requests in an Elasticsearch cluster by 50-100x.
> Separating the role of document serialization and IO allows supporting
> multiple IO techniques with minimal and understandable code.
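For the unit-testing benefit described above, a minimal sketch of the kind of test the split enables, again assuming the `docToBulk()` entry point from the PR; the asserted bulk-entity shape is illustrative, not authoritative:

```java
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class DocToBulkTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void convertsDocumentsToBulkEntitiesWithoutACluster() {
    PCollection<String> entities =
        pipeline
            .apply(Create.of("{\"id\":\"1\",\"name\":\"test\"}"))
            .apply("DocToBulk", ElasticsearchIO.docToBulk() /* plus write settings */);

    // Assert directly on the serialized Bulk API entities; no Elasticsearch
    // cluster is needed. Here we only check for index-action metadata.
    PAssert.that(entities)
        .satisfies(
            docs -> {
              for (String doc : docs) {
                assertTrue(doc.contains("\"index\""));
              }
              return null;
            });

    pipeline.run().waitUntilFinish();
  }
}
```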
--
This message was sent by Atlassian Jira
(v8.3.4#803005)