westonpace commented on a change in pull request #11286:
URL: https://github.com/apache/arrow/pull/11286#discussion_r728906243



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3389,6 +3392,73 @@ def test_write_dataset_with_scanner(tempdir):
                     ) == table.drop(["a"]).to_pydict()
 
 
+def test_write_dataset_with_backpressure(tempdir):
+    consumer_gate = threading.Semaphore(0)
+
+    class GatingFs(ProxyHandler):
+        def open_output_stream(self, path, metadata):
+            consumer_gate.acquire(1)
+            return self._fs.open_output_stream(path, metadata=metadata)
+    gating_fs = fs.PyFileSystem(GatingFs(fs.LocalFileSystem()))
+
+    schema = pa.schema([pa.field('data', pa.int32())])
+    # By default, the dataset writer will queue up 64Mi rows so
+    # with batches of 1M it should only fit ~67 batches
+    batch = pa.record_batch([pa.array(list(range(1_000_000)))], schema=schema)
+    batches_read = 0
+    min_backpressure = 67
+    end = 200
+
+    def counting_generator():
+        nonlocal batches_read
+        while batches_read < end:
+            time.sleep(0.01)
+            batches_read += 1
+            yield batch
+
+    scanner = ds.Scanner.from_batches(
+        counting_generator(), schema=schema, use_threads=True,
+        use_async=True)
+
+    write_thread = threading.Thread(
+        target=lambda: ds.write_dataset(
+            scanner, str(tempdir), format='parquet', filesystem=gating_fs))
+    write_thread.start()
+
+    try:
+        start = time.time()
+
+        def duration():
+            return time.time() - start
+
+        # This test is timing dependent.  There is no signal from the C++
+        # when backpressure has been hit.  We don't know exactly when
+        # backpressure will be hit because it may take some time for the
+        # signal to get from the sink to the scanner.
+        #
+        # The test may emit false positives on slow systems.  It could
+        # theoretically emit a false negative if the scanner managed to read
+        # and emit all 200 batches before the backpressure signal had a chance
+        # to propagate but the 0.01s delay in the generator should make that
+        # scenario unlikely.

Review comment:
       There are already tests in C++ for the scanner backpressure and the 
dataset writer backpressure.  You are correct that we have more control.  I was 
able to use the thread pool's "wait for idle" method to know when backpressure 
had been hit.
   
   I wanted a python test to pull everything together and make sure it is 
actually being utilized correctly (I think it is easy sometimes for python to 
get missed due to a configuration parameter or something else).  I'd be ok with 
removing this test but I don't think we need to add anything to C++.  @bkietz 
thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to