echauchot commented on pull request #14347:
URL: https://github.com/apache/beam/pull/14347#issuecomment-822295067


   > This change set represents a rather large (and backward compatible) change 
to the way ElasticsearchIO.Write operates. Presently, the Write transform has 
2 responsibilities:
   > 
   > 1. Convert input documents into Bulk API entities, serializing based on 
user settings (partial update, delete, upsert, etc) -> `DocToBulk`
   > 2. Batch the converted Bulk API entities together and interface with the 
target ES cluster  -> `BulkIO`
   > 
   > This PR aims to separate these 2 responsibilities into discrete 
PTransforms to allow for greater flexibility while also maintaining the 
convenience of the Write transform to perform both document conversion and IO 
serially. Examples of how the flexibility of separate transforms could be used:
   > 
   > 1. Unit testing. It becomes trivial for pipeline developers to ensure that 
output Bulk API entities for a given set of inputs will produce an expected 
result, without the need for an available Elasticsearch cluster.
   > 2. Flexible options for data backup. Serialized Bulk API entities can be 
forked and sent to both Elasticsearch and a data lake.
   > 3. Mirroring data to multiple clusters. Presently, mirroring data to 
multiple clusters would require duplicate computation.
   > 4. Better batching with input streams in one job. A job may produce 
multiple "shapes" of Bulk API entities based on multiple input types, and then 
"fan-in" all serialized Bulk entities into a single BulkIO transform to improve 
batching semantics.
   > 5. Decoupled jobs. Corollary to (4) above. Job(s) could be made to produce 
Bulk entities and then publish them to a message bus. A distinct job could 
consume from that message bus and solely be responsible for IO with the target 
cluster(s).
   > 6. Easier support for multiple BulkIO semantics.
   > 
   
   => Reading the overall design goals, this looks very promising and a good 
analysis of the missing properties of the current architecture! Thanks!
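   To illustrate the kind of composition this enables, here is a rough sketch of a 
pipeline using the two separated transforms. The `DocToBulk` and `BulkIO` names come 
from this PR, but the factory and builder methods shown (`ElasticsearchIO.docToBulk()`, 
`ElasticsearchIO.bulkIO()`) and the assumption that Bulk API entities travel as JSON 
strings are illustrative only and may differ from what the PR finally exposes:
   
   ```java
   // Hypothetical usage sketch only: DocToBulk / BulkIO are described in this PR,
   // but the factory and builder methods used below are assumptions.
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.values.PCollection;
   
   public class SeparatedWriteSketch {
     public static void main(String[] args) {
       Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
   
       ElasticsearchIO.ConnectionConfiguration connection =
           ElasticsearchIO.ConnectionConfiguration.create(
               new String[] {"http://localhost:9200"}, "my-index", "_doc");
   
       // JSON documents produced upstream (a trivial in-memory source for the sketch).
       PCollection<String> jsonDocs =
           pipeline.apply("Docs", Create.of("{\"id\":\"1\",\"user\":\"alice\"}"));
   
       // Step 1: convert documents into serialized Bulk API entities.
       PCollection<String> bulkEntities =
           jsonDocs.apply("DocToBulk",
               ElasticsearchIO.docToBulk().withConnectionConfiguration(connection));
   
       // The serialized entities could be forked here, e.g. also written to a data
       // lake or published to a message bus, before (or instead of) the IO step.
   
       // Step 2: batch the Bulk entities and issue the requests against the cluster.
       bulkEntities.apply("BulkIO",
           ElasticsearchIO.bulkIO().withConnectionConfiguration(connection));
   
       pipeline.run().waitUntilFinish();
     }
   }
   ```
   
   This also makes point (1) concrete: a unit test could assert on `bulkEntities` 
directly (e.g. with `PAssert`) without a running Elasticsearch cluster.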
   
   > Expanding on point (6), this PR also introduces a new (optional) way to 
batch entities for bulk requests: Stateful Processing. Presently, Bulk request 
size is limited by the lesser of Runner bundle size and `maxBatchSize` user 
setting. In my experience, bundle sizes are often very small, and can be as 
small as 1 or 2. When that’s the case, it means Bulk requests contain only 1 or 
2 documents, and it’s effectively the same as not using the Bulk API at all. 
`BulkIOStatefulFn` is made to be compatible with `GroupIntoBatches` which will 
use entity count and (optionally) elapsed time to create batches much closer to 
the `maxBatchSize` setting to improve throughput.
   
   => True that very small batches can exist: for example, since Flink is a 
streaming-oriented platform, the Flink runner tends to create very small Beam 
bundles. So, when a bundle finishes processing (finishBundle is called), the ES 
bulk request is sent, leading to small ES bulk requests. Leveraging 
_GroupIntoBatches_, which creates trans-bundle groups while still respecting 
Beam semantics (windowing, bundle retries, etc.), is a very good idea.
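   For reference, a minimal, self-contained sketch of the count- and time-based 
batching that `GroupIntoBatches` provides (independent of ElasticsearchIO; the 
single constant key and the element values are illustrative assumptions):
   
   ```java
   // Sketch of trans-bundle batching with GroupIntoBatches; the constant key
   // and the in-memory element values are illustrative only.
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.GroupIntoBatches;
   import org.apache.beam.sdk.transforms.WithKeys;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.joda.time.Duration;
   
   public class GroupIntoBatchesSketch {
     public static void main(String[] args) {
       Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
   
       // Serialized Bulk API entities, here just two in-memory examples.
       PCollection<String> bulkEntities =
           pipeline.apply(Create.of("{\"index\":{}}\n{\"a\":1}", "{\"index\":{}}\n{\"a\":2}"));
   
       long maxBatchSize = 1000;
   
       // GroupIntoBatches buffers elements in state across bundles and emits a batch
       // once maxBatchSize elements are buffered or the buffering duration elapses,
       // while respecting windowing; a downstream DoFn can then send each batch as
       // a single ES Bulk request.
       PCollection<KV<Integer, Iterable<String>>> batches =
           bulkEntities
               .apply("Key", WithKeys.of(0))
               .apply("Batch",
                   GroupIntoBatches.<Integer, String>ofSize(maxBatchSize)
                       .withMaxBufferingDuration(Duration.standardSeconds(5)));
   
       pipeline.run().waitUntilFinish();
     }
   }
   ```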
   
   
   

