This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1763622b6f ARROW-16122: [Python] Change use_legacy_dataset default and deprecate no-longer supported keywords in parquet.write_to_dataset
1763622b6f is described below

commit 1763622b6f60e974e495b3349cc2f4b7caaf1951
Author: Alenka Frim <[email protected]>
AuthorDate: Thu Apr 21 09:49:24 2022 +0200

    ARROW-16122: [Python] Change use_legacy_dataset default and deprecate no-longer supported keywords in parquet.write_to_dataset
    
    This PR tries to amend `pq.write_to_dataset` to:
    
    1. raise a deprecation warning for `use_legacy_dataset=True` and already switch the default to `False`.
    2. raise deprecation warnings for all keywords (when `use_legacy_dataset=True`) that won't be supported in the new implementation.
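
    For illustration, a minimal sketch of the resulting behaviour (the table
    and paths here are hypothetical)::

        import pyarrow as pa
        import pyarrow.parquet as pq

        table = pa.table({"year": [2019, 2020], "n": [1, 2]})

        # New default: the dataset-based implementation is used without
        # passing any extra keyword.
        pq.write_to_dataset(table, root_path="dataset_new",
                            partition_cols=["year"])

        # Opting back into the legacy implementation now emits a
        # FutureWarning; keywords only supported by the new implementation
        # (e.g. schema, partitioning) raise ValueError when combined with it.
        pq.write_to_dataset(table, root_path="dataset_legacy",
                            use_legacy_dataset=True)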
    
    Closes #12811 from AlenkaF/ARROW-16122
    
    Lead-authored-by: Alenka Frim <[email protected]>
    Co-authored-by: Alenka Frim <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 python/pyarrow/parquet/__init__.py           | 104 ++++++++++++++++++++++-----
 python/pyarrow/tests/parquet/test_dataset.py |  49 ++++++++++++-
 python/pyarrow/tests/test_dataset.py         |   5 +-
 3 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 33094dabe6..867babdaf8 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -2926,8 +2926,10 @@ def _mkdir_if_not_exists(fs, path):
 
 def write_to_dataset(table, root_path, partition_cols=None,
                      partition_filename_cb=None, filesystem=None,
-                     use_legacy_dataset=None, **kwargs):
-    """Wrapper around parquet.write_table for writing a Table to
+                     use_legacy_dataset=None, schema=None,
+                     partitioning=None, basename_template=None,
+                     use_threads=None, file_visitor=None, **kwargs):
+    """Wrapper around parquet.write_dataset for writing a Table to
     Parquet format by partitions.
     For each combination of partition columns and values,
     subdirectories are created in the following
@@ -2962,11 +2964,44 @@ def write_to_dataset(table, root_path, partition_cols=None,
         and allow you to override the partition filename. If nothing is
         passed, the filename will consist of a uuid.
     use_legacy_dataset : bool
-        Default is True unless a ``pyarrow.fs`` filesystem is passed.
-        Set to False to enable the new code path (experimental, using the
-        new Arrow Dataset API). This is more efficient when using partition
-        columns, but does not (yet) support `partition_filename_cb` and
-        `metadata_collector` keywords.
+        Default is False. Set to True to use the legacy behaviour
+        (this option is deprecated, and the legacy implementation will be
+        removed in a future version). The legacy implementation still
+        supports `partition_filename_cb` and `metadata_collector` keywords
+        but is less efficient when using partition columns.
+    use_threads : bool, default True
+        Write files in parallel. If enabled, maximum parallelism will be
+        used, as determined by the number of available CPU cores.
+    schema : Schema, optional
+        The schema of the written dataset. Defaults to the schema of the
+        given table.
+    partitioning : Partitioning or list[str], optional
+        The partitioning scheme specified with the
+        ``pyarrow.dataset.partitioning()`` function or a list of field names.
+        When providing a list of field names, you can use
+        ``partitioning_flavor`` to drive which partitioning type should be
+        used.
+    basename_template : str, optional
+        A template string used to generate basenames of written data files.
+        The token '{i}' will be replaced with an automatically incremented
+        integer. If not specified, it defaults to "guid-{i}.parquet".
+    file_visitor : function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.  This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will be a string containing the path to
+        the created file.
+
+        The metadata attribute will be the parquet metadata of the file.
+        This metadata will have the file path attribute set and can be used
+        to build a _metadata file.  The metadata attribute will be None if
+        the format is not parquet.
+
+        Example visitor which simply collects the filenames created::
+
+            visited_paths = []
+
+            def file_visitor(written_file):
+                visited_paths.append(written_file.path)
     **kwargs : dict,
         Additional kwargs for write_table function. See docstring for
         `write_table` or `ParquetWriter` for more information.
@@ -2988,26 +3023,24 @@ def write_to_dataset(table, root_path, partition_cols=None,
 
     >>> import pyarrow.parquet as pq
     >>> pq.write_to_dataset(table, root_path='dataset_name_3',
-    ...                     partition_cols=['year'],
-    ...                     use_legacy_dataset=False
-    ...                    )
+    ...                     partition_cols=['year'])
     >>> pq.ParquetDataset('dataset_name_3', use_legacy_dataset=False).files
     ['dataset_name_3/year=2019/part-0.parquet', ...
 
     Write a single Parquet file into the root folder:
 
-    >>> pq.write_to_dataset(table, root_path='dataset_name_4',
-    ...                     use_legacy_dataset=False)
+    >>> pq.write_to_dataset(table, root_path='dataset_name_4')
     >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files
     ['dataset_name_4/part-0.parquet']
     """
     if use_legacy_dataset is None:
-        # if a new filesystem is passed -> default to new implementation
-        if isinstance(filesystem, FileSystem):
-            use_legacy_dataset = False
-        # otherwise the default is still True
-        else:
+        # if partition_filename_cb is specified ->
+        # default to the old implementation
+        if partition_filename_cb:
             use_legacy_dataset = True
+        # otherwise the default is False
+        else:
+            use_legacy_dataset = False
 
     if not use_legacy_dataset:
         import pyarrow.dataset as ds
@@ -3042,13 +3075,48 @@ def write_to_dataset(table, root_path, partition_cols=None,
             part_schema = table.select(partition_cols).schema
             partitioning = ds.partitioning(part_schema, flavor="hive")
 
+        if basename_template is None:
+            basename_template = guid() + '-{i}.parquet'
+
         ds.write_dataset(
             table, root_path, filesystem=filesystem,
             format=parquet_format, file_options=write_options, schema=schema,
             partitioning=partitioning, use_threads=use_threads,
-            file_visitor=file_visitor)
+            file_visitor=file_visitor,
+            basename_template=basename_template,
+            existing_data_behavior='overwrite_or_ignore')
         return
 
+    # warnings and errors when using the legacy implementation
+    if use_legacy_dataset:
+        warnings.warn(
+            "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
+            "deprecated as of pyarrow 8.0.0, and the legacy implementation "
+            "will be removed in a future version.",
+            FutureWarning, stacklevel=2)
+    msg2 = (
+        "The '{}' argument is not supported with the legacy "
+        "implementation. To use this argument specify "
+        "'use_legacy_dataset=False' while constructing the "
+        "ParquetDataset."
+    )
+    if schema is not None:
+        raise ValueError(msg2.format("schema"))
+    if partitioning is not None:
+        raise ValueError(msg2.format("partitioning"))
+    if use_threads is not None:
+        raise ValueError(msg2.format("use_threads"))
+    if file_visitor is not None:
+        raise ValueError(msg2.format("file_visitor"))
+    if partition_filename_cb is not None:
+        warnings.warn(
+            _DEPR_MSG.format("partition_filename_cb", " Specify "
+                             "'use_legacy_dataset=False' while constructing "
+                             "the ParquetDataset, and then use the "
+                             "'basename_template' parameter instead. For "
+                             "usage see `pyarrow.dataset.write_dataset`"),
+            FutureWarning, stacklevel=2)
+
     fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
 
     _mkdir_if_not_exists(fs, root_path)
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 2534cce73c..6326743113 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1305,6 +1305,7 @@ def _test_write_to_dataset_no_partitions(base_path,
     n = 5
     for i in range(n):
         pq.write_to_dataset(output_table, base_path,
+                            use_legacy_dataset=use_legacy_dataset,
                             filesystem=filesystem)
     output_files = [file for file in filesystem.ls(str(base_path))
                     if file.endswith(".parquet")]
@@ -1544,9 +1545,10 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
     path = tempdir / "ARROW-3325-dataset"
     t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
     t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
-    # TODO pass use_legacy_dataset (need to fix unique names)
-    pq.write_to_dataset(t1, root_path=str(path))
-    pq.write_to_dataset(t2, root_path=str(path))
+    pq.write_to_dataset(t1, root_path=str(path),
+                        use_legacy_dataset=use_legacy_dataset)
+    pq.write_to_dataset(t2, root_path=str(path),
+                        use_legacy_dataset=use_legacy_dataset)
 
     result = pq.ParquetDataset(
         path, read_dictionary=['f0'],
@@ -1716,3 +1718,44 @@ def test_parquet_dataset_deprecated_properties(tempdir):
 
     with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
         dataset2.pieces
+
+
+@pytest.mark.pandas
+def test_parquet_write_to_dataset_deprecated_properties(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    with pytest.warns(FutureWarning,
+                      match="Passing 'use_legacy_dataset=True'"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True)
+
+    # check also that legacy implementation is set when
+    # partition_filename_cb is specified
+    with pytest.warns(FutureWarning,
+                      match="Passing 'use_legacy_dataset=True'"):
+        pq.write_to_dataset(table, path,
+                            partition_filename_cb=lambda x: 'filename.parquet')
+
+
+@pytest.mark.pandas
+def test_parquet_write_to_dataset_unsupported_keywords_in_legacy(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    with pytest.raises(ValueError, match="schema"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            schema=pa.schema([
+                                ('a', pa.int32())
+                            ]))
+
+    with pytest.raises(ValueError, match="partitioning"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            partitioning=["a"])
+
+    with pytest.raises(ValueError, match="use_threads"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            use_threads=False)
+
+    with pytest.raises(ValueError, match="file_visitor"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            file_visitor=lambda x: x)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 9a7f5ea213..19448d3687 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -971,7 +971,7 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
     path = str(tempdir / "test_parquet_dataset")
 
     # write_to_dataset currently requires pandas
-    pq.write_to_dataset(table, path,
+    pq.write_to_dataset(table, path, use_legacy_dataset=True,
                         partition_cols=["part"], chunk_size=chunk_size)
     dataset = ds.dataset(
         path, format="parquet", partitioning="hive", filesystem=filesystem
@@ -1247,7 +1247,8 @@ def _create_dataset_all_types(tempdir, chunk_size=None):
     path = str(tempdir / "test_parquet_dataset_all_types")
 
     # write_to_dataset currently requires pandas
-    pq.write_to_dataset(table, path, chunk_size=chunk_size)
+    pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                        chunk_size=chunk_size)
 
     return table, ds.dataset(path, format="parquet", partitioning="hive")
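
For reference, a minimal sketch of the migration that the new `partition_filename_cb` deprecation message points to (the table and paths here are hypothetical)::

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"year": [2019, 2020], "n": [1, 2]})

    # Deprecated: legacy per-partition filename callback.
    # pq.write_to_dataset(table, "dataset_old", partition_cols=["year"],
    #                     partition_filename_cb=lambda keys: "data.parquet")

    # Replacement: the 'basename_template' keyword added in this commit;
    # the '{i}' token is replaced with an auto-incremented integer.
    pq.write_to_dataset(table, "dataset_new", partition_cols=["year"],
                        basename_template="data-{i}.parquet")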
 
