> How many io threads are writing concurrently in a single write_dataset call?
With the default options, and no partitioning, it will only use 1 I/O thread. This is because we do not write to a single file in parallel. If you change FileSystemDatasetWriteOptions::max_rows_per_file then you may see more than 1 I/O thread because we will start new files and write to each one in parallel. If you have partitioning then you may see more than 1 I/O thread because we will be writing to multiple files. We use, at most, 1 I/O thread per file being written. (There is a rough Python sketch of these options at the end of this reply.)

> How do they schedule?

There are 3 stages:

* Arrival stage: Data arrives on an Acero worker thread (a CPU thread). We partition the data at this point. For each batch we schedule a queue batch task.
* Queue stage: This stage runs on the CPU thread. It finds the correct file queue for the batch and adds the batch to that queue. It may split the batch if max_rows_per_file is set. It may trigger backpressure if there are too many rows queued on files. This stage runs serially, on the CPU thread; there is never more than one queue task running.
* Write stage: Each file has a number of write tasks. These run in parallel across files but serially within a file. These are I/O tasks.

> The throttle code seems only one task got running?

Yes, there is a throttled scheduler used for the queue stage (we only run one queue task at a time). There is a throttled scheduler per file used for the write stage. All of these are configured to only allow one task at a time to run.

> What else can I do to inspect the problem?

I think we need to find out why the CPU is still at 800% after the transfer is done when partitioning is enabled. I would expect the CPU to drop to 0% even if it takes several seconds (or longer) for the cached data to flush to disk.

The stack trace you shared is helpful but I don't know the root cause yet. All of the threads are stuck on locking / unlocking in FutureImpl::TryAddCallback, but that critical section is very small. So it seems like there is some kind of task storm. I think this is similar to a condition_variable that has thousands of waiters and is constantly doing a notify_all.

I think we will need to figure out some kind of reproducible test case. I will try to find some time to run some experiments on Monday. Maybe I can reproduce this by setting the backpressure limit to a very small amount.
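For reference, here is a rough, untested Python sketch of the options I'm referring to (the values are placeholders, not recommendations); with partitioning or max_rows_per_file set, the writer can have several files open and therefore several I/O threads busy:

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Size the global CPU (Acero) and I/O thread pools.
    pa.set_cpu_count(8)
    pa.set_io_thread_count(16)

    # Tiny placeholder table; in your case this is the stream arriving over Flight.
    table = pa.table({"part_col": [0, 1, 0, 1], "value": [1.0, 2.0, 3.0, 4.0]})

    ds.write_dataset(
        table,
        "/tmp/out",
        format="feather",
        partitioning=["part_col"],    # one open file (at most one I/O thread) per partition
        max_rows_per_file=1_000_000,  # rolling over to new files also enables parallel writes
        existing_data_behavior="overwrite_or_ignore",
    )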
On Fri, Jul 28, 2023 at 6:48 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:

> Hi,
> Thanks for your detailed explanation, I ran some experiments today.
>
> Before the experiments:
> 1. To limit the resources used by the server, I use docker, which uses cgroups. But "free" does not respect the resource limit inside the container.
> 2. I measured the write speed on the host with "dd if=/dev/zero of=./test.img bs=1G count=45 oflag=dsync"; the output was "48318382080 bytes (48 GB) copied, 132.558 s, 365 MB/s".
> 3. I do not limit the memory of the container (over 140GB available), but the CPU is still limited to 8. According to your explanation, the write process should never slow down, since it writes to the "memory cached", which is accounted as used memory until it is flushed to storage by the OS.
>
> Additionally, the file format is "feather"; writing with and without partitioning leads to different results.
>
> ## write without partitioning
> Everything works fine, no matter what value I set for io_thread_count or cpu_count.
> The performance of the same configuration varies a lot; the initial peak speed may result in different memory usage, but the max average flow speed does not vary much.
> Some records are below:
> 1. With cpu_count and io_thread_count at 128, CPU is less than 100% and RES is less than 1G (after the initial peak speed); average flow speed is 6.83M rows/s (45 bytes per row).
> 2. With cpu_count at 1 and io_thread_count at 16, CPU is a little over 100%, RES is about 3G at max, and average flow speed is 4.64M rows/s, but it takes an additional 6s to complete writing after transferring is done.
> 3. With cpu_count at 1 and io_thread_count at 128, it performs almost the same as record 2.
>
> ## write with partitioning
> Writing with partitioning fails most of the time; setting a lower cpu count does not help.
> 1. With cpu_count and io_thread_count at 128, CPU is 800% from the beginning, RES grows slowly to 40.7G by the end of transferring, and average flow speed is 3.24M rows/s. After that, CPU is still 800%, but RES goes down very slowly, at 200MB/minute. Write speed does not recover.
> 2. With cpu_count at 1 and io_thread_count at 16, CPU goes up to 800% more slowly than record 1, RES grows to 44.1G by the end of transferring, and average flow speed is 6.75M rows/s. The same as record 1 happens after transferring is done.
> 3. With cpu_count at 1 and io_thread_count at 128, CPU goes up to 800% much more slowly than record 2 (due to the slower flow speed?), RES grows to 30G by the end of transferring, and average flow speed is 1.62M rows/s. The same as record 1 happens after transferring is done.
>
> Then I tried to limit the flow speed before the writing queue got full, with custom flow control (sleeping on reader iteration based on available memory). But the sleep time curve is not accurate; sometimes the flow slows down, but the queue gets full anyway.
> Then the interesting thing happens: before the queue is full (before memory grows quickly), the CPU is not fully used. When memory grows quickly, CPU goes up as well, to 800%.
> 1. Sometimes the writing queue can recover: CPU goes down after the memory has accumulated, the writing speed recovers, and memory goes back to normal.
> 2. Sometimes it can't: IOBPS drops sharply, and CPU never goes down after that.
>
> How many io threads are writing concurrently in a single write_dataset call? How do they schedule? The throttle code seems to show only one task running?
> What else can I do to inspect the problem?
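[A rough, untested sketch of the kind of reader-side throttle described above, assuming psutil is available for the memory check (which, like "free", reads host-wide numbers rather than cgroup limits), might look like this:]

    import time
    import psutil  # assumed available; not part of pyarrow

    def throttled_batches(reader, min_available_bytes=4 << 30, sleep_s=0.1):
        # Yield record batches, sleeping whenever available memory drops below
        # the threshold. Inside a container the threshold has to be chosen
        # against the host's memory, since psutil does not see cgroup limits.
        for batch in reader:
            while psutil.virtual_memory().available < min_available_bytes:
                time.sleep(sleep_s)
            yield batch

    # Usage: wrap the incoming stream before handing it to write_dataset, e.g.
    #   throttled = pa.RecordBatchReader.from_batches(
    #       reader.schema, throttled_batches(reader))
    #   ds.write_dataset(throttled, "/tmp/out", format="feather", ...)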
> On Fri, Jul 28, 2023 at 00:33, Weston Pace <weston.p...@gmail.com> wrote:
> >
> > You'll need to measure more but generally the bottleneck for writes is usually going to be the disk itself. Unfortunately, standard OS buffered I/O has some pretty negative behaviors in this case. First I'll describe what I generally see happen (the last time I profiled this was a while back but I don't think anything substantial has changed).
> >
> > * Initially, writes are very fast. The OS `write` call is simply a memcpy from user space into kernel space. The actual flushing of the data from kernel space to disk happens asynchronously unless you are using direct I/O (which is not currently supported).
> > * Over time, assuming the data arrival rate is faster than the data write rate, the data will accumulate in kernel memory. For example, if you continuously run the Linux `free` program you will see the `free` column decrease and the `buff/cache` column increase. The `available` column should generally stay consistent (kernel memory that is in use but can technically be flushed to disk if needed is still considered "available" but not "free").
> > * Once the `free` column reaches 0 then a few things happen. First, the calls to `write` are no longer fast (the write cannot complete until some existing data has been flushed to disk). Second, other processes that aren't in use might start swapping their data to disk (you will generally see the entire system, if it is interactive, grind to a halt). Third, if you have an OOM-killer active, it may start to kill running applications. It isn't supposed to do so but there are sometimes bugs[1].
> > * Assuming the OOM killer does not kill your application then, because the `write` calls slow down, the number of rows in the dataset writer's queue will start to fill up (this is captured by the variable `rows_in_flight_throttle`).
> > * Once the rows_in_flight_throttle is full it will pause and the dataset writer will return an unfinished future (asking the caller to back off).
> > * Once this happens the caller will apply backpressure (if being used in Acero) which will pause the reader. This backpressure is not instant and generally each running CPU thread still delivers whatever batch it is working on. These batches essentially get added to an asynchronous condition variable waiting on the dataset writer queue to free up. This is the spot where the ThrottledAsyncTaskScheduler is used.
> >
> > The stack dump that you reported is not exactly what I would have expected but it might still match the above description. At this point I am just sort of guessing. When the dataset writer frees up enough to receive another batch it does what is effectively a "notify all" and all of the compute threads wake up and try to add their batch to the dataset writer. One of these gets through, gets added to the dataset writer, and then backpressure is applied again and all the requests pile up once again. It's possible that a "resume sending" signal is sent and this might actually lead to RAM filling up more. We could probably mitigate this by adding a low water mark to the dataset writer's backpressure throttle (so it doesn't send the resume signal as soon as the queue has room but waits until the queue is half full).
> >
> > I'd recommend watching the output of `free` (or monitoring memory in some other way) and verifying the above. I'd also suggest lowering the number of CPU threads and seeing how that affects performance. If you lower the CPU threads enough then you should eventually get to a point where your supply of data is slower than your writer, and I wouldn't expect memory to accumulate. These things aren't solutions but might give us more clues into what is happening.
> >
> > [1] https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> >
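[A minimal way to do the kind of monitoring suggested above from inside the writing process, assuming psutil on Linux (watching `free` in another terminal works just as well), could be:]

    import threading
    import time
    import psutil  # assumed available; reports host-wide numbers on Linux

    def log_memory(stop, interval_s=1.0):
        # Print available vs. cached memory once a second while the write runs,
        # a stand-in for watching the `free` columns described above.
        while not stop.is_set():
            vm = psutil.virtual_memory()
            print(f"available={vm.available >> 20} MiB  cached={vm.cached >> 20} MiB")
            time.sleep(interval_s)

    stop = threading.Event()
    threading.Thread(target=log_memory, args=(stop,), daemon=True).start()
    # ... run ds.write_dataset(...) here ...
    stop.set()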
> > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > > Hi,
> > > I'm using Flight to receive streams from the client and writing to storage with the Python `pa.dataset.write_dataset` API. The whole dataset is 1 billion rows, over 40GB, with one partition column ranging from 0~63. The container runs with an 8-core CPU limit and 4GB of RAM.
> > > Transferring and writing complete almost synchronously in about 160s (6M rows/s; each record batch is about 32K rows), after setting io_thread_count to 128.
> > > Then I'd like to find out the bottleneck of the system: CPU, RAM, or storage?
> > > 1. I extended the RAM to 32GB, and the server consumes more RAM. Writing works at the beginning, then suddenly slows down and data accumulates in RAM until OOM.
> > > 2. Then I set the RAM to 64GB, so that the server is not killed by OOM. The same happens; also, after all the data is transferred (in memory), the server consumes all CPU shares (800%) but writing is still very slow (not totally stopped, but about 100MB/minute).
> > > 2.1 I'm wondering whether the io threads are stuck or the computation tasks are stuck. After setting both io_thread_count and cpu_count to 32, I wrapped the input stream of write_dataset with a callback on each record batch, so I can tell that all the record batches are consumed by the write_dataset API.
> > > 2.2 I dumped all thread stack traces and made a flamegraph. See https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > >
> > > It seems that all threads are stuck at ThrottledAsyncTaskSchedulerImpl.
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> > >
> >
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,
>