lidavidm commented on a change in pull request #10118:
URL: https://github.com/apache/arrow/pull/10118#discussion_r633533471
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -206,6 +206,63 @@ def dataset(mockfs):
return factory.finish()
+@pytest.fixture(params=[
+ (True, True),
+ (True, False),
+ (False, True),
+ (False, False)
+], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync'])
+def dataset_reader(request):
+ '''
+ Fixture which allows dataset scanning operations to be
+ run with/without threads and with/without async
+ '''
+ use_threads, use_async = request.param
+
+ class reader:
+
+ def __init__(self):
+ self.use_threads = use_threads
+ self.use_async = use_async
+
+ def _patch_kwargs(self, kwargs):
+ if 'use_threads' in kwargs:
+ raise Exception(
+ ('Invalid use of dataset_reader, do not specify'
+ ' use_threads'))
+ if 'use_async' in kwargs:
+ raise Exception(
+ 'Invalid use of dataset_reader, do not specify use_async')
+ kwargs['use_threads'] = use_threads
+ kwargs['use_async'] = use_async
+
+ def to_table(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.to_table(**kwargs)
+
+ def to_batches(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.to_batches(**kwargs)
+
+ def scanner(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.scanner(**kwargs)
+
+ def head(self, dataset, num_rows, **kwargs):
Review comment:
Note you could declare all these as `(self, dataset, *args, **kwargs)`
and avoid having to replicate their individual signatures.
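    The suggestion could be sketched as below. This is a hypothetical illustration, not the fixture's actual code: `FakeDataset` and `make_reader` are stand-ins (the real fixture wraps pyarrow's `Dataset` inside a pytest fixture), shown only to demonstrate the generic `(self, dataset, *args, **kwargs)` delegation.

    ```python
    # Hypothetical sketch of the reviewer's suggestion: one generic
    # delegating helper instead of replicating each method's signature.
    # FakeDataset stands in for pyarrow's Dataset; names are illustrative.

    class FakeDataset:
        def to_table(self, use_threads=True, use_async=False):
            return ('to_table', use_threads, use_async)

        def head(self, num_rows, use_threads=True, use_async=False):
            return ('head', num_rows, use_threads, use_async)


    def make_reader(use_threads, use_async):
        class Reader:
            def _forward(self, method, dataset, *args, **kwargs):
                # Mirrors _patch_kwargs: reject caller-supplied values,
                # then inject the fixture-controlled ones.
                for key in ('use_threads', 'use_async'):
                    if key in kwargs:
                        raise Exception(
                            'Invalid use of dataset_reader, do not specify '
                            + key)
                kwargs['use_threads'] = use_threads
                kwargs['use_async'] = use_async
                return getattr(dataset, method)(*args, **kwargs)

            def to_table(self, dataset, *args, **kwargs):
                return self._forward('to_table', dataset, *args, **kwargs)

            def head(self, dataset, *args, **kwargs):
                return self._forward('head', dataset, *args, **kwargs)

        return Reader()


    reader = make_reader(use_threads=False, use_async=True)
    print(reader.head(FakeDataset(), 5))  # ('head', 5, False, True)
    ```

    Because `_forward` passes `*args` straight through, adding a new scan method only requires a one-line wrapper, no matter what positional parameters it takes.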
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2969,6 +2982,8 @@ def _filesystemdataset_write(
FileSystem filesystem not None,
Partitioning partitioning not None,
FileWriteOptions file_options not None,
+ bint use_threads,
+ bint use_async,
Review comment:
    This doesn't seem to be used here? Though I guess use_threads was
ignored even before this change.
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -206,6 +206,63 @@ def dataset(mockfs):
return factory.finish()
+@pytest.fixture(params=[
+ (True, True),
+ (True, False),
+ (False, True),
+ (False, False)
+], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync'])
+def dataset_reader(request):
+ '''
+ Fixture which allows dataset scanning operations to be
+ run with/without threads and with/without async
+ '''
+ use_threads, use_async = request.param
+
+ class reader:
+
+ def __init__(self):
+ self.use_threads = use_threads
+ self.use_async = use_async
+
+ def _patch_kwargs(self, kwargs):
+ if 'use_threads' in kwargs:
+ raise Exception(
+ ('Invalid use of dataset_reader, do not specify'
+ ' use_threads'))
+ if 'use_async' in kwargs:
+ raise Exception(
+ 'Invalid use of dataset_reader, do not specify use_async')
+ kwargs['use_threads'] = use_threads
+ kwargs['use_async'] = use_async
+
+ def to_table(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.to_table(**kwargs)
+
+ def to_batches(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.to_batches(**kwargs)
+
+ def scanner(self, dataset, **kwargs):
+ self._patch_kwargs(kwargs)
+ return dataset.scanner(**kwargs)
+
+ def head(self, dataset, num_rows, **kwargs):
Review comment:
    Thanks for converting this file, by the way; it looks like it was
rather tedious.
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2746,6 +2746,10 @@ cdef class Scanner(_Weakrefable):
use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by
the number of available CPU cores.
+    use_async : bool, default False
+        If enabled, an async scanner will be used, which should offer
+        better performance with high-latency/highly-parallel filesystems
+        (e.g. S3)
Review comment:
    It's because `_populate_builder` is cdef, i.e. it gets compiled into
an actual C function with a C parameter list, so it can't take `**kwargs`
(though it can directly take things like a shared_ptr). That said, we
could pack all the non-C(++) arguments into a single `kwargs` dict.
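    A minimal pure-Python sketch of the "pack the non-C(++) arguments into a single dict" idea. To be clear, the real `_populate_builder` is a cdef Cython function and the real builder is a `ScannerBuilder`; the dict-based builder and the exact names below are illustrative assumptions.

    ```python
    # Illustrative sketch only: a cdef function has a fixed C parameter
    # list, so rather than **kwargs, the Python-level options are packed
    # into one dict argument. The dict "builder" stands in for the real
    # ScannerBuilder.

    def _populate_builder(builder, py_kwargs):
        # Unpack the non-C(++) options, using the documented defaults.
        builder['use_threads'] = py_kwargs.get('use_threads', True)
        builder['use_async'] = py_kwargs.get('use_async', False)
        return builder


    print(_populate_builder({}, {'use_async': True}))
    # {'use_threads': True, 'use_async': True}
    ```

    The trade-off is that a packed dict gives up compile-time checking of argument names, which is part of why a fixed C parameter list was used in the first place.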
##########
File path: r/R/dataset-scan.R
##########
@@ -42,6 +44,7 @@
#' - `$UseThreads(threads)`: logical: should the scan use multithreading?
#' The method's default input is `TRUE`, but you must call the method to enable
#' multithreading because the scanner default is `FALSE`.
+#' - `$UseAsync(use_async)`: logical: should the async scanner be used?
Review comment:
I think you have to roxygenize() to get these changes reflected in the
actual docs? (and/or you can just `@github-actions autotune` to have CI do that
for you)
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2969,6 +2982,8 @@ def _filesystemdataset_write(
FileSystem filesystem not None,
Partitioning partitioning not None,
FileWriteOptions file_options not None,
+ bint use_threads,
+ bint use_async,
Review comment:
    I guess we'd have to do something like embed ScanOptions into
FileSystemDatasetWriteOptions if we want to make these useful; you've
already made a follow-up for that.
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2986,6 +2992,8 @@ def _filesystemdataset_write(
c_options.max_partitions = max_partitions
c_options.basename_template = tobytes(basename_template)
- c_scanner = data.unwrap()
+ scanner = data._scanner(use_threads=use_threads, use_async=use_async)
Review comment:
Note that the method was renamed/made public recently (i.e. should be
`data.scanner`)
##########
File path: r/R/dataset-scan.R
##########
@@ -183,6 +191,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit = ArrowObject,
dataset___ScannerBuilder__UseThreads(self, threads)
self
},
+ UseAsync = function(use_async = FALSE) {
Review comment:
(Otherwise the parameter above doesn't work.)
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -393,6 +393,11 @@ cdef class Dataset(_Weakrefable):
use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by
the number of available CPU cores.
+    use_async : bool, default False
+        If enabled, an async scanner will be used, which should offer
+        better performance with high-latency/highly-parallel filesystems
+        (e.g. S3)
+
Review comment:
Do we want to call it out as still experimental? (Ditto for R below)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]