Hi,
    Glad to hear the problem has been identified. Unfortunately the
workaround does not really fit my situation: the data source has no
idea whether the queue is filling up. We were hoping to apply flow
control based on used memory, but we cannot get the memory used by
each write task when the server is running multiple tasks, and by the
time the flow control kicks in, the queue may already be full. Also,
the server may be deployed to different environments, so the storage
performance cannot be estimated in advance.
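    For reference, the memory-based throttling I had in mind is roughly
the sketch below (psutil and the 25% threshold are my own assumptions,
and inside a container psutil sees the host's memory rather than the
cgroup limit, which is part of the problem):

```
import time

import psutil  # assumption: psutil is available for memory monitoring


def throttled_batches(reader, low_water=0.25, sleep_s=0.05):
    """Yield batches from `reader`, sleeping while available memory is low.

    Sketch only: the threshold and sleep interval are arbitrary, and this
    cannot see how much memory the dataset writer itself has queued.
    """
    for batch in reader:
        while (psutil.virtual_memory().available
               < low_water * psutil.virtual_memory().total):
            time.sleep(sleep_s)
        yield batch
```
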
    Maybe, for my situation, a better workaround is to write a
partitioning dispatch myself and call write_dataset separately for each
partition.
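
    A rough sketch of what I mean, reusing the schema from the earlier
reproduction (the flush threshold, the chunk naming, and the in-memory
per-bucket buffering are all assumptions, not tested code):

```
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds


def dispatch_and_write(reader, base_dir, buckets=64, flush_rows=1 << 20):
    """Route batches by the `bucket` column and flush each bucket with its
    own write_dataset call, so each call only ever writes a single partition.
    """
    pending = {b: [] for b in range(buckets)}   # buffered tables per bucket
    rows = {b: 0 for b in range(buckets)}
    chunk = {b: 0 for b in range(buckets)}

    def flush(b):
        if not pending[b]:
            return
        ds.write_dataset(
            pa.concat_tables(pending[b]),
            f"{base_dir}/{b}",
            format="feather",
            basename_template=f"chunk-{chunk[b]}-{{i}}.arrow",
            existing_data_behavior="overwrite_or_ignore",
        )
        chunk[b] += 1
        pending[b].clear()
        rows[b] = 0

    for batch in reader:
        tbl = pa.Table.from_batches([batch])
        for b in range(buckets):
            part = tbl.filter(pc.equal(tbl["bucket"], b))
            if part.num_rows:
                pending[b].append(part)
                rows[b] += part.num_rows
                if rows[b] >= flush_rows:
                    flush(b)
    for b in range(buckets):
        flush(b)
```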

On Mon, Jul 31, 2023 at 9:47 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> Thanks.  This is a very helpful reproduction.
>
> I was able to reproduce and diagnose the problem.  There is a bug on our
> end and I have filed [1] to address it.  There are a lot more details in
> the ticket if you are interested.  In the meantime, the only workaround I
> can think of is probably to slow down the data source enough that the queue
> doesn't fill up.
>
> [1] https://github.com/apache/arrow/issues/36951
>
>
> On Sun, Jul 30, 2023 at 8:15 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
> > Hi,
> >     The following code should reproduce the problem.
> >
> > ```
> >
> > import pyarrow as pa
> > import pyarrow.fs, pyarrow.dataset
> >
> > schema = pa.schema([("id", pa.utf8()), ("bucket", pa.uint8())])
> >
> >
> > def rb_generator(buckets, rows, batches):
> >     batch = pa.record_batch(
> >         [[f"id-{i}" for i in range(rows)], [i % buckets for i in range(rows)]],
> >         schema=schema,
> >     )
> >
> >     for i in range(batches):
> >         yield batch
> >         print(f"yielding {i}")
> >
> >
> > if __name__ == "__main__":
> >     pa.set_io_thread_count(1)
> >     reader = pa.RecordBatchReader.from_batches(
> >         schema, rb_generator(64, 32768, 1000000)
> >     )
> >     local_fs = pa.fs.LocalFileSystem()
> >
> >     pa.dataset.write_dataset(
> >         reader,
> >         "/tmp/data_f",
> >         format="feather",
> >         partitioning=["bucket"],
> >         filesystem=local_fs,
> >         existing_data_behavior="overwrite_or_ignore"
> >     )
> >
> > ```
> >
> > On Sun, Jul 30, 2023 at 3:30 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > >
> > > Hi,
> > >    Then another question: why is backpressure not working on the
> > > input stream of the write_dataset API? If backpressure kicks in at the
> > > end of the Acero stream for some reason (at the queue stage or write
> > > stage), shouldn't the input stream be paused as well? That should keep
> > > memory at a stable level, with the input speed matching the output
> > > speed.
> > >
> > >     Then I ran some more experiments with various io_thread_count
> > > values and bucket_size (the number of partitions / open files). The
> > > thread pools were configured as in the sketch after the results below.
> > >
> > > 1. With bucket_size at 64 and io_thread_count/cpu_count at 1, the CPU
> > > stays around 100% after the transfer is done, but there is a very
> > > interesting observation:
> > >     * the transfer from client to server is slow for the first few
> > > batches, under 0.01M rows/s, then it quickly speeds up to over 4M
> > > rows/s.
> > >     * I think the backpressure works fine at the very beginning, until
> > > at some point, as in the previous experiments, it turns the stream
> > > into a black hole and the I/O thread's input stream gets stuck at a
> > > slow speed. (It is still writing, but spends a lot of time waiting for
> > > the upstream CPU partitioning threads to push batches?)
> > >     * according to iotop, the total disk write rate drops off very
> > > slowly after the transfer is done, though this varies between runs
> > > with the same configuration. I think the upstream backpressure is not
> > > working as expected, which keeps the downstream writer polling. Maybe
> > > at some point the slow writing increases the backpressure on the whole
> > > pipeline (the write speed drops slowly), but the writing is slow
> > > because the upstream has already slowed down.
> > >
> > > 2. Then I set cpu_count to 64.
> > > 2.1 io_thread_count set to 4:
> > > 2.1.1 With bucket_size at 2/4/6, the system works fine. CPU stays
> > > below 100%. Backpressure works: memory does not accumulate much before
> > > the flow speed becomes stable.
> > > 2.1.2 With bucket_size at 8, the bug comes back. After the transfer is
> > > done, CPU sits around 350% (only the I/O threads are running?), the
> > > write rate in iotop is about 40M/s, and then it drops off very slowly.
> > >
> > > 2.2 io_thread_count set to 6:
> > > 2.2.1 With bucket_size at 6/8/16, the system works fine. CPU is about
> > > 100%, as in 2.1.1.
> > > 2.2.2 With bucket_size at 32, the bug comes back. CPU sits at 550%.
> > >
> > > 2.3 io_thread_count set to 8:
> > > 2.3.1 With bucket_size at 16, it partly fails. After the transfer is
> > > done, memory has accumulated to over 3G, but the write speed is about
> > > 60M/s, so it is feasible to just wait. CPU is about 600~700%. Once the
> > > accumulated memory has been written out, CPU returns to normal.
> > > 2.3.2 With bucket_size at 32, it still fails. CPU sits at 800%. The
> > > transfer is very fast (over 14M rows/s); backpressure is not working
> > > at all.
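> > >
> > > For reference, the thread pools in these experiments are set via the
> > > global PyArrow knobs, roughly like this sketch (values are per-run
> > > examples):
> > >
> > > ```
> > > import pyarrow as pa
> > >
> > > # global pools used by write_dataset; values varied per run
> > > pa.set_cpu_count(64)        # compute/Acero threads (partitioning)
> > > pa.set_io_thread_count(4)   # I/O thread pool used by the dataset writer
> > > ```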
> > >
> > >
> > > On Sat, Jul 29, 2023 at 1:08 AM Weston Pace <weston.p...@gmail.com> wrote:
> > > >
> > > > > How many I/O threads are writing concurrently in a single
> > > > > write_dataset call?
> > > >
> > > > With the default options, and no partitioning, it will only use 1 I/O
> > > > thread.  This is because we do not write to a single file in parallel.
> > > > If you change FileSystemDatasetWriteOptions::max_rows_per_file then you
> > > > may see more than 1 I/O thread because we will start new files and
> > > > write to each one in parallel.
> > > > If you have partitioning then you may see more than 1 I/O thread
> > > > because we will be writing to multiple files.
> > > >
> > > > We use, at most, 1 I/O thread per file being written.
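> > > >
> > > > In Python those knobs correspond to write_dataset arguments, roughly
> > > > (an untested sketch; the row counts are just example values and
> > > > `reader` stands for your input stream):
> > > >
> > > > ```
> > > > import pyarrow.dataset as ds
> > > >
> > > > ds.write_dataset(
> > > >     reader,                        # your RecordBatchReader
> > > >     "/tmp/data_f",
> > > >     format="feather",
> > > >     partitioning=["bucket"],       # multiple files -> multiple I/O threads
> > > >     max_rows_per_file=4_000_000,   # rolling over files also adds parallelism
> > > >     max_rows_per_group=1_000_000,
> > > >     existing_data_behavior="overwrite_or_ignore",
> > > > )
> > > > ```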
> > > >
> > > > > How are they scheduled?
> > > >
> > > > There are 3 stages.
> > > >
> > > > Arrival stage: Data arrives on an Acero worker thread (CPU thread).  We
> > > > will partition the data at this point.  For each batch we schedule a
> > > > Queue Batch task.
> > > > Queue stage: This stage runs on the CPU thread.  It finds the correct
> > > > file queue for the batch and adds the batch to the file queue.  It may
> > > > split the batch if max_rows_per_file is set.  It may trigger
> > > > backpressure if there are too many rows queued on files.  This stage
> > > > runs serially, on the CPU thread.  There is never more than one queue
> > > > task running.
> > > > Write stage: Each file has a number of write tasks.  These run in
> > > > parallel across files but serially within a file.  These are I/O tasks.
> > > >
> > > > > The throttling code seems to allow only one task to run?
> > > >
> > > > Yes, there is a throttled scheduler used for the queue stage (we only
> > > > run one queue task at a time).  There is a throttled scheduler per file
> > > > used for the write stage.  All of these are configured to only allow
> > > > one task at a time to run.
> > > >
> > > > > What else can I do to inspect the problem?
> > > >
> > > > I think we need to find out why the CPU is still 800% after the
> > > > transfer is done when partitioning is enabled.  I would expect the CPU
> > > > to drop to 0% even if it takes several seconds (or longer) for the
> > > > cached data to flush to the disk.  The stack trace you shared is
> > > > helpful but I don't know the root cause yet.  All of the threads are
> > > > stuck on locking / unlocking in FutureImpl::TryAddCallback but that
> > > > critical section is very small.  So it seems like there is some kind of
> > > > task storm.  I think this is similar to a condition_variable that has
> > > > thousands of waiters and is constantly doing a notify_all.
> > > >
> > > > I think we will need to figure out some kind of reproducible test case.
> > > > I will try and find some time to run some experiments on Monday.  Maybe
> > > > I can reproduce this by setting the backpressure limit to a very small
> > > > amount.
> > > >
> > > > On Fri, Jul 28, 2023 at 6:48 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >    Thanks for your detailed explanation. I ran some experiments
> > > > > today.
> > > > >
> > > > > Before the experiments:
> > > > > 1. To limit the resources used by the server, I use Docker, which
> > > > > uses cgroups. But "free" does not respect the resource limit inside
> > > > > the container.
> > > > > 2. I measured the write speed on the host with "dd if=/dev/zero
> > > > > of=./test.img bs=1G count=45 oflag=dsync"; the output is "48318382080
> > > > > bytes (48 GB) copied, 132.558 s, 365 MB/s".
> > > > > 3. I do not limit the memory of the container (over 140GB available),
> > > > > but the CPU is still limited to 8. According to your explanation, the
> > > > > write process should never slow down, since it writes to the kernel
> > > > > cache, which is accounted as used memory until it is flushed to the
> > > > > storage by the OS.
> > > > >
> > > > > Additionally, the file format is "feather"; writing with and without
> > > > > partitioning leads to different results.
> > > > >
> > > > > ## write without partitioning
> > > > > Everything works fine, no matter what value I set io_thread_count
> > > > > or cpu_count to.
> > > > > Repeated runs of the same configuration vary a lot; the initial peak
> > > > > speed may lead to different memory usage, but the maximum average
> > > > > flow speed does not vary much.
> > > > > Some records are below:
> > > > > 1. With cpu_count and io_thread_count at 128, CPU is less than 100%
> > > > > and RES is less than 1G (after the initial peak speed); the average
> > > > > flow speed is 6.83M rows/s (45 bytes per row).
> > > > > 2. With cpu_count at 1 and io_thread_count at 16, CPU is a little
> > > > > over 100%, RES is about 3G at max, and the average flow speed is
> > > > > 4.64M rows/s, but it takes an additional 6s to finish writing after
> > > > > the transfer is done.
> > > > > 3. With cpu_count at 1 and io_thread_count at 128, it performs almost
> > > > > the same as record 2.
> > > > >
> > > > > ## write with partitioning
> > > > > Writing with partitioning fails most of the time, and setting a lower
> > > > > cpu_count does not help.
> > > > > 1. With cpu_count and io_thread_count at 128, CPU is at 800% from the
> > > > > beginning, RES grows slowly to 40.7G by the end of the transfer, and
> > > > > the average flow speed is 3.24M rows/s. After that, CPU stays at
> > > > > 800%, but RES goes down very slowly, at about 200MB/minute. The write
> > > > > speed never recovers.
> > > > > 2. With cpu_count at 1 and io_thread_count at 16, CPU climbs to 800%
> > > > > more slowly than in record 1, RES grows to 44.1G by the end of the
> > > > > transfer, and the average flow speed is 6.75M rows/s. The same thing
> > > > > happens as in record 1 after the transfer is done.
> > > > > 3. With cpu_count at 1 and io_thread_count at 128, CPU climbs to 800%
> > > > > much more slowly than in record 2 (due to the slower flow speed?),
> > > > > RES grows to 30G by the end of the transfer, and the average flow
> > > > > speed is 1.62M rows/s. The same thing happens as in record 1 after
> > > > > the transfer is done.
> > > > >
> > > > > Then I tried to limit the flow speed before the write queue gets full
> > > > > with custom flow control (sleeping on each reader iteration based on
> > > > > available memory). But the sleep-time curve is not accurate:
> > > > > sometimes the flow slows down but the queue fills up anyway.
> > > > > Then the interesting thing happens: before the queue is full (memory
> > > > > growing quickly), the CPU is not fully used. When memory grows
> > > > > quickly, CPU goes up as well, to 800%.
> > > > > 1. Sometimes the write queue can catch up: CPU goes down after the
> > > > > memory has accumulated, the write speed recovers, and memory goes
> > > > > back to normal.
> > > > > 2. Sometimes it can't: IO BPS drops sharply, and CPU never goes down
> > > > > after that.
> > > > >
> > > > > How many I/O threads are writing concurrently in a single
> > > > > write_dataset call? How are they scheduled? The throttling code seems
> > > > > to allow only one task to run?
> > > > > What else can I do to inspect the problem?
> > > > >
> > > > > On Fri, Jul 28, 2023 at 12:33 AM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > >
> > > > > > You'll need to measure more but generally the bottleneck for writes
> > > > > > is usually going to be the disk itself.  Unfortunately, standard OS
> > > > > > buffered I/O has some pretty negative behaviors in this case.  First
> > > > > > I'll describe what I generally see happen (the last time I profiled
> > > > > > this was a while back but I don't think anything substantial has
> > > > > > changed).
> > > > > >
> > > > > > * Initially, writes are very fast.  The OS `write` call is simply a
> > > > > > memcpy from user space into kernel space.  The actual flushing of
> > > > > > the data from kernel space to disk happens asynchronously unless you
> > > > > > are using direct I/O (which is not currently supported).
> > > > > > * Over time, assuming the data arrival rate is faster than the data
> > > > > > write rate, the data will accumulate in kernel memory.  For example,
> > > > > > if you continuously run the Linux `free` program you will see the
> > > > > > `free` column decrease and the `buff/cache` column increase.  The
> > > > > > `available` column should generally stay consistent (kernel memory
> > > > > > that is in use but can technically be flushed to disk if needed is
> > > > > > still considered "available" but not "free").
> > > > > > * Once the `free` column reaches 0 then a few things happen.
> > > > > > First, the calls to `write` are no longer fast (the write cannot
> > > > > > complete until some existing data has been flushed to disk).
> > > > > > Second, other processes that aren't in use might start swapping
> > > > > > their data to disk (you will generally see the entire system, if it
> > > > > > is interactive, grind to a halt).  Third, if you have an OOM-killer
> > > > > > active, it may start to kill running applications.  It isn't
> > > > > > supposed to do so but there are sometimes bugs[1].
> > > > > > * Assuming the OOM killer does not kill your application then,
> > > > > > because the `write` calls slow down, the number of rows in the
> > > > > > dataset writer's queue will start to fill up (this is captured by
> > > > > > the variable `rows_in_flight_throttle`).
> > > > > > * Once the rows_in_flight_throttle is full it will pause and the
> > > > > > dataset writer will return an unfinished future (asking the caller
> > > > > > to back off).
> > > > > > * Once this happens the caller will apply backpressure (if being
> > > > > > used in Acero) which will pause the reader.  This backpressure is
> > > > > > not instant and generally each running CPU thread still delivers
> > > > > > whatever batch it is working on.  These batches essentially get
> > > > > > added to an asynchronous condition variable waiting on the dataset
> > > > > > writer queue to free up.  This is the spot where the
> > > > > > ThrottledAsyncTaskScheduler is used.
> > > > > >
> > > > > > The stack dump that you reported is not exactly what I would have
> > > > > > expected but it might still match the above description.  At this
> > > > > > point I am just sort of guessing.  When the dataset writer frees up
> > > > > > enough to receive another batch it will do what is effectively a
> > > > > > "notify all" and all of the compute threads are waking up and trying
> > > > > > to add their batch to the dataset writer.  One of these gets
> > > > > > through, gets added to the dataset writer, and then backpressure is
> > > > > > applied again and all the requests pile up once again.  It's
> > > > > > possible that a "resume sending" signal is sent and this might
> > > > > > actually lead to RAM filling up more.  We could probably mitigate
> > > > > > this by adding a low water mark to the dataset writer's backpressure
> > > > > > throttle (so it doesn't send the resume signal as soon as the queue
> > > > > > has room but waits until the queue is half full).
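> > > > > >
> > > > > > As a toy sketch of that low/high water mark idea (not the actual
> > > > > > writer code, just the hysteresis concept):
> > > > > >
> > > > > > ```
> > > > > > # Pause at the high water mark, but only resume once the queue has
> > > > > > # drained down to the low water mark, instead of the moment one
> > > > > > # slot frees up.
> > > > > > class HysteresisThrottle:
> > > > > >     def __init__(self, high_water, low_water):
> > > > > >         self.high_water = high_water
> > > > > >         self.low_water = low_water
> > > > > >         self.paused = False
> > > > > >
> > > > > >     def update(self, queued_rows):
> > > > > >         if queued_rows >= self.high_water:
> > > > > >             self.paused = True
> > > > > >         elif queued_rows <= self.low_water:
> > > > > >             self.paused = False
> > > > > >         return self.paused
> > > > > > ```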
> > > > > >
> > > > > > I'd recommend watching the output of `free` (or monitoring memory in
> > > > > > some other way) and verifying the above.  I'd also suggest lowering
> > > > > > the number of CPU threads and seeing how that affects performance.
> > > > > > If you lower the CPU threads enough then you should eventually get
> > > > > > it to a point where your supply of data is slower than your writer
> > > > > > and I wouldn't expect memory to accumulate.  These things are
> > > > > > workarounds but might give us more clues into what is happening.
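> > > > > >
> > > > > > Something along these lines, as a rough sketch (psutil is an
> > > > > > assumption here, standing in for watching `free`):
> > > > > >
> > > > > > ```
> > > > > > import psutil      # assumption: used in place of watching `free`
> > > > > > import pyarrow as pa
> > > > > >
> > > > > > pa.set_cpu_count(2)  # try progressively smaller compute pools
> > > > > >
> > > > > > def log_memory():
> > > > > >     vm = psutil.virtual_memory()
> > > > > >     # buffers/cached are Linux-specific psutil fields
> > > > > >     print(f"free={vm.free >> 20}MiB "
> > > > > >           f"available={vm.available >> 20}MiB "
> > > > > >           f"buff/cache={(vm.buffers + vm.cached) >> 20}MiB")
> > > > > > ```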
> > > > > >
> > > > > > [1]
> > > > > > https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> > > > > >
> > > > > > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >     I'm using Flight to receive streams from a client and write
> > > > > > > them to storage with the Python `pa.dataset.write_dataset` API.
> > > > > > > The whole dataset is 1 billion rows, over 40GB, with one partition
> > > > > > > column ranging from 0~63. The container runs with 8 CPU cores and
> > > > > > > 4GB of RAM.
> > > > > > >     After setting io_thread_count to 128, the whole thing
> > > > > > > completes in about 160s (6M rows/s; each record batch is about 32K
> > > > > > > rows), with transferring and writing finishing almost
> > > > > > > synchronously.
> > > > > > >      Then I wanted to find out the bottleneck of the system: CPU,
> > > > > > > RAM or storage?
> > > > > > >     1. I extended the RAM to 32GB. The server consumes more RAM;
> > > > > > > writing works at the beginning, then suddenly slows down and data
> > > > > > > accumulates in RAM until OOM.
> > > > > > >     2. Then I set the RAM to 64GB, so that the server would not be
> > > > > > > killed by OOM. The same thing happens; also, after all the data
> > > > > > > has been transferred (into memory), the server consumes all of its
> > > > > > > CPU shares (800%) but is still very slow at writing (not totally
> > > > > > > stopped, but about 100MB/minute).
> > > > > > >     2.1 I'm wondering whether the I/O threads are stuck or the
> > > > > > > computation tasks are stuck. After setting both io_thread_count
> > > > > > > and cpu_count to 32, I wrapped the input stream of write_dataset
> > > > > > > with a callback on each record batch (a sketch of the wrapper is
> > > > > > > below), so I can tell that all the record batches are consumed by
> > > > > > > the write_dataset API.
> > > > > > >     2.2 I dumped all threads' stack traces and grabbed a
> > > > > > > flamegraph. See
> > > > > > > https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > > > > > >
> > > > > > >      It seems that all threads are stuck at
> > > > > > > ThrottledAsyncTaskSchedulerImpl.
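> > > > > > >
> > > > > > >      The counting wrapper mentioned in 2.1 looks roughly like this
> > > > > > > (a sketch; the names are made up):
> > > > > > >
> > > > > > > ```
> > > > > > > import pyarrow as pa
> > > > > > >
> > > > > > > def counting_batches(reader, counter):
> > > > > > >     # count every batch as write_dataset pulls it from the stream
> > > > > > >     for batch in reader:
> > > > > > >         counter["batches"] += 1
> > > > > > >         counter["rows"] += batch.num_rows
> > > > > > >         yield batch
> > > > > > >
> > > > > > > counter = {"batches": 0, "rows": 0}
> > > > > > > wrapped = pa.RecordBatchReader.from_batches(
> > > > > > >     reader.schema, counting_batches(reader, counter))
> > > > > > > # pass `wrapped` to pa.dataset.write_dataset instead of `reader`
> > > > > > > ```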
> > > > > > >
> > > > > > > --
> > > > > > > ---------------------
> > > > > > > Best Regards,
> > > > > > > Wenbo Hu,
> > > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > ---------------------
> > > > > Best Regards,
> > > > > Wenbo Hu,
> > > > >
> > >
> > >
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> >
> >
> >
> > --
> > ---------------------
> > Best Regards,
> > Wenbo Hu,
> >



-- 
---------------------
Best Regards,
Wenbo Hu,
