jorisvandenbossche commented on a change in pull request #10693:
URL: https://github.com/apache/arrow/pull/10693#discussion_r670370362
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
Review comment:
"Scheduling" is a bit strange term to use in this context?
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
Review comment:
```suggestion
The previous examples have demonstrated how to read the data into a table
using :func:`~Dataset.to_table`. This is
```
?
##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
Instantiate HadoopFileSystem object from an URI string.
The following two calls are equivalent
- * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
- '&replication=1')
- * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+ *
``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')``
# noqa: E501
+
+ * ``HadoopFileSystem('localhost', port=8020, user='test',
replication=1)`` # noqa: E501
Review comment:
```suggestion
* ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
&replication=1')``
* ``HadoopFileSystem('localhost', port=8020, user='test', \
replication=1)``
```
It's a bit ugly in the source code, but this AFAIK the best way to deal with
this issue. In a console checking the docstring this will look like a single
line.
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
+useful if the dataset is small or there is only a small amount of data that
needs to
+be read. The dataset API contains additional methods to read and process
large amounts
+of data in a streaming fashion.
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.
This
+method returns an iterator of record batches. For example, we can use this
method to
+calculate the average of a column without loading the entire column into
memory:
- for scan_task in dataset.scan(columns=[...], filter=...):
- for record_batch in scan_task.execute():
- # process the record batch
+.. ipython:: python
+
+ import pyarrow.compute as pc
+
+ col2_sum = 0
+ count = 0
+ for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+ col2_sum += pc.sum(batch.column('col2')).as_py()
+ count += batch.num_rows
+ mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and
pyarrow
+uses an object called a :class:`Scanner` to do this. A Scanner is created for
you
+automatically by the to_table and to_batches method of the dataset. Any
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`. This controls the maximum size
of the
+batches returned by the scanner. Batches can still be smaller than the
`batch_size`
+if the dataset consists of small files or those files themselves consist of
small
+row groups. For example, a parquet file with 10,000 rows per row group will
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller
value.
+
+The default batch size is one million rows and this is typically a good
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset. This can be useful
when
+you want to partition your data or you need to write a large amount of data. A
+basic dataset write is similar to writing a table except that you specify a
directory
+instead of a filename.
+
+.. ipython:: python
+
+ base = pathlib.Path(tempfile.gettempdir())
+ dataset_root = base / "sample_dataset"
+ dataset_root.mkdir(exist_ok=True)
+
+ table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] *
5})
+ ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our
sample_dataset
+directory.
+
+.. warning::
+
+ If you run the example again it will replace the existing part-0.parquet
file.
+ Appending files to an existing dataset is not currently supported by this
API and
+ the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be
partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.
To write
+our above data out to a partitioned directory we only need to specify how we
want the
+dataset to be partitioned. For example:
+
+.. ipython:: python
+
+ part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor="hive"
+ )
+ ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files. Half our data will be in the dataset_root/c=1
directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table. If you are writing a large amount
of data
+you may not be able to load everything into a single in-memory table.
Fortunately, the
+write_dataset method also accepts an iterable of record batches. This makes
it really
+simple, for example, to repartition a large dataset without loading the entire
dataset
+into memory:
+
+.. ipython:: python
+
+ old_part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor="hive"
+ )
+ new_part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor=None
+ )
+ input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+ new_root = base / "repartitioned_dataset"
+ # A scanner can act as an iterator of record batches but you could also
receive
+ # data from the network (e.g. via flight), from your own scanning, or from
any
+ # other method that yields record batches. In addition, you can pass a
dataset
+ # into write_dataset directly but this method is useful if you want to
customize
+ # the scanner (e.g. to filter the input dataset or set a maximum batch
size)
+ scanner = input_dataset.scanner()
+
+ ds.write_dataset(scanner, new_root, format="parquet",
partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and
dataset_root/2
Review comment:
Since this example is not changing the partitioning structure (except
for the flavor), it's not directly what this "repartitioning" will do apart
from changing `/c=1/` into `/1/`. Will it also repartition individual files?
(eg write more smaller files in case your input dataset has large files)
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
+useful if the dataset is small or there is only a small amount of data that
needs to
+be read. The dataset API contains additional methods to read and process
large amounts
+of data in a streaming fashion.
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.
This
+method returns an iterator of record batches. For example, we can use this
method to
+calculate the average of a column without loading the entire column into
memory:
- for scan_task in dataset.scan(columns=[...], filter=...):
- for record_batch in scan_task.execute():
- # process the record batch
+.. ipython:: python
+
+ import pyarrow.compute as pc
+
+ col2_sum = 0
+ count = 0
+ for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+ col2_sum += pc.sum(batch.column('col2')).as_py()
+ count += batch.num_rows
+ mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and
pyarrow
+uses an object called a :class:`Scanner` to do this. A Scanner is created for
you
+automatically by the to_table and to_batches method of the dataset. Any
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`. This controls the maximum size
of the
Review comment:
```suggestion
One of those parameters is the ``batch_size``. This controls the maximum
size of the
```
(rst uses double backticks for code, in contrast to markdown ..)
##########
File path: python/pyarrow/_hdfs.pyx
##########
@@ -93,9 +93,10 @@ cdef class HadoopFileSystem(FileSystem):
Instantiate HadoopFileSystem object from an URI string.
The following two calls are equivalent
- * HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
- '&replication=1')
- * HadoopFileSystem('localhost', port=8020, user='test', replication=1)
+
+ *
``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test&replication=1')``
# noqa: E501
+
+ * ``HadoopFileSystem('localhost', port=8020, user='test',
replication=1)`` # noqa: E501
Review comment:
I added a suggestion below to fix this. I think that's preferrable since
the ``# noqa: E501`` actually would show up as normal text which would be
strange in the online rendered docstring
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
+useful if the dataset is small or there is only a small amount of data that
needs to
+be read. The dataset API contains additional methods to read and process
large amounts
+of data in a streaming fashion.
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.
This
+method returns an iterator of record batches. For example, we can use this
method to
+calculate the average of a column without loading the entire column into
memory:
- for scan_task in dataset.scan(columns=[...], filter=...):
- for record_batch in scan_task.execute():
- # process the record batch
+.. ipython:: python
+
+ import pyarrow.compute as pc
+
+ col2_sum = 0
+ count = 0
+ for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
Review comment:
Maybe add a `columns=["col2"]` as well? (it shows passing a column
selection as well + it is actually what you should do here if you are only
summing that column)
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
+useful if the dataset is small or there is only a small amount of data that
needs to
+be read. The dataset API contains additional methods to read and process
large amounts
+of data in a streaming fashion.
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.
This
+method returns an iterator of record batches. For example, we can use this
method to
+calculate the average of a column without loading the entire column into
memory:
- for scan_task in dataset.scan(columns=[...], filter=...):
- for record_batch in scan_task.execute():
- # process the record batch
+.. ipython:: python
+
+ import pyarrow.compute as pc
+
+ col2_sum = 0
+ count = 0
+ for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+ col2_sum += pc.sum(batch.column('col2')).as_py()
+ count += batch.num_rows
+ mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and
pyarrow
+uses an object called a :class:`Scanner` to do this. A Scanner is created for
you
+automatically by the to_table and to_batches method of the dataset. Any
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`. This controls the maximum size
of the
+batches returned by the scanner. Batches can still be smaller than the
`batch_size`
+if the dataset consists of small files or those files themselves consist of
small
+row groups. For example, a parquet file with 10,000 rows per row group will
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller
value.
+
+The default batch size is one million rows and this is typically a good
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset. This can be useful
when
+you want to partition your data or you need to write a large amount of data. A
+basic dataset write is similar to writing a table except that you specify a
directory
+instead of a filename.
+
+.. ipython:: python
+
+ base = pathlib.Path(tempfile.gettempdir())
+ dataset_root = base / "sample_dataset"
+ dataset_root.mkdir(exist_ok=True)
+
+ table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] *
5})
+ ds.write_dataset(table, dataset_root, format="parquet")
+
+The above example will create a single file named part-0.parquet in our
sample_dataset
+directory.
+
+.. warning::
+
+ If you run the example again it will replace the existing part-0.parquet
file.
+ Appending files to an existing dataset is not currently supported by this
API and
+ the output directory should be empty for predictable results.
+
+Writing partitioned data
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+A partitioning object can be used to specify how your output data should be
partitioned.
+This uses the same kind of partitioning objects we used for reading datasets.
To write
+our above data out to a partitioned directory we only need to specify how we
want the
+dataset to be partitioned. For example:
+
+.. ipython:: python
+
+ part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor="hive"
+ )
+ ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)
+
+This will create two files. Half our data will be in the dataset_root/c=1
directory and
+the other half will be in the dataset_root/c=2 directory.
+
+Writing large amounts of data
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The above examples wrote data from a table. If you are writing a large amount
of data
+you may not be able to load everything into a single in-memory table.
Fortunately, the
+write_dataset method also accepts an iterable of record batches. This makes
it really
+simple, for example, to repartition a large dataset without loading the entire
dataset
+into memory:
+
+.. ipython:: python
+
+ old_part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor="hive"
+ )
+ new_part = ds.partitioning(
+ pa.schema([("c", pa.int16())]), flavor=None
+ )
+ input_dataset = ds.dataset(dataset_root, partitioning=old_part)
+ new_root = base / "repartitioned_dataset"
+ # A scanner can act as an iterator of record batches but you could also
receive
+ # data from the network (e.g. via flight), from your own scanning, or from
any
+ # other method that yields record batches. In addition, you can pass a
dataset
+ # into write_dataset directly but this method is useful if you want to
customize
+ # the scanner (e.g. to filter the input dataset or set a maximum batch
size)
+ scanner = input_dataset.scanner()
+
+ ds.write_dataset(scanner, new_root, format="parquet",
partitioning=new_part)
+
+After the above example runs our data will be in dataset_root/1 and
dataset_root/2
+directories. You could also use this mechnaism to change which columns you
are partitioned
Review comment:
```suggestion
directories. You could also use this mechnaism to change which columns the
dataset is partitioned
```
##########
File path: docs/source/python/dataset.rst
##########
@@ -456,20 +456,160 @@ is materialized as columns when reading the data and can
be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
+Another benefit of manually scheduling the files is that the order of the files
+controls the order of the data. When performing an ordered read (or a read to
+a table) then the rows returned will match the order of the files given.
-Manual scheduling
------------------
+Iterative (out of core or streaming) reads
+------------------------------------------
-..
- Possible content:
- - fragments (get_fragments)
- - scan / scan tasks / iterators of record batches
+The previous examples have demonstrated how to read the data into a table.
This is
+useful if the dataset is small or there is only a small amount of data that
needs to
+be read. The dataset API contains additional methods to read and process
large amounts
+of data in a streaming fashion.
-The :func:`~Dataset.to_table` method loads all selected data into memory
-at once resulting in a pyarrow Table. Alternatively, a dataset can also be
-scanned one RecordBatch at a time in an iterative manner using the
-:func:`~Dataset.scan` method::
+The easiest way to do this is to use the method :meth:`Dataset.to_batches`.
This
+method returns an iterator of record batches. For example, we can use this
method to
+calculate the average of a column without loading the entire column into
memory:
- for scan_task in dataset.scan(columns=[...], filter=...):
- for record_batch in scan_task.execute():
- # process the record batch
+.. ipython:: python
+
+ import pyarrow.compute as pc
+
+ col2_sum = 0
+ count = 0
+ for batch in dataset.to_batches(filter=~ds.field('col2').is_null()):
+ col2_sum += pc.sum(batch.column('col2')).as_py()
+ count += batch.num_rows
+ mean_a = col2_sum/count
+
+Customizing the batch size
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An iterative read of a dataset is often called a "scan" of the dataset and
pyarrow
+uses an object called a :class:`Scanner` to do this. A Scanner is created for
you
+automatically by the to_table and to_batches method of the dataset. Any
arguments
+you pass to these methods will be passed on to the Scanner constructor.
+
+One of those parameters is the `batch_size`. This controls the maximum size
of the
+batches returned by the scanner. Batches can still be smaller than the
`batch_size`
+if the dataset consists of small files or those files themselves consist of
small
+row groups. For example, a parquet file with 10,000 rows per row group will
yield
+batches with, at most, 10,000 rows unless the batch_size is set to a smaller
value.
+
+The default batch size is one million rows and this is typically a good
default but
+you may want to customize it if you are reading a large number of columns.
+
+Writing Datasets
+----------------
+
+The dataset API also simplifies writing data to a dataset. This can be useful
when
Review comment:
```suggestion
The dataset API also simplifies writing data to a dataset using
:func:`write_dataset` . This can be useful when
```
(this is not necessarily the best addition, but basically I think it would
be good to add this explicit reference _somewhere_ in this paragraph; it's
useful to have this so that the user has an easy link to click to check the
more detailed function docstring)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]