lidavidm commented on a change in pull request #10118:
URL: https://github.com/apache/arrow/pull/10118#discussion_r633533471



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -206,6 +206,63 @@ def dataset(mockfs):
     return factory.finish()
 
 
+@pytest.fixture(params=[
+    (True, True),
+    (True, False),
+    (False, True),
+    (False, False)
+], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync'])
+def dataset_reader(request):
+    '''
+    Fixture which allows dataset scanning operations to be
+    run with/without threads and with/without async
+    '''
+    use_threads, use_async = request.param
+
+    class reader:
+
+        def __init__(self):
+            self.use_threads = use_threads
+            self.use_async = use_async
+
+        def _patch_kwargs(self, kwargs):
+            if 'use_threads' in kwargs:
+                raise Exception(
+                    ('Invalid use of dataset_reader, do not specify'
+                     ' use_threads'))
+            if 'use_async' in kwargs:
+                raise Exception(
+                    'Invalid use of dataset_reader, do not specify use_async')
+            kwargs['use_threads'] = use_threads
+            kwargs['use_async'] = use_async
+
+        def to_table(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.to_table(**kwargs)
+
+        def to_batches(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.to_batches(**kwargs)
+
+        def scanner(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.scanner(**kwargs)
+
+        def head(self, dataset, num_rows, **kwargs):

Review comment:
       Note you could declare all these as `(self, dataset, *args, **kwargs)` 
and avoid having to replicate their individual signatures.
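
For example, a minimal sketch of that (only `head` is shown, and the constructor takes the flags directly just to keep the snippet self-contained):

```python
class reader:

    def __init__(self, use_threads, use_async):
        self.use_threads = use_threads
        self.use_async = use_async

    def _patch_kwargs(self, kwargs):
        kwargs['use_threads'] = self.use_threads
        kwargs['use_async'] = self.use_async

    def head(self, dataset, *args, **kwargs):
        # num_rows (and any future positional) rides along in *args, so
        # the wrapper never has to replicate the real signature.
        self._patch_kwargs(kwargs)
        return dataset.head(*args, **kwargs)
```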

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2969,6 +2982,8 @@ def _filesystemdataset_write(
     FileSystem filesystem not None,
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
+    bint use_threads,
+    bint use_async,

Review comment:
      This doesn't seem to be used here? Though I guess use_threads was 
ignored even before this change.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -206,6 +206,63 @@ def dataset(mockfs):
     return factory.finish()
 
 
+@pytest.fixture(params=[
+    (True, True),
+    (True, False),
+    (False, True),
+    (False, False)
+], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync'])
+def dataset_reader(request):
+    '''
+    Fixture which allows dataset scanning operations to be
+    run with/without threads and with/without async
+    '''
+    use_threads, use_async = request.param
+
+    class reader:
+
+        def __init__(self):
+            self.use_threads = use_threads
+            self.use_async = use_async
+
+        def _patch_kwargs(self, kwargs):
+            if 'use_threads' in kwargs:
+                raise Exception(
+                    ('Invalid use of dataset_reader, do not specify'
+                     ' use_threads'))
+            if 'use_async' in kwargs:
+                raise Exception(
+                    'Invalid use of dataset_reader, do not specify use_async')
+            kwargs['use_threads'] = use_threads
+            kwargs['use_async'] = use_async
+
+        def to_table(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.to_table(**kwargs)
+
+        def to_batches(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.to_batches(**kwargs)
+
+        def scanner(self, dataset, **kwargs):
+            self._patch_kwargs(kwargs)
+            return dataset.scanner(**kwargs)
+
+        def head(self, dataset, num_rows, **kwargs):

Review comment:
      Thanks for converting this file, by the way; it looks like it was 
rather tedious.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2746,6 +2746,10 @@ cdef class Scanner(_Weakrefable):
     use_threads : bool, default True
         If enabled, then maximum parallelism will be used determined by
         the number of available CPU cores.
+    use_async : bool, default False
+        If enabled, an async scanner will be used, which should offer
+        better performance with high-latency/highly-parallel filesystems
+        (e.g. S3)

Review comment:
      It's because `_populate_builder` is cdef, i.e. it gets compiled into an 
actual C function with a C parameter list, so it can't take `**kwargs` (though 
it can directly take things like a shared_ptr). That said, we could pack all 
the non-C(++) arguments into a single `kwargs` dict.
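
For illustration, a rough Python-level sketch of that packing idea (the helper name and defaults here are hypothetical; the real `_populate_builder` additionally takes C++ arguments like the builder itself):

```python
def _unpack_scanner_options(options):
    # Hypothetical helper: all non-C(++) scanner arguments travel in one
    # dict, so the compiled cdef signature never grows per new option.
    use_threads = options.pop('use_threads', True)
    use_async = options.pop('use_async', False)
    if options:
        raise TypeError(
            "unexpected scanner option(s): {}".format(sorted(options)))
    return use_threads, use_async
```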

##########
File path: r/R/dataset-scan.R
##########
@@ -42,6 +44,7 @@
 #' - `$UseThreads(threads)`: logical: should the scan use multithreading?
 #' The method's default input is `TRUE`, but you must call the method to enable
 #' multithreading because the scanner default is `FALSE`.
+#' - `$UseAsync(use_async)`: logical: should the async scanner be used?

Review comment:
      I think you have to run `roxygenize()` to get these changes reflected in 
the actual docs? (And/or you can just comment `@github-actions autotune` to 
have CI do that for you.)

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2969,6 +2982,8 @@ def _filesystemdataset_write(
     FileSystem filesystem not None,
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
+    bint use_threads,
+    bint use_async,

Review comment:
      I guess we'd have to do something like embed `ScanOptions` into 
`FileSystemDatasetWriteOptions` if we want to make these useful, and you've 
already made a followup for that.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2986,6 +2992,8 @@ def _filesystemdataset_write(
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
 
-    c_scanner = data.unwrap()
+    scanner = data._scanner(use_threads=use_threads, use_async=use_async)

Review comment:
      Note that the method was renamed/made public recently, i.e. this should 
be `data.scanner`.
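
Assuming the keyword arguments stay the same, the updated line would presumably read:

```python
scanner = data.scanner(use_threads=use_threads, use_async=use_async)
```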

##########
File path: r/R/dataset-scan.R
##########
@@ -183,6 +191,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit = ArrowObject,
       dataset___ScannerBuilder__UseThreads(self, threads)
       self
     },
+    UseAsync = function(use_async = FALSE) {

Review comment:
       (Otherwise the parameter above doesn't work.)

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -393,6 +393,11 @@ cdef class Dataset(_Weakrefable):
         use_threads : bool, default True
             If enabled, then maximum parallelism will be used determined by
             the number of available CPU cores.
+        use_async : bool, default False
+            If enabled, an async scanner will be used, which should offer
+            better performance with high-latency/highly-parallel filesystems
+            (e.g. S3)
+

Review comment:
       Do we want to call it out as still experimental? (Ditto for R below)
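
For example, the docstring entry could gain a call-out along these lines (wording is only a suggestion):

```python
def scanner(self, use_async=False, **kwargs):
    """Hypothetical excerpt showing the suggested call-out.

    Parameters
    ----------
    use_async : bool, default False
        If enabled, an async scanner will be used. This option is
        experimental and may change.
    """
```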



