jorisvandenbossche commented on a change in pull request #10693:
URL: https://github.com/apache/arrow/pull/10693#discussion_r670370362



##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files

Review comment:
       "Scheduling" is a bit strange term to use in this context? 

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is

Review comment:
       ```suggestion
   The previous examples have demonstrated how to read the data into a table 
using :func:`~Dataset.to_table`.  This is
   ```
   
   ?

##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
         Instantiate HadoopFileSystem object from an URI string.
 
         The following two calls are equivalent
-        * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
-                                    '&replication=1')
-        * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+        * 
``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')`` 
# noqa: E501
+
+        * ``HadoopFileSystem('localhost', port=8020, user='test', 
replication=1)`` # noqa: E501

Review comment:
       ```suggestion
           * ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
   &replication=1')``
           * ``HadoopFileSystem('localhost', port=8020, user='test', \
   replication=1)``
   ```
   
   It's a bit ugly in the source code, but this AFAIK the best way to deal with 
this issue. In a console checking the docstring this will look like a single 
line.

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is
+useful if the dataset is small or there is only a small amount of data that 
needs to
+be read.  The dataset API contains additional methods to read and process 
large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  
This
+method returns an iterator of record batches.  For example, we can use this 
method to
+calculate the average of a column without loading the entire column into 
memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and 
pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for 
you
+automatically by the to_table and to_batches method of the dataset.  Any 
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size 
of the
+batches returned by the scanner.  Batches can still be smaller than the 
`batch_size`
+if the dataset consists of small files or those files themselves consist of 
small
+row groups.  For example, a parquet file with 10,000 rows per row group will 
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller 
value.
+
+The default batch size is one million rows and this is typically a good 
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful 
when
+you want to partition your data or you need to write a large amount of data.  A
+basic dataset write is similar to writing a table except that you specify a 
directory
+instead of a filename.
+
+.. ipython:: python
+
+    base = pathlib.Path(tempfile.gettempdir())
+    dataset_root = base / "sample_dataset"
+    dataset_root.mkdir(exist_ok=True)
+
+    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 
5})
+    ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our 
sample_dataset
+directory.
+
+.. warning::
+
+    If you run the example again it will replace the existing part-0.parquet 
file.
+    Appending files to an existing dataset is not currently supported by this 
API and
+    the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be 
partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.  
To write
+our above data out to a partitioned directory we only need to specify how we 
want the
+dataset to be partitioned.  For example:
+
+.. ipython:: python
+
+    part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files.  Half our data will be in the dataset_root/c=1 
directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table.  If you are writing a large amount 
of data
+you may not be able to load everything into a single in-memory table.  
Fortunately, the
+write_dataset method also accepts an iterable of record batches.  This makes 
it really
+simple, for example, to repartition a large dataset without loading the entire 
dataset
+into memory:
+
+.. ipython:: python
+
+    old_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    new_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor=None
+    )
+    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+    new_root = base / "repartitioned_dataset"
+    # A scanner can act as an iterator of record batches but you could also 
receive
+    # data from the network (e.g. via flight), from your own scanning, or from 
any
+    # other method that yields record batches.  In addition, you can pass a 
dataset
+    # into write_dataset directly but this method is useful if you want to 
customize
+    # the scanner (e.g. to filter the input dataset or set a maximum batch 
size)
+    scanner = input_dataset.scanner()
+
+    ds.write_dataset(scanner, new_root, format="parquet", 
partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and 
dataset_root/2

Review comment:
       Since this example is not changing the partitioning structure (except 
for the flavor), it's not directly what this "repartitioning" will do apart 
from changing `/c=1/` into `/1/`. Will it also repartition individual files? 
(eg write more smaller files in case your input dataset has large files)

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is
+useful if the dataset is small or there is only a small amount of data that 
needs to
+be read.  The dataset API contains additional methods to read and process 
large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  
This
+method returns an iterator of record batches.  For example, we can use this 
method to
+calculate the average of a column without loading the entire column into 
memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and 
pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for 
you
+automatically by the to_table and to_batches method of the dataset.  Any 
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size 
of the

Review comment:
       ```suggestion
   One of those parameters is the ``batch_size``.  This controls the maximum 
size of the
   ```
   
   (rst uses double backticks for code, in contrast to markdown ..)

##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
         Instantiate HadoopFileSystem object from an URI string.
 
         The following two calls are equivalent
-        * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
-                                    '&replication=1')
-        * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+        * 
``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')`` 
# noqa: E501
+
+        * ``HadoopFileSystem('localhost', port=8020, user='test', 
replication=1)`` # noqa: E501

Review comment:
       I added a suggestion below to fix this. I think that's preferrable since 
the ``# noqa: E501`` actually would show up as normal text which would be 
strange in the online rendered docstring

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is
+useful if the dataset is small or there is only a small amount of data that 
needs to
+be read.  The dataset API contains additional methods to read and process 
large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  
This
+method returns an iterator of record batches.  For example, we can use this 
method to
+calculate the average of a column without loading the entire column into 
memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):

Review comment:
       Maybe add a `columns=["col2"]` as well? (it shows passing a column 
selection as well + it is actually what you should do here if you are only 
summing that column)

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is
+useful if the dataset is small or there is only a small amount of data that 
needs to
+be read.  The dataset API contains additional methods to read and process 
large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  
This
+method returns an iterator of record batches.  For example, we can use this 
method to
+calculate the average of a column without loading the entire column into 
memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and 
pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for 
you
+automatically by the to_table and to_batches method of the dataset.  Any 
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size 
of the
+batches returned by the scanner.  Batches can still be smaller than the 
`batch_size`
+if the dataset consists of small files or those files themselves consist of 
small
+row groups.  For example, a parquet file with 10,000 rows per row group will 
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller 
value.
+
+The default batch size is one million rows and this is typically a good 
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful 
when
+you want to partition your data or you need to write a large amount of data.  A
+basic dataset write is similar to writing a table except that you specify a 
directory
+instead of a filename.
+
+.. ipython:: python
+
+    base = pathlib.Path(tempfile.gettempdir())
+    dataset_root = base / "sample_dataset"
+    dataset_root.mkdir(exist_ok=True)
+
+    table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 
5})
+    ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our 
sample_dataset
+directory.
+
+.. warning::
+
+    If you run the example again it will replace the existing part-0.parquet 
file.
+    Appending files to an existing dataset is not currently supported by this 
API and
+    the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be 
partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.  
To write
+our above data out to a partitioned directory we only need to specify how we 
want the
+dataset to be partitioned.  For example:
+
+.. ipython:: python
+
+    part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files.  Half our data will be in the dataset_root/c=1 
directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table.  If you are writing a large amount 
of data
+you may not be able to load everything into a single in-memory table.  
Fortunately, the
+write_dataset method also accepts an iterable of record batches.  This makes 
it really
+simple, for example, to repartition a large dataset without loading the entire 
dataset
+into memory:
+
+.. ipython:: python
+
+    old_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor="hive"
+    )
+    new_part = ds.partitioning(
+        pa.schema([("c", pa.int16())]), flavor=None
+    )
+    input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+    new_root = base / "repartitioned_dataset"
+    # A scanner can act as an iterator of record batches but you could also 
receive
+    # data from the network (e.g. via flight), from your own scanning, or from 
any
+    # other method that yields record batches.  In addition, you can pass a 
dataset
+    # into write_dataset directly but this method is useful if you want to 
customize
+    # the scanner (e.g. to filter the input dataset or set a maximum batch 
size)
+    scanner = input_dataset.scanner()
+
+    ds.write_dataset(scanner, new_root, format="parquet", 
partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and 
dataset_root/2
+directories.  You could also use this mechnaism to change which columns you 
are partitioned

Review comment:
       ```suggestion
   directories.  You could also use this mechnaism to change which columns the 
dataset is partitioned
   ```

##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can 
be used for filtering:
     dataset.to_table().to_pandas()
     dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
 
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data.  When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
 
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
 
-..
-    Possible content:
-    - fragments (get_fragments)
-    - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.  
This is
+useful if the dataset is small or there is only a small amount of data that 
needs to
+be read.  The dataset API contains additional methods to read and process 
large amounts
+of data in a streaming fashion.
 
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.  
This
+method returns an iterator of record batches.  For example, we can use this 
method to
+calculate the average of a column without loading the entire column into 
memory:
 
-    for scan_task in dataset.scan(columns=[...], filter=...):
-        for record_batch in scan_task.execute():
-            # process the record batch
+.. ipython:: python
+
+    import pyarrow.compute as pc
+
+    col2_sum = 0
+    count = 0
+    for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+        col2_sum += pc.sum(batch.column('col2')).as_py()
+        count += batch.num_rows
+    mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and 
pyarrow
+uses an object called a :class:`Scanner` to do this.  A Scanner is created for 
you
+automatically by the to_table and to_batches method of the dataset.  Any 
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`.  This controls the maximum size 
of the
+batches returned by the scanner.  Batches can still be smaller than the 
`batch_size`
+if the dataset consists of small files or those files themselves consist of 
small
+row groups.  For example, a parquet file with 10,000 rows per row group will 
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller 
value.
+
+The default batch size is one million rows and this is typically a good 
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset.  This can be useful 
when

Review comment:
       ```suggestion
   The dataset API also simplifies writing data to a dataset using 
:func:`write_dataset` .  This can be useful when
   ```
   
   (this is not necessarily the best addition, but basically I think it would 
be good to add this explicit reference _somewhere_ in this paragraph; it's 
useful to have this so that the user has an easy link to click to check the 
more detailed function docstring)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to