This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1763622b6f ARROW-16122: [Python] Change use_legacy_dataset default and
deprecate no-longer supported keywords in parquet.write_to_dataset
1763622b6f is described below
commit 1763622b6f60e974e495b3349cc2f4b7caaf1951
Author: Alenka Frim <[email protected]>
AuthorDate: Thu Apr 21 09:49:24 2022 +0200
ARROW-16122: [Python] Change use_legacy_dataset default and deprecate
no-longer supported keywords in parquet.write_to_dataset
This PR tries to amend `pq.write_to_dataset` to:
1. raise a deprecation warning for `use_legacy_dataset=True` and already
switch the default to `False`.
2. raise deprecation warnings for all keywords (when
`use_legacy_dataset=True`) that won't be supported in the new implementation.
Closes #12811 from AlenkaF/ARROW-16122
Lead-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
python/pyarrow/parquet/__init__.py | 104 ++++++++++++++++++++++-----
python/pyarrow/tests/parquet/test_dataset.py | 49 ++++++++++++-
python/pyarrow/tests/test_dataset.py | 5 +-
3 files changed, 135 insertions(+), 23 deletions(-)
diff --git a/python/pyarrow/parquet/__init__.py
b/python/pyarrow/parquet/__init__.py
index 33094dabe6..867babdaf8 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -2926,8 +2926,10 @@ def _mkdir_if_not_exists(fs, path):
def write_to_dataset(table, root_path, partition_cols=None,
partition_filename_cb=None, filesystem=None,
- use_legacy_dataset=None, **kwargs):
- """Wrapper around parquet.write_table for writing a Table to
+ use_legacy_dataset=None, schema=None,
+ partitioning=None, basename_template=None,
+ use_threads=None, file_visitor=None, **kwargs):
+ """Wrapper around parquet.write_dataset for writing a Table to
Parquet format by partitions.
For each combination of partition columns and values,
a subdirectories are created in the following
@@ -2962,11 +2964,44 @@ def write_to_dataset(table, root_path,
partition_cols=None,
and allow you to override the partition filename. If nothing is
passed, the filename will consist of a uuid.
use_legacy_dataset : bool
- Default is True unless a ``pyarrow.fs`` filesystem is passed.
- Set to False to enable the new code path (experimental, using the
- new Arrow Dataset API). This is more efficient when using partition
- columns, but does not (yet) support `partition_filename_cb` and
- `metadata_collector` keywords.
+ Default is False. Set to True to use the legacy behaviour
+ (this option is deprecated, and the legacy implementation will be
+ removed in a future version). The legacy implementation still
+ supports `partition_filename_cb` and `metadata_collector` keywords
+ but is less efficient when using partition columns.
+ use_threads : bool, default True
+ Write files in parallel. If enabled, then maximum parallelism will be
+ used, determined by the number of available CPU cores.
+ schema : Schema, optional
+ partitioning : Partitioning or list[str], optional
+ The partitioning scheme specified with the
+ ``pyarrow.dataset.partitioning()`` function or a list of field names.
+ When providing a list of field names, you can use
+ ``partitioning_flavor`` to drive which partitioning type should be
+ used.
+ basename_template : str, optional
+ A template string used to generate basenames of written data files.
+ The token '{i}' will be replaced with an automatically incremented
+ integer. If not specified, it defaults to "guid-{i}.parquet"
+ file_visitor : function
+ If set, this function will be called with a WrittenFile instance
+ for each file created during the call. This object will have both
+ a path attribute and a metadata attribute.
+
+ The path attribute will be a string containing the path to
+ the created file.
+
+ The metadata attribute will be the parquet metadata of the file.
+ This metadata will have the file path attribute set and can be used
+ to build a _metadata file. The metadata attribute will be None if
+ the format is not parquet.
+
+ Example visitor which simply collects the filenames created::
+
+ visited_paths = []
+
+ def file_visitor(written_file):
+ visited_paths.append(written_file.path)
**kwargs : dict,
Additional kwargs for write_table function. See docstring for
`write_table` or `ParquetWriter` for more information.
@@ -2988,26 +3023,24 @@ def write_to_dataset(table, root_path,
partition_cols=None,
>>> import pyarrow.parquet as pq
>>> pq.write_to_dataset(table, root_path='dataset_name_3',
- ... partition_cols=['year'],
- ... use_legacy_dataset=False
- ... )
+ ... partition_cols=['year'])
>>> pq.ParquetDataset('dataset_name_3', use_legacy_dataset=False).files
['dataset_name_3/year=2019/part-0.parquet', ...
Write a single Parquet file into the root folder:
- >>> pq.write_to_dataset(table, root_path='dataset_name_4',
- ... use_legacy_dataset=False)
+ >>> pq.write_to_dataset(table, root_path='dataset_name_4')
>>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files
['dataset_name_4/part-0.parquet']
"""
if use_legacy_dataset is None:
- # if a new filesystem is passed -> default to new implementation
- if isinstance(filesystem, FileSystem):
- use_legacy_dataset = False
- # otherwise the default is still True
- else:
+ # if partition_filename_cb is specified ->
+ # default to the old implementation
+ if partition_filename_cb:
use_legacy_dataset = True
+ # otherwise the default is False
+ else:
+ use_legacy_dataset = False
if not use_legacy_dataset:
import pyarrow.dataset as ds
@@ -3042,13 +3075,48 @@ def write_to_dataset(table, root_path,
partition_cols=None,
part_schema = table.select(partition_cols).schema
partitioning = ds.partitioning(part_schema, flavor="hive")
+ if basename_template is None:
+ basename_template = guid() + '-{i}.parquet'
+
ds.write_dataset(
table, root_path, filesystem=filesystem,
format=parquet_format, file_options=write_options, schema=schema,
partitioning=partitioning, use_threads=use_threads,
- file_visitor=file_visitor)
+ file_visitor=file_visitor,
+ basename_template=basename_template,
+ existing_data_behavior='overwrite_or_ignore')
return
+ # warnings and errors when using legacy implementation
+ if use_legacy_dataset:
+ warnings.warn(
+ "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
+ "deprecated as of pyarrow 8.0.0, and the legacy implementation "
+ "will be removed in a future version.",
+ FutureWarning, stacklevel=2)
+ msg2 = (
+ "The '{}' argument is not supported with the legacy "
+ "implementation. To use this argument specify "
+ "'use_legacy_dataset=False' while constructing the "
+ "ParquetDataset."
+ )
+ if schema is not None:
+ raise ValueError(msg2.format("schema"))
+ if partitioning is not None:
+ raise ValueError(msg2.format("partitioning"))
+ if use_threads is not None:
+ raise ValueError(msg2.format("use_threads"))
+ if file_visitor is not None:
+ raise ValueError(msg2.format("file_visitor"))
+ if partition_filename_cb is not None:
+ warnings.warn(
+ _DEPR_MSG.format("partition_filename_cb", " Specify "
+ "'use_legacy_dataset=False' while constructing "
+ "the ParquetDataset, and then use the "
+ "'basename_template' parameter instead. For "
+ "usage see `pyarrow.dataset.write_dataset`"),
+ FutureWarning, stacklevel=2)
+
fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
_mkdir_if_not_exists(fs, root_path)
diff --git a/python/pyarrow/tests/parquet/test_dataset.py
b/python/pyarrow/tests/parquet/test_dataset.py
index 2534cce73c..6326743113 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1305,6 +1305,7 @@ def _test_write_to_dataset_no_partitions(base_path,
n = 5
for i in range(n):
pq.write_to_dataset(output_table, base_path,
+ use_legacy_dataset=use_legacy_dataset,
filesystem=filesystem)
output_files = [file for file in filesystem.ls(str(base_path))
if file.endswith(".parquet")]
@@ -1544,9 +1545,10 @@ def test_dataset_read_dictionary(tempdir,
use_legacy_dataset):
path = tempdir / "ARROW-3325-dataset"
t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
- # TODO pass use_legacy_dataset (need to fix unique names)
- pq.write_to_dataset(t1, root_path=str(path))
- pq.write_to_dataset(t2, root_path=str(path))
+ pq.write_to_dataset(t1, root_path=str(path),
+ use_legacy_dataset=use_legacy_dataset)
+ pq.write_to_dataset(t2, root_path=str(path),
+ use_legacy_dataset=use_legacy_dataset)
result = pq.ParquetDataset(
path, read_dictionary=['f0'],
@@ -1716,3 +1718,44 @@ def test_parquet_dataset_deprecated_properties(tempdir):
with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
dataset2.pieces
+
+
[email protected]
+def test_parquet_write_to_dataset_deprecated_properties(tempdir):
+ table = pa.table({'a': [1, 2, 3]})
+ path = tempdir / 'data.parquet'
+
+ with pytest.warns(FutureWarning,
+ match="Passing 'use_legacy_dataset=True'"):
+ pq.write_to_dataset(table, path, use_legacy_dataset=True)
+
+ # check also that legacy implementation is set when
+ # partition_filename_cb is specified
+ with pytest.warns(FutureWarning,
+ match="Passing 'use_legacy_dataset=True'"):
+ pq.write_to_dataset(table, path,
+ partition_filename_cb=lambda x: 'filename.parquet')
+
+
[email protected]
+def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
+ table = pa.table({'a': [1, 2, 3]})
+ path = tempdir / 'data.parquet'
+
+ with pytest.raises(ValueError, match="schema"):
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
+ schema=pa.schema([
+ ('a', pa.int32())
+ ]))
+
+ with pytest.raises(ValueError, match="partitioning"):
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
+ partitioning=["a"])
+
+ with pytest.raises(ValueError, match="use_threads"):
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
+ use_threads=False)
+
+ with pytest.raises(ValueError, match="file_visitor"):
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
+ file_visitor=lambda x: x)
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index 9a7f5ea213..19448d3687 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -971,7 +971,7 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None,
filesystem=None):
path = str(tempdir / "test_parquet_dataset")
# write_to_dataset currently requires pandas
- pq.write_to_dataset(table, path,
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
partition_cols=["part"], chunk_size=chunk_size)
dataset = ds.dataset(
path, format="parquet", partitioning="hive", filesystem=filesystem
@@ -1247,7 +1247,8 @@ def _create_dataset_all_types(tempdir, chunk_size=None):
path = str(tempdir / "test_parquet_dataset_all_types")
# write_to_dataset currently requires pandas
- pq.write_to_dataset(table, path, chunk_size=chunk_size)
+ pq.write_to_dataset(table, path, use_legacy_dataset=True,
+ chunk_size=chunk_size)
return table, ds.dataset(path, format="parquet", partitioning="hive")