Thanks. This is a very helpful reproduction. I was able to reproduce and
diagnose the problem. There is a bug on our end, and I have filed [1] to
address it. There are many more details in the ticket if you are interested.
In the meantime, the only workaround I can think of is to slow down the data
source enough that the queue doesn't fill up.

[1] https://github.com/apache/arrow/issues/36951
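A minimal sketch of that kind of throttling, assuming the source is a plain
Python generator feeding a `RecordBatchReader` (the rate below is an arbitrary
knob you would tune against your disk's sustained write speed):

```python
import time


def throttled(batches, batches_per_second=50):
    """Yield record batches no faster than `batches_per_second`.

    A crude rate limit: sleeping between batches gives the dataset
    writer's queue a chance to drain before the next batch arrives.
    """
    min_interval = 1.0 / batches_per_second
    last = time.monotonic()
    for batch in batches:
        yield batch
        elapsed = time.monotonic() - last
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        last = time.monotonic()


# e.g. reader = pa.RecordBatchReader.from_batches(
#          schema, throttled(rb_generator(64, 32768, 1000000)))
```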
On Sun, Jul 30, 2023 at 8:15 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> Hi,
> The following code should reproduce the problem.
>
> ```
> import pyarrow as pa
> import pyarrow.fs, pyarrow.dataset
>
> schema = pa.schema([("id", pa.utf8()), ("bucket", pa.uint8())])
>
>
> def rb_generator(buckets, rows, batches):
>     batch = pa.record_batch(
>         [[f"id-{i}" for i in range(rows)], [i % buckets for i in range(rows)]],
>         schema=schema,
>     )
>
>     for i in range(batches):
>         yield batch
>         print(f"yielding {i}")
>
>
> if __name__ == "__main__":
>     pa.set_io_thread_count(1)
>     reader = pa.RecordBatchReader.from_batches(
>         schema, rb_generator(64, 32768, 1000000)
>     )
>     local_fs = pa.fs.LocalFileSystem()
>
>     pa.dataset.write_dataset(
>         reader,
>         "/tmp/data_f",
>         format="feather",
>         partitioning=["bucket"],
>         filesystem=local_fs,
>         existing_data_behavior="overwrite_or_ignore",
>     )
> ```
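For contrast, the same stream written without the partitioning argument
(which, per the experiments further down the thread, completes without the
stall) only changes the write_dataset call; the output path here is made up:

```python
import pyarrow as pa
import pyarrow.fs, pyarrow.dataset

# Reuses schema / rb_generator from the repro above; only the write differs.
reader = pa.RecordBatchReader.from_batches(schema, rb_generator(64, 32768, 1000000))

pa.dataset.write_dataset(
    reader,
    "/tmp/data_nopart",  # hypothetical output path
    format="feather",    # same format, but no partitioning column
    filesystem=pa.fs.LocalFileSystem(),
    existing_data_behavior="overwrite_or_ignore",
)
```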
> On Sun, Jul 30, 2023 at 3:30 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > Hi,
> > Then another question is: why is backpressure not working on the input
> > stream of the write_dataset API? If backpressure happens at the end of the
> > Acero stream for some reason (at the queue stage or the write stage), then
> > the input stream should be backpressured as well. That should keep memory
> > at a stable level, so that the input speed matches the output speed.
> >
> > Then I ran some more experiments with various io_thread_count values and
> > bucket sizes (partitions / open files).
> >
> > 1. With bucket_size = 64 and io_thread_count/cpu_count = 1, the CPU stays
> > at 100% after the transfer is done, but there is a very interesting
> > observation:
> >     * The flow from client to server is slow for the first few batches,
> >       less than 0.01M rows/s, then it speeds up to over 4M rows/s very
> >       quickly.
> >     * I think backpressure works fine at the very beginning, until at
> >       some point, as in the previous experiments, backpressure turns the
> >       stream into a black hole and the I/O thread's input stream gets
> >       stuck at some slow speed. (It is still writing, but spends a lot of
> >       time waiting for the upstream CPU partitioning threads to push
> >       batches?)
> >     * From iotop, the total disk write drops very slowly after the
> >       transfer is done, but this can change across runs with the same
> >       configuration. I think the upstream backpressure is not working as
> >       expected, which keeps the downstream writer querying. Maybe at some
> >       point the slow writing increases the backpressure on the whole
> >       process (the write speed drops slowly), but the reason the writing
> >       is slow is that the upstream has already slowed down.
> >
> > 2. Then I set cpu_count to 64.
> > 2.1 io_thread_count = 4:
> > 2.1.1 For bucket_size = 2/4/6 the system works fine. CPU is less than
> > 100%. Backpressure works fine; memory does not accumulate much before the
> > flow speed becomes stable.
> > 2.1.2 With bucket_size = 8 the bug comes back. After the transfer is
> > done, CPU is about 350% (only the I/O threads are running?) and the write
> > rate from iotop is about 40 MB/s, then drops very slowly.
> >
> > 2.2 io_thread_count = 6:
> > 2.2.1 For bucket_size = 6/8/16 the system works fine. CPU is about 100%,
> > as in 2.1.1.
> > 2.2.2 For bucket_size = 32 the bug comes back. CPU halts at 550%.
> >
> > 2.3 io_thread_count = 8:
> > 2.3.1 For bucket_size = 16 it fails, but only partially. After the
> > transfer is done, memory has accumulated to over 3 GB, but the write
> > speed is about 60 MB/s, which makes it possible to wait it out. CPU is
> > about 600~700%. After the accumulated memory is written out, CPU returns
> > to normal.
> > 2.3.2 For bucket_size = 32 it still fails. CPU halts at 800%. The
> > transfer is very fast (over 14M rows/s); backpressure is not working at
> > all.
> >
> > On Sat, Jul 29, 2023 at 1:08 AM Weston Pace <weston.p...@gmail.com> wrote:
> > > > How many I/O threads are writing concurrently in a single
> > > > write_dataset call?
> > >
> > > With the default options, and no partitioning, it will only use 1 I/O
> > > thread. This is because we do not write to a single file in parallel.
> > > If you change FileSystemDatasetWriteOptions::max_rows_per_file then you
> > > may see more than 1 I/O thread because we will start new files and
> > > write to each one in parallel. If you have partitioning then you may
> > > see more than 1 I/O thread because we will be writing to multiple
> > > files.
> > >
> > > We use, at most, 1 I/O thread per file being written.
> > >
> > > > How do they schedule?
> > >
> > > There are 3 stages.
> > >
> > > Arrival stage: Data arrives on an Acero worker thread (CPU thread). We
> > > will partition the data at this point. For each batch we schedule a
> > > queue batch task.
> > > Queue stage: This stage runs on the CPU thread. It finds the correct
> > > file queue for the batch and adds the batch to the file queue. It may
> > > split the batch if max_rows_per_file is set. It may trigger
> > > backpressure if there are too many rows queued on files. This stage
> > > runs serially, on the CPU thread. There is never more than one queue
> > > task running.
> > > Write stage: Each file has a number of write tasks. These run in
> > > parallel across files but serially within a file. These are I/O tasks.
> > >
> > > > The throttle code seems to only let one task run at a time?
> > >
> > > Yes, there is a throttled scheduler used for the queue stage (we only
> > > run one queue task at a time). There is a throttled scheduler per file
> > > used for the write stage. All of these are configured to only allow one
> > > task at a time to run.
> > >
> > > > What else can I do to inspect the problem?
> > >
> > > I think we need to find out why the CPU is still at 800% after the
> > > transfer is done when partitioning is enabled. I would expect the CPU
> > > to drop to 0% even if it takes several seconds (or longer) for the
> > > cached data to flush to the disk. The stack trace you shared is helpful
> > > but I don't know the root cause yet. All of the threads are stuck on
> > > locking/unlocking in FutureImpl::TryAddCallback, but that critical
> > > section is very small, so it seems like there is some kind of task
> > > storm. I think this is similar to a condition_variable that has
> > > thousands of waiters and is constantly doing a notify_all.
> > >
> > > I think we will need to figure out some kind of reproducible test case.
> > > I will try and find some time to run some experiments on Monday. Maybe
> > > I can reproduce this by setting the backpressure limit to a very small
> > > amount.
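Roughly how those knobs surface in the Python API: the file splitting that
drives I/O parallelism is controlled by write_dataset arguments such as
max_rows_per_file and max_open_files. The numbers below are arbitrary and the
table is a stand-in:

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({
    "id": [f"id-{i}" for i in range(100_000)],
    "bucket": [i % 64 for i in range(100_000)],
})

# With the defaults and no partitioning, everything goes into a single file,
# so at most one I/O thread is ever writing.  Splitting the output across
# more files -- via partitioning or max_rows_per_file -- is what allows more
# I/O threads to run, still at most one per open file.
ds.write_dataset(
    table,
    "/tmp/data_split",
    format="feather",
    max_rows_per_file=10_000,   # arbitrary cap: more files => more parallel writers
    max_rows_per_group=10_000,  # row groups cannot span files, so keep this <= max_rows_per_file
    max_open_files=64,          # bounds how many files are being written at once
    existing_data_behavior="overwrite_or_ignore",
)
```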
But "free" does not respect the resource limit inside the > > > > container. > > > > 2. I measured the write speed on the host by "dd if=/dev/zero > > > > of=./test.img bs=1G count=45 oflag=dsync", the output is "48318382080 > > > > Byets(48 GB) Copies,132.558 s,365 MB/s" > > > > 3. I do not limit the memory of the container ( available over > 140GB), > > > > but the CPU is still limit to 8. According to you explanation, the > > > > write process should never slow down, since it will write to the > > > > "memory cached" which is accounted as used memory until it is flushed > > > > to the storage by the OS. > > > > > > > > Additionally, the file format is "feather", writing with/without > > > > partitioning leads to different result. > > > > > > > > ## write without partitioning > > > > Everything works fine, no matter what value I set to io_thread_count > > > > or cpu_count. > > > > the performance of same configuration varies a lot, the initial peak > > > > speed may result in different memory usage, but the max average flow > > > > speed not varies a lot. > > > > Some records are below, > > > > 1. With cpu_count and io_thread_count to 128 CPU is less than 100% > and > > > > RES is less than 1G (after initial peak speed), average flow speed is > > > > 6.83M rows/s (45bytes per row). > > > > 2. With cpu_count to 1, io_thread_count to 16, CPU is a little over > > > > 100%, RES is about 3g at max, average flow speed is 4.64M rows/s, but > > > > it takes additional 6s to complete writing after transferring done. > > > > 3. With cpu_count to 1, io_thread_count to 128, performs almost as > > > > same as record 2. > > > > > > > > ## write with partitioning > > > > Writing with partitioning fails most of the time, setting lower cpu > > > > count not helping. > > > > 1. With cpu_count and io_thread_count to 128, CPU is 800% from > > > > begining, RES is growing slowing to 40.7G to the end of transferring, > > > > average flow speed is 3.24M rows/s. After that, CPU is still 800%, > but > > > > RES going down very slow at 200MB/minute. Write speed not recovered. > > > > 2.With cpu_count to 1, io_thread_count to 16, CPU goes up to 800% > > > > slower than record1, RES is growing to 44.1G to the end of > > > > transferring, average flow speed is 6.75M rows/s. Same happens as > > > > record 1 after transferring done. > > > > 3. With cpu_count to 1, io_thread_count to 128, CPU goes to 800% much > > > > slower than record2 (due to slower flow speed?), RES is growing to > 30G > > > > to the end of transferring, average flow speed is 1.62M rows/s. Same > > > > happens as record 1 after transferring done. > > > > > > > > Then I'm trying to limit the flow speed before writing queue got full > > > > with custom flow control (sleep on reader iteration based on > available > > > > memory) But the sleep time curve is not accurate, sometimes flow > slows > > > > down, but the queue got full anyway. > > > > Then the interesting thing happens, before the queue is full (memory > > > > quickly grows up), the CPU is not fully used. When memory grows up > > > > quickly, CPU goes up as well, to 800%. > > > > 1. Sometimes, the writing queue can overcome, CPU will goes down > after > > > > the memory accumulated. The writing speed recoved and memory back to > > > > normal. > > > > 2. Sometimes, it can't. IOBPS goes down sharply, and CPU never goes > > > > down after that. > > > > > > > > How many io threads are writing concurrently in a single > write_dataset > > > > call? How do they schedule? 
> > > > How many I/O threads are writing concurrently in a single
> > > > write_dataset call? How do they schedule? The throttle code seems to
> > > > only let one task run at a time?
> > > > What else can I do to inspect the problem?
> > > >
> > > > On Fri, Jul 28, 2023 at 12:33 AM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > You'll need to measure more, but generally the bottleneck for
> > > > > writes is usually going to be the disk itself. Unfortunately,
> > > > > standard OS buffered I/O has some pretty negative behaviors in this
> > > > > case. First I'll describe what I generally see happen (the last
> > > > > time I profiled this was a while back but I don't think anything
> > > > > substantial has changed).
> > > > >
> > > > > * Initially, writes are very fast. The OS `write` call is simply a
> > > > > memcpy from user space into kernel space. The actual flushing of
> > > > > the data from kernel space to disk happens asynchronously unless
> > > > > you are using direct I/O (which is not currently supported).
> > > > > * Over time, assuming the data arrival rate is faster than the data
> > > > > write rate, the data will accumulate in kernel memory. For example,
> > > > > if you continuously run the Linux `free` program you will see the
> > > > > `free` column decrease and the `buff/cache` column increase. The
> > > > > `available` column should generally stay consistent (kernel memory
> > > > > that is in use but can technically be flushed to disk if needed is
> > > > > still considered "available" but not "free").
> > > > > * Once the `free` column reaches 0 then a few things happen. First,
> > > > > the calls to `write` are no longer fast (the write cannot complete
> > > > > until some existing data has been flushed to disk). Second, other
> > > > > processes that aren't in use might start swapping their data to
> > > > > disk (you will generally see the entire system, if it is
> > > > > interactive, grind to a halt). Third, if you have an OOM-killer
> > > > > active, it may start to kill running applications. It isn't
> > > > > supposed to do so but there are sometimes bugs[1].
> > > > > * Assuming the OOM killer does not kill your application then,
> > > > > because the `write` calls slow down, the number of rows in the
> > > > > dataset writer's queue will start to fill up (this is captured by
> > > > > the variable `rows_in_flight_throttle`).
> > > > > * Once the rows_in_flight_throttle is full it will pause and the
> > > > > dataset writer will return an unfinished future (asking the caller
> > > > > to back off).
> > > > > * Once this happens the caller will apply backpressure (if being
> > > > > used in Acero), which will pause the reader. This backpressure is
> > > > > not instant and generally each running CPU thread still delivers
> > > > > whatever batch it is working on. These batches essentially get
> > > > > added to an asynchronous condition variable waiting on the dataset
> > > > > writer queue to free up. This is the spot where the
> > > > > ThrottledAsyncTaskScheduler is used.
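A rough equivalent of watching `free` from Python, assuming `psutil` on a
Linux host (the `buffers` and `cached` fields are Linux-specific):

```python
import time

import psutil

GIB = 1024 ** 3

# Print the columns described above in a loop (Ctrl-C to stop): `free`
# shrinking while `buff/cache` grows means dirty data is piling up in the
# kernel page cache faster than the disk can flush it.
while True:
    vm = psutil.virtual_memory()
    print(
        f"free={vm.free / GIB:6.2f}G  "
        f"buff/cache={(vm.buffers + vm.cached) / GIB:6.2f}G  "
        f"available={vm.available / GIB:6.2f}G"
    )
    time.sleep(2)
```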
> > > > > The stack dump that you reported is not exactly what I would have
> > > > > expected but it might still match the above description. At this
> > > > > point I am just sort of guessing. When the dataset writer frees up
> > > > > enough to receive another batch it will do what is effectively a
> > > > > "notify all" and all of the compute threads wake up and try to add
> > > > > their batch to the dataset writer. One of these gets through, gets
> > > > > added to the dataset writer, and then backpressure is applied again
> > > > > and all the requests pile up once again. It's possible that a
> > > > > "resume sending" signal is sent and this might actually lead to RAM
> > > > > filling up more. We could probably mitigate this by adding a low
> > > > > water mark to the dataset writer's backpressure throttle (so it
> > > > > doesn't send the resume signal as soon as the queue has room but
> > > > > waits until the queue is half full).
> > > > >
> > > > > I'd recommend watching the output of `free` (or monitoring memory
> > > > > in some other way) and verifying the above. I'd also suggest
> > > > > lowering the number of CPU threads and seeing how that affects
> > > > > performance. If you lower the CPU threads enough then you should
> > > > > eventually get to a point where your supply of data is slower than
> > > > > your writer, and I wouldn't expect memory to accumulate. These
> > > > > things aren't solutions, but they might give us more clues about
> > > > > what is happening.
> > > > >
> > > > > [1]
> > > > > https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> > > > >
> > > > > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > > > > Hi,
> > > > > > I'm using Flight to receive streams from a client and write them
> > > > > > to storage with the Python `pa.dataset.write_dataset` API. The
> > > > > > whole dataset is 1 billion rows, over 40 GB, with one partition
> > > > > > column ranging from 0~63. The container runs with 8 CPU cores and
> > > > > > 4 GB of RAM.
> > > > > > It can finish in about 160 s (6M rows/s; each record batch is
> > > > > > about 32K rows), completing the transfer and the writing almost
> > > > > > synchronously, after setting io_thread_count to 128.
> > > > > > Then I'd like to find out the bottleneck of the system: CPU, RAM,
> > > > > > or storage?
> > > > > > 1. I extended the RAM to 32 GB; the server then consumes more
> > > > > > RAM. The writing progresses at the beginning, then suddenly slows
> > > > > > down and data accumulates in RAM until OOM.
> > > > > > 2. Then I set the RAM to 64 GB, so that the server is not killed
> > > > > > by the OOM killer. The same thing happens; also, after all the
> > > > > > data has been transferred (into memory), the server consumes all
> > > > > > CPU shares (800%) but is still very slow at writing (not totally
> > > > > > stopped, but about 100 MB/minute).
> > > > > > 2.1 I was wondering whether the I/O threads or the computation
> > > > > > tasks were stuck. After setting both io_thread_count and
> > > > > > cpu_count to 32, I wrapped the input stream of write_dataset with
> > > > > > a callback on each record batch; I can tell that all the record
> > > > > > batches are consumed by the write_dataset API.
> > > > > > 2.2 I dumped all thread stack traces and made a flame graph. See
> > > > > > https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > > > > >
> > > > > > It seems that all threads are stuck at
> > > > > > ThrottledAsyncTaskSchedulerImpl.
> > > > > >
> > > > > > --
> > > > > > ---------------------
> > > > > > Best Regards,
> > > > > > Wenbo Hu,
> > > >
> > > > --
> > > > ---------------------
> > > > Best Regards,
> > > > Wenbo Hu,
> >
> > --
> > ---------------------
> > Best Regards,
> > Wenbo Hu,
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,