aocsa commented on a change in pull request #11286:
URL: https://github.com/apache/arrow/pull/11286#discussion_r728531833
##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -371,6 +378,7 @@ class DatasetWritingSinkNodeConsumer : public
compute::SinkNodeConsumer {
std::shared_ptr<Schema> schema;
std::unique_ptr<internal::DatasetWriter> dataset_writer;
FileSystemDatasetWriteOptions write_options;
+ std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
util::SerializedAsyncTaskGroup task_group;
Review comment:
maybe this is a good time to uniformize code styling as this is a class
all the data members should follow `name_` style.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3389,6 +3392,73 @@ def test_write_dataset_with_scanner(tempdir):
) == table.drop(["a"]).to_pydict()
+def test_write_dataset_with_backpressure(tempdir):
+ consumer_gate = threading.Semaphore(0)
+
+ class GatingFs(ProxyHandler):
+ def open_output_stream(self, path, metadata):
+ consumer_gate.acquire(1)
+ return self._fs.open_output_stream(path, metadata=metadata)
+ gating_fs = fs.PyFileSystem(GatingFs(fs.LocalFileSystem()))
+
+ schema = pa.schema([pa.field('data', pa.int32())])
+ # By default, the dataset writer will queue up 64Mi rows so
+ # with batches of 1M it should only fit ~67 batches
+ batch = pa.record_batch([pa.array(list(range(1_000_000)))], schema=schema)
+ batches_read = 0
+ min_backpressure = 67
+ end = 200
+
+ def counting_generator():
+ nonlocal batches_read
+ while batches_read < end:
+ time.sleep(0.01)
+ batches_read += 1
+ yield batch
+
+ scanner = ds.Scanner.from_batches(
+ counting_generator(), schema=schema, use_threads=True,
+ use_async=True)
+
+ write_thread = threading.Thread(
+ target=lambda: ds.write_dataset(
+ scanner, str(tempdir), format='parquet', filesystem=gating_fs))
+ write_thread.start()
+
+ try:
+ start = time.time()
+
+ def duration():
+ return time.time() - start
+
+ # This test is timing dependent. There is no signal from the C++
+ # when backpressure has been hit. We don't know exactly when
+ # backpressure will be hit because it may take some time for the
+ # signal to get from the sink to the scanner.
+ #
+ # The test may emit false positives on slow systems. It could
+ # theoretically emit a false negative if the scanner managed to read
+ # and emit all 200 batches before the backpressure signal had a chance
+ # to propagate but the 0.01s delay in the generator should make that
+ # scenario unlikely.
Review comment:
Just a comment, If python test is slow why don't write this test in C++.
I think there is more control in the C++, and even we a test with large
workload is achivable, or run test cases when something so it doesn't always
run,
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]