You'll need to measure more, but the bottleneck for writes is usually
going to be the disk itself.  Unfortunately, standard OS buffered I/O has
some pretty negative behaviors in this case.  First I'll describe what I
generally see happen (the last time I profiled this was a while back but I
don't think anything substantial has changed).

* Initially, writes are very fast.  The OS `write` call is simply a memcpy
from user space into kernel space.  The actual flushing of the data from
kernel space to disk happens asynchronously unless you are using direct I/O
(which is not currently supported).  (There is a small sketch of this just
after the list.)
* Over time, assuming the data arrival rate is faster than the data write
rate, the data will accumulate in kernel memory.  For example, if you
continuously run the Linux `free` program you will see the `free` column
decrease and the `buff/cache` column increase.  The `available` column
should generally stay consistent (kernel memory that is in use but can
technically be flushed to disk if needed is still considered "available"
but not "free").
* Once the `free` column reaches 0, a few things happen.  First, calls to
`write` are no longer fast (the write cannot complete until some existing
data has been flushed to disk).  Second, other processes that aren't in use
might start swapping their data to disk (you will generally see the entire
system, if it is interactive, grind to a halt).  Third, if you have an OOM
killer active, it may start to kill running applications.  It isn't
supposed to do so, but there are sometimes bugs[1].
* Assuming the OOM killer does not kill your application, then, because the
`write` calls slow down, the number of rows in the dataset writer's queue
will start to fill up (this is captured by the variable
`rows_in_flight_throttle`).
* Once the `rows_in_flight_throttle` is full, the dataset writer will pause
and return an unfinished future (asking the caller to back off).
* Once this happens the caller will apply backpressure (if it is being used
in Acero), which will pause the reader.  This backpressure is not instant,
and each running CPU thread will generally still deliver whatever batch it
is working on.  These batches essentially get added to an asynchronous
condition variable waiting on the dataset writer queue to free up.  This is
the spot where the ThrottledAsyncTaskScheduler is used.
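
To make the first two bullets concrete, here is a tiny sketch that has
nothing to do with Arrow (the file path and sizes are arbitrary).  It shows
`write` returning almost instantly because it only copies into the page
cache, while the real disk work only shows up when you force a flush:

    import os
    import time

    CHUNK = b"x" * (8 * 1024 * 1024)   # 8 MiB per write

    fd = os.open("/tmp/buffered_io_demo.bin",
                 os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    try:
        start = time.perf_counter()
        for _ in range(64):            # 512 MiB total
            os.write(fd, CHUNK)        # usually just a memcpy into the page cache
        print("writes took %.3fs" % (time.perf_counter() - start))

        start = time.perf_counter()
        os.fsync(fd)                   # now the kernel actually pushes data to disk
        print("fsync took  %.3fs" % (time.perf_counter() - start))
    finally:
        os.close(fd)
        os.remove("/tmp/buffered_io_demo.bin")

On most systems the write loop finishes in a fraction of the time the
fsync takes, and it will slow down dramatically once the page cache is
under pressure.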

The stack dump that you reported is not exactly what I would have expected,
but it might still match the above description.  At this point I am just
sort of guessing.  When the dataset writer frees up enough to receive
another batch it does what is effectively a "notify all", and all of the
compute threads wake up and try to add their batch to the dataset writer.
One of them gets through, gets added to the dataset writer, and then
backpressure is applied again and all the requests pile up once again.
It's possible that a "resume sending" signal is sent, and this might
actually lead to RAM filling up more.  We could probably mitigate this by
adding a low water mark to the dataset writer's backpressure throttle (so
it doesn't send the resume signal as soon as the queue has any room, but
instead waits until the queue has drained to, say, half full).  A rough
sketch of that idea is below.
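
For what it's worth, a low/high water mark throttle is a fairly standard
pattern.  This is purely an illustration of the idea, not the actual
dataset writer code, and all the names are made up:

    import threading

    class RowThrottle:
        def __init__(self, high_water, low_water):
            assert low_water < high_water
            self.high = high_water
            self.low = low_water
            self.rows = 0
            self.paused = False
            self.cv = threading.Condition()

        def add(self, nrows):
            with self.cv:
                while self.paused:
                    self.cv.wait()      # producers back off while paused
                self.rows += nrows
                if self.rows >= self.high:
                    self.paused = True  # high water mark: stop accepting rows

        def release(self, nrows):
            with self.cv:
                self.rows -= nrows
                # Only resume once we've drained well below the high water
                # mark, so a single freed slot doesn't wake every producer.
                if self.paused and self.rows <= self.low:
                    self.paused = False
                    self.cv.notify_all()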

I'd recommend watching the output of `free` (or monitoring memory in some
other way) and verifying the above.  I'd also suggest lowering the number
of CPU threads and seeing how that affects performance.  If you lower the
CPU threads enough then you should eventually get to a point where your
supply of data is slower than your writer, and I wouldn't expect memory to
accumulate.  These aren't solutions, but they might give us more clues into
what is happening.
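
One hypothetical way to run that experiment from Python: clamp the thread
pools before calling write_dataset and poll /proc/meminfo (the same numbers
`free` reports) from a background thread.  The thread counts and poll
interval here are just examples:

    import threading
    import time
    import pyarrow as pa

    pa.set_cpu_count(2)          # fewer compute threads producing batches
    pa.set_io_thread_count(8)    # keep some I/O parallelism for the writer

    def watch_memory(stop, interval=5):
        while not stop.is_set():
            fields = {}
            with open("/proc/meminfo") as f:
                for line in f:
                    key, value = line.split(":", 1)
                    fields[key] = value.strip()
            print("MemFree=%s  MemAvailable=%s  Cached=%s"
                  % (fields["MemFree"], fields["MemAvailable"], fields["Cached"]))
            time.sleep(interval)

    stop = threading.Event()
    threading.Thread(target=watch_memory, args=(stop,), daemon=True).start()
    # ... run pa.dataset.write_dataset(...) here ...
    stop.set()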

[1]
https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used

On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:

> Hi,
>     I'm using flight to receive streams from client and write to the
> storage with python `pa.dataset.write_dataset` API. The whole data is
> 1 Billion rows, over 40GB with one partition column ranges from 0~63.
> The container runs at 8-cores CPU and 4GB ram resources.
> It can be done in about 160s (6M rows/s, each record batch is about
> 32K rows), with transferring and writing completing almost
> synchronously, after setting io_thread_count to 128.
>      Then I'd like to find out the bottleneck of the system, CPU or
> RAM or storage?
>     1. I extended the ram to 32GB; then the server consumes more ram,
> the writing progress works at the beginning, then suddenly slows down
> and the data accumulates in ram until OOM.
>     2. Then I set the ram to 64GB, so that the server will not be killed
> by OOM. The same happens; also, after all the data is transferred (in
> memory), the server consumes all CPU shares (800%), but is still very
> slow on writing (not totally stopped, but about 100MB/minute).
>     2.1 I'm wondering if the io thread is stuck, or the computation
> task is stuck. After setting both io_thread_count and cpu_count to 32,
> I wrapped the input stream of write_dataset with a callback on each
> record batch, I can tell that all the record batches are consumed into
> write_dataset API.
>     2.2 I dumped all threads stack traces and grab a flamegraph. See
> https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
>
>      It seems that all threads are stuck at ThrottledAsyncTaskSchedulerImpl.
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,
>
