echauchot commented on pull request #14347: URL: https://github.com/apache/beam/pull/14347#issuecomment-822295067
> This change set represents a rather large (and backward compatible) change to the way ElasticsearchIO.Write operates. Presently, the Write transform has 2 responsibilities:
>
> 1. Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc) -> `DocToBulk`
> 2. Batch the converted Bulk API entities together and interface with the target ES cluster -> `BulkIO`
>
> This PR aims to separate these 2 responsibilities into discrete PTransforms to allow for greater flexibility while also maintaining the convenience of the Write transform to perform both document conversion and IO serially. Examples of how the flexibility of separate transforms could be used:
>
> 1. Unit testing. It becomes trivial for pipeline developers to ensure that output Bulk API entities for a given set of inputs will produce an expected result, without the need for an available Elasticsearch cluster.
> 2. Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
> 3. Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would require duplicate computation.
> 4. Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API entities based on multiple input types, and then "fan-in" all serialized Bulk entities into a single BulkIO transform to improve batching semantics.
> 5. Decoupled jobs. Corollary to (4) above. Job(s) could be made to produce Bulk entities and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
> 6. Easier support for multiple BulkIO semantics.

=> Looking at the overall design goals, this is very promising and a good analysis of the properties missing from the current architecture. Thanks! (A sketch of how the two separated transforms could be composed follows at the end of this comment.)

> Expanding on point (6), this PR also introduces a new (optional) way to batch entities for bulk requests: Stateful Processing. Presently, Bulk request size is limited by the lesser of Runner bundle size and `maxBatchSize` user setting. In my experience, bundle sizes are often very small, and can be as small as 1 or 2. When that's the case, it means Bulk requests contain only 1 or 2 documents, and it's effectively the same as not using the Bulk API at all. `BulkIOStatefulFn` is made to be compatible with `GroupIntoBatches` which will use entity count and (optionally) elapsed time to create batches much closer to the `maxBatchSize` setting to improve throughput.

=> True that very small batches can exist: for example, since Flink is a streaming-oriented platform, the Flink runner tends to create very small Beam bundles. So, when a bundle finishes processing (`finishBundle` is called), the ES bulk request is sent, leading to small ES bulks. Leveraging _GroupIntoBatches_, which creates trans-bundle groups while still respecting Beam semantics (windowing, bundle retries, etc...), is a very good idea. See the batching sketch below.
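To make the fan-in scenario of point (4) concrete, here is a minimal sketch of how the two separated transforms could be composed. The transform names `DocToBulk` and `BulkIO` come from the PR description; the exact factory methods and builder calls (`docToBulk()`, `bulkIO()`, `withConnectionConfiguration(...)`, `withMaxBatchSize(...)`) are my assumption of how the final API surface will look, mirroring the existing `ElasticsearchIO.Write` builder style, so treat them as illustrative only:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FanInBulkSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Hypothetical cluster/index settings for the sketch.
    ConnectionConfiguration connection =
        ConnectionConfiguration.create(new String[] {"http://localhost:9200"}, "my-index", "_doc");

    // Two differently shaped inputs (in a real job these would come from Kafka, files, etc).
    PCollection<String> ordersJson =
        pipeline.apply("Orders", Create.of("{\"orderId\":1}", "{\"orderId\":2}"));
    PCollection<String> usersJson =
        pipeline.apply("Users", Create.of("{\"userId\":\"a\"}"));

    // Step 1: convert each input to Bulk API entities; no IO is performed here.
    PCollection<String> orderBulk =
        ordersJson.apply(
            "OrdersToBulk", ElasticsearchIO.docToBulk().withConnectionConfiguration(connection));
    PCollection<String> userBulk =
        usersJson.apply(
            "UsersToBulk", ElasticsearchIO.docToBulk().withConnectionConfiguration(connection));

    // Step 2: fan-in all serialized Bulk entities into a single BulkIO so that
    // batching and the actual requests to the cluster are shared.
    PCollectionList.of(orderBulk)
        .and(userBulk)
        .apply(Flatten.pCollections())
        .apply(
            "WriteToES",
            ElasticsearchIO.bulkIO()
                .withConnectionConfiguration(connection)
                .withMaxBatchSize(1000));

    pipeline.run().waitUntilFinish();
  }
}
```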

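On the stateful batching point, a small sketch of the _GroupIntoBatches_ mechanism itself (illustrative only; this is not the PR's `BulkIOStatefulFn` code): keyed input plus state/timer-backed batching lets batches span bundles while respecting windowing and bundle retries, and flushes on either element count or elapsed buffering time. The shard-key assignment and the 5-second buffering duration are arbitrary choices for the example:

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/** Illustration of trans-bundle batching with GroupIntoBatches; not the PR's internal code. */
class StatefulBatchingSketch {

  static PCollection<KV<Integer, Iterable<String>>> batch(
      PCollection<String> bulkEntities, int maxBatchSize, int numShards) {
    return bulkEntities
        // GroupIntoBatches needs keyed input; spread the load over a few shard keys.
        .apply(
            "AssignShard",
            WithKeys.of((String doc) -> Math.floorMod(doc.hashCode(), numShards))
                .withKeyType(TypeDescriptors.integers()))
        // State- and timer-backed batching: batches span bundles, so they can grow
        // close to maxBatchSize even when the runner produces tiny bundles.
        // Flush on size or on elapsed buffering time.
        .apply(
            "BatchAcrossBundles",
            GroupIntoBatches.<Integer, String>ofSize(maxBatchSize)
                .withMaxBufferingDuration(Duration.standardSeconds(5)));
    // A downstream stateful DoFn (BulkIOStatefulFn in the PR) would then issue one
    // Bulk request per emitted Iterable<String>.
  }
}
```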