jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663986392



##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                if written_file.metadata:

Review comment:
       Is this `if` check needed (in theory)? Since these will always be Parquet files in this case, the metadata should always be set.
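
   If not, the visitor could presumably be simplified to something like this (just a sketch of the idea, assuming the collected value is the file's metadata):

```python
metadata_collector = []

def file_visitor(written_file):
    # every file written in this code path is a Parquet file, so
    # written_file.metadata should always be populated and the
    # guard can be dropped
    metadata_collector.append(written_file.metadata)
```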

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,
         metadata_collector=metadata_collector
     )
-    return metadata_path, table
+    return metadata_path, partitionless_schema
 
 
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
     root_path = tempdir / "test_parquet_dataset"
-    metadata_path, table = _create_parquet_dataset_simple(root_path)
+    metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+        root_path, use_legacy_dataset)
     dataset = ds.parquet_dataset(metadata_path)
-    assert dataset.schema.equals(table.schema)
+    assert dataset.schema.equals(partitionless_schema)

Review comment:
       This doesn't seem right to me; I think the dataset's schema should include the partition columns?
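
   Roughly what I would expect instead, as a sketch (assuming the module's `pa` import; the exact type inferred for the `f1` partition column may differ):

```python
# the partition column should show up in the dataset schema next to f2
expected_schema = pa.schema([
    pa.field('f2', pa.float64()),
    pa.field('f1', pa.int32()),  # type as inferred from the partition values
])
assert dataset.schema.equals(expected_schema)
```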

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Should we also mention that the Parquet metadata has been updated with the written file path?
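
   Something along those lines perhaps (wording just a suggestion):

       file_visitor : Function
           If set, this function will be called with a WrittenFile instance
           for each file created during the call. For Parquet files, the
           file path stored in the collected metadata is already updated to
           the path of the written file.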

##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call.

Review comment:
       Since `WrittenFile` is not a generally known pyarrow class, I would give a bit more detail on this (e.g. the fact that you can get the path and, if Parquet, the metadata from it).
   
   And maybe also give an example use case, something like "For example, this enables collecting the paths or metadata of all written files".
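
   Something like this could work as the example, a rough sketch assuming the new `file_visitor` keyword from this PR (the table, base directory and partitioning are illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'part': ['a', 'a', 'b'], 'value': [1, 2, 3]})

visited_paths = []
metadata_collector = []

def file_visitor(written_file):
    # written_file.path is the path of the file that was just written;
    # written_file.metadata is the Parquet FileMetaData (None for other
    # formats), so both can be collected here.
    visited_paths.append(written_file.path)
    if written_file.metadata is not None:
        metadata_collector.append(written_file.metadata)

ds.write_dataset(
    table, "dataset_root", format="parquet",  # base_dir is illustrative
    partitioning=ds.partitioning(
        pa.schema([("part", pa.string())]), flavor="hive"),
    file_visitor=file_visitor)
```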

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
         dataset_reader.to_table(ds.dataset(basedir, format="feather"))
 
 
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
     import pyarrow.parquet as pq
 
     metadata_collector = []
 
-    for i in range(4):
-        table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
-        pq.write_to_dataset(
-            table, str(root_path), metadata_collector=metadata_collector
-        )
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+    table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+    pq.write_to_dataset(
+        table, str(root_path), partition_cols=['f1'],
+        use_legacy_dataset=use_legacy_dataset,
+        metadata_collector=metadata_collector
+    )
+
+    partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
 
     metadata_path = str(root_path / '_metadata')
     # write _metadata file
     pq.write_metadata(
-        table.schema, metadata_path,
+        partitionless_schema, metadata_path,

Review comment:
       Is this change needed?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
 
     # filtering fragments should not open any file
     with assert_opens([]):
-        list(dataset.get_fragments(ds.field("f1") > 15))
+        list(dataset.get_fragments(ds.field("f2") > 15))

Review comment:
       The "f2" column is random normal around 0, so this filter will never 
yield anything
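
   A threshold of 0 would at least select something, e.g. (sketch, mirroring the existing line):

```python
# f2 is drawn from a standard normal, so roughly half the values exceed 0
# and the filter can actually match fragments
with assert_opens([]):
    list(dataset.get_fragments(ds.field("f2") > 0))
```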

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3269,8 +3308,19 @@ def test_write_dataset_parquet(tempdir):
 
     # using default "parquet" format string
 
+    files_correct_metadata = 0
+
+    def file_visitor(written_file):
+        nonlocal files_correct_metadata
+        if (written_file.metadata is not None and
+                written_file.metadata.num_columns == 3):
+            files_correct_metadata += 1

Review comment:
       Can you maybe move this into a separate test? (It makes the current test for basic functionality a bit harder to read, IMO.)
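
   For example something along these lines, just a sketch assuming the module's existing imports and fixtures (test name and contents are illustrative):

```python
@pytest.mark.parquet
def test_write_dataset_parquet_file_visitor(tempdir):
    table = pa.table({'f1': [1, 2, 3], 'f2': ['a', 'b', 'c'],
                      'f3': [1.0, 2.0, 3.0]})

    visited = []

    def file_visitor(written_file):
        visited.append(written_file)

    base_dir = tempdir / 'parquet_visitor'
    ds.write_dataset(table, base_dir, format="parquet",
                     file_visitor=file_visitor)

    # a single parquet file, with metadata covering all three columns
    assert len(visited) == 1
    assert visited[0].metadata is not None
    assert visited[0].metadata.num_columns == 3
```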

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2795,7 +2804,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
     # created with ParquetDatasetFactory from a _metadata file
 
     root_path = tempdir / "test_parquet_dataset_lazy_filtering"
-    metadata_path, _ = _create_parquet_dataset_simple(root_path)
+    metadata_path, _ = _create_parquet_dataset_simple(root_path, True)

Review comment:
       You can also add a default for this keyword in the helper function
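
   i.e. something like this (sketch), so this call site doesn't need to pass the argument at all:

```python
def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
    ...
```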

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3130,19 +3159,29 @@ def test_write_table(tempdir):
 
     # with partitioning
     base_dir = tempdir / 'partitioned'
+    expected_paths = [
+        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+    ]
+
+    visited_paths = []
+
+    def file_visitor(written_file):
+        nonlocal visited_paths

Review comment:
       Is this `nonlocal` needed?
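
   I don't think it is needed: appending only mutates the list and the name is never rebound, so the closure can use `visited_paths` without any declaration, e.g. (sketch):

```python
visited_paths = []

def file_visitor(written_file):
    # list.append mutates the existing list object in place; the name
    # `visited_paths` is never reassigned, so no `nonlocal` is needed
    visited_paths.append(written_file.path)
```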

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,30 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        print(f'Visiting {written_file.path}')

Review comment:
       We should probably remove this print statement at the end before merging?



