[
https://issues.apache.org/jira/browse/HUDI-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit updated HUDI-5023:
------------------------------
Status: Patch Available (was: In Progress)
> Add new Executor avoiding Queueing in the write-path
> ----------------------------------------------------
>
> Key: HUDI-5023
> URL: https://issues.apache.org/jira/browse/HUDI-5023
> Project: Apache Hudi
> Issue Type: Improvement
> Components: writer-core
> Reporter: Alexey Kudinkin
> Assignee: Yue Zhang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.13.0
>
>
> We should evaluate removing _any queueing_ (BoundedInMemoryQueue,
> DisruptorQueue) on the write path for multiple reasons:
> *It breaks up vertical chain of transformations applied to data*
> Spark (like other engines) relies on the notion of _Iteration_ to vertically
> compose all transformations applied to a single record, allowing for effective
> _stream_ processing: all transformations are applied to an _Iterator yielding
> records_ from the source, so that
> # The chain of transformations* is applied to every record one by one, which
> effectively limits the amount of memory used to the number of records being
> read and processed simultaneously (if the reading is not batched, that is
> just a single record), which in turn
> # Limits the number of memory allocations required to process a single record.
> Consider the opposite: if we did it breadth-wise, applying the first
> transformation to _all_ of the records, we would have to store all of the
> transformed records in memory, which is costly from both a GC-overhead and a
> pure object-churn perspective.
>
> Enqueueing essentially violates both of these invariants, breaking up the
> {_}stream{_}-like processing model and forcing records to be kept in memory
> for no good reason.
>
> * This chain is broken up at shuffling points (a collection of tasks executed
> b/w shuffling points is called a stage in Spark)
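> The iterator-composition point above can be sketched with a toy Java example
> (illustrative names only, not Hudi or Spark code): depth-wise chaining applies
> every transformation to one record before the next is pulled, while
> breadth-wise processing materializes each intermediate collection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Toy illustration (not Hudi code): depth-wise iterator chaining vs
// breadth-wise processing that materializes intermediate collections.
public class IteratorChaining {

    // Lazily applies f to each element as it is pulled from the iterator.
    static <A, B> Iterator<B> map(Iterator<A> in, Function<A, B> f) {
        return new Iterator<B>() {
            public boolean hasNext() { return in.hasNext(); }
            public B next() { return f.apply(in.next()); }
        };
    }

    // Depth-wise: both transformations run on one record before the next
    // record is even read; no intermediate collection is ever allocated.
    static List<Integer> depthWise(List<Integer> source) {
        Iterator<Integer> chained =
                map(map(source.iterator(), x -> x + 1), x -> x * 10);
        List<Integer> out = new ArrayList<>();
        chained.forEachRemaining(out::add);
        return out;
    }

    // Breadth-wise: the first transformation is applied to ALL records,
    // so the whole intermediate list is held in memory at once.
    static List<Integer> breadthWise(List<Integer> source) {
        List<Integer> stage1 = new ArrayList<>();
        for (int x : source) stage1.add(x + 1);   // entire list kept alive
        List<Integer> out = new ArrayList<>();
        for (int x : stage1) out.add(x * 10);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(depthWise(Arrays.asList(1, 2, 3)));   // [20, 30, 40]
        System.out.println(breadthWise(Arrays.asList(1, 2, 3))); // [20, 30, 40]
    }
}
```

> Both variants produce the same result, but the breadth-wise one keeps the
> whole intermediate `stage1` list alive; a queue on the write path
> reintroduces exactly that kind of buffering.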
>
> *It requires data to be allocated on the heap*
> As was called out in the previous paragraph, enqueueing raw data read from
> the source breaks up the _stream_ processing paradigm and forces records to be
> persisted on the heap.
> Consider the following example: the plain ParquetReader in Spark actually uses
> a *mutable* `ColumnarBatchRow` that provides a row-based view into the batch of
> data being read from the file.
> Now, since it's a mutable object, we can use it to _iterate_ over all of the
> records (while doing stream processing), ultimately producing some "output"
> (writing into another file, a shuffle block, etc.), but we +can't keep a
> reference to it+ (for example, by +enqueueing+ it), since the object is mutable.
> Instead we are forced to make a *copy* of it, which obviously requires
> allocating it on the heap.
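> This hazard can be shown with a toy sketch (illustrative classes, not Spark's
> actual `ColumnarBatchRow`): a reader reuses one mutable row for every record
> it yields, so enqueueing the raw rows silently corrupts earlier records, while
> copying is correct but costs one heap allocation per record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy sketch (not Spark code): a reader that reuses one mutable row object
// for every record it yields, like a columnar reader exposing a row-view.
public class MutableRowDemo {

    static final class Row {
        int value;                       // mutated in place by the reader
        Row copy() { Row r = new Row(); r.value = value; return r; }
    }

    // Yields the SAME Row instance for every record.
    static Iterator<Row> reader(int[] data) {
        Row shared = new Row();
        return new Iterator<Row>() {
            int i = 0;
            public boolean hasNext() { return i < data.length; }
            public Row next() { shared.value = data[i++]; return shared; }
        };
    }

    // Enqueueing the raw rows stores N references to ONE mutable object:
    // every slot ends up showing the last record read.
    static List<Integer> enqueueWithoutCopy(int[] data) {
        List<Row> queue = new ArrayList<>();
        reader(data).forEachRemaining(queue::add);
        List<Integer> out = new ArrayList<>();
        for (Row r : queue) out.add(r.value);
        return out;
    }

    // Copying each row before enqueueing is correct, but it forces a fresh
    // heap allocation per record -- the cost the description calls out.
    static List<Integer> enqueueWithCopy(int[] data) {
        List<Row> queue = new ArrayList<>();
        reader(data).forEachRemaining(r -> queue.add(r.copy()));
        List<Integer> out = new ArrayList<>();
        for (Row r : queue) out.add(r.value);
        return out;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3};
        System.out.println(enqueueWithoutCopy(data)); // [3, 3, 3] -- corrupted
        System.out.println(enqueueWithCopy(data));    // [1, 2, 3] -- correct
    }
}
```

> Streaming consumption never hits this problem, because each row is fully
> processed before the reader mutates it for the next record.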
--
This message was sent by Atlassian Jira
(v8.20.10#820010)