jorisvandenbossche commented on a change in pull request #10628:
URL: https://github.com/apache/arrow/pull/10628#discussion_r663986392
##########
File path: python/pyarrow/parquet.py
##########
@@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path,
partition_cols=None,
"implementation."
)
metadata_collector = kwargs.pop('metadata_collector', None)
+ file_visitor = None
if metadata_collector is not None:
- raise ValueError(msg.format("metadata_collector"))
+ def file_visitor(written_file):
+ if written_file.metadata:
Review comment:
Is this `if` check needed (in theory)? These will always be Parquet files
in this case.
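For illustration, the simplification I have in mind would look roughly like
this (sketch only; the visitor body below assumes it simply appends to
`metadata_collector`, which isn't shown in this hunk):

```python
def file_visitor(written_file):
    # write_to_dataset only ever writes Parquet files here, so
    # written_file.metadata should always be populated and the
    # `if written_file.metadata:` guard could in theory be dropped.
    metadata_collector.append(written_file.metadata)
```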
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
dataset_reader.to_table(ds.dataset(basedir, format="feather"))
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
import pyarrow.parquet as pq
metadata_collector = []
- for i in range(4):
- table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
- pq.write_to_dataset(
- table, str(root_path), metadata_collector=metadata_collector
- )
+ f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+ table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+ pq.write_to_dataset(
+ table, str(root_path), partition_cols=['f1'],
+ use_legacy_dataset=use_legacy_dataset,
+ metadata_collector=metadata_collector
+ )
+
+ partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
metadata_path = str(root_path / '_metadata')
# write _metadata file
pq.write_metadata(
- table.schema, metadata_path,
+ partitionless_schema, metadata_path,
metadata_collector=metadata_collector
)
- return metadata_path, table
+ return metadata_path, partitionless_schema
@pytest.mark.parquet
@pytest.mark.pandas # write_to_dataset currently requires pandas
-def test_parquet_dataset_factory(tempdir):
[email protected]('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory(tempdir, use_legacy_dataset):
root_path = tempdir / "test_parquet_dataset"
- metadata_path, table = _create_parquet_dataset_simple(root_path)
+ metadata_path, partitionless_schema = _create_parquet_dataset_simple(
+ root_path, use_legacy_dataset)
dataset = ds.parquet_dataset(metadata_path)
- assert dataset.schema.equals(table.schema)
+ assert dataset.schema.equals(partitionless_schema)
Review comment:
This doesn't seem right to me? I think the dataset's schema should
include the partition columns?
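Roughly the kind of check I would expect instead (illustrative sketch only;
the exact type and position of the partition field depend on how the
partitioning is discovered):

```python
# The dataset schema should carry the data column *and* the 'f1'
# partition column, whatever its inferred type ends up being.
assert set(dataset.schema.names) == {'f1', 'f2'}
```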
##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None,
format=None,
(e.g. S3)
max_partitions : int, default 1024
Maximum number of partitions any batch may be written into.
+ file_visitor : Function
+ If set, this function will be called with a WrittenFile instance
+ for each file created during the call.
Review comment:
Should we also mention that the Parquet metadata has been updated with
the written file path?
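For context, a sketch of the workflow this enables (names like `table` and
`base_dir` are placeholders, and the visitor body is illustrative):

```python
import pyarrow.dataset as ds
import pyarrow.parquet as pq

metadata_collector = []

def file_visitor(written_file):
    # For Parquet files, written_file.metadata is the FileMetaData; since
    # the written file path has already been set on it, it can be fed
    # straight into a _metadata file.
    metadata_collector.append(written_file.metadata)

ds.write_dataset(table, base_dir, format="parquet", file_visitor=file_visitor)
pq.write_metadata(table.schema, str(base_dir / "_metadata"),
                  metadata_collector=metadata_collector)
```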
##########
File path: python/pyarrow/dataset.py
##########
@@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None,
format=None,
(e.g. S3)
max_partitions : int, default 1024
Maximum number of partitions any batch may be written into.
+ file_visitor : Function
+ If set, this function will be called with a WrittenFile instance
+ for each file created during the call.
Review comment:
Since "WrittenFile" is not a generally know pyarrow class, I would give
a bit more details on this (eg the fact that you can get the path and (if
parquet) metadata.
And maybe also give an example use case, something like "For example, this
enables to collect the paths or metadata of all written files"
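Something along these lines, as a rough sketch of the wording (not meant as
final text):

```
file_visitor : Function
    If set, this function will be called with a WrittenFile instance
    for each file created during the call. This object has a path
    attribute with the location of the written file, and a metadata
    attribute containing the Parquet metadata of that file (None for
    non-Parquet formats). For example, this enables collecting the
    paths or metadata of all written files.
```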
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2672,47 +2672,56 @@ def test_feather_format(tempdir, dataset_reader):
dataset_reader.to_table(ds.dataset(basedir, format="feather"))
-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset):
import pyarrow.parquet as pq
metadata_collector = []
- for i in range(4):
- table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
- pq.write_to_dataset(
- table, str(root_path), metadata_collector=metadata_collector
- )
+ f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+
+ table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)})
+ pq.write_to_dataset(
+ table, str(root_path), partition_cols=['f1'],
+ use_legacy_dataset=use_legacy_dataset,
+ metadata_collector=metadata_collector
+ )
+
+ partitionless_schema = pa.schema([pa.field('f2', pa.float64())])
metadata_path = str(root_path / '_metadata')
# write _metadata file
pq.write_metadata(
- table.schema, metadata_path,
+ partitionless_schema, metadata_path,
Review comment:
Is this change needed?
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir,
open_logging_fs):
# filtering fragments should not open any file
with assert_opens([]):
- list(dataset.get_fragments(ds.field("f1") > 15))
+ list(dataset.get_fragments(ds.field("f2") > 15))
Review comment:
The "f2" column is random normal around 0, so this filter will never
yield anything
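A small tweak that would keep the filter meaningful (sketch; any threshold
the data can actually cross would do):

```python
# f2 comes from np.random.randn, so roughly half the values exceed 0
list(dataset.get_fragments(ds.field("f2") > 0))
```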
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3269,8 +3308,19 @@ def test_write_dataset_parquet(tempdir):
# using default "parquet" format string
+ files_correct_metadata = 0
+
+ def file_visitor(written_file):
+ nonlocal files_correct_metadata
+ if (written_file.metadata is not None and
+ written_file.metadata.num_columns == 3):
+ files_correct_metadata += 1
Review comment:
Can you maybe move this into a separate test? (it makes the current test
for basic functionality a bit harder to read, IMO)
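For example, something like this as a standalone test (sketch only; the name
and exact assertions are just illustrative, and it assumes the module-level
imports/fixtures of test_dataset.py):

```python
@pytest.mark.parquet
def test_write_dataset_parquet_file_visitor(tempdir):
    table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z'],
                      'c': [0.1, 0.2, 0.3]})
    visited = []

    def file_visitor(written_file):
        visited.append(written_file)

    ds.write_dataset(table, tempdir / 'visited', format='parquet',
                     file_visitor=file_visitor)

    assert visited
    assert all(v.metadata is not None and v.metadata.num_columns == 3
               for v in visited)
```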
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -2795,7 +2804,7 @@ def test_parquet_dataset_lazy_filtering(tempdir,
open_logging_fs):
# created with ParquetDatasetFactory from a _metadata file
root_path = tempdir / "test_parquet_dataset_lazy_filtering"
- metadata_path, _ = _create_parquet_dataset_simple(root_path)
+ metadata_path, _ = _create_parquet_dataset_simple(root_path, True)
Review comment:
You can also add a default for this keyword in the helper function
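i.e. something like (sketch, defaulting to the legacy path used by this
caller):

```python
def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
    ...
```

so this call site wouldn't need to change.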
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3130,19 +3159,29 @@ def test_write_table(tempdir):
# with partitioning
base_dir = tempdir / 'partitioned'
+ expected_paths = [
+ base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+ base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+ ]
+
+ visited_paths = []
+
+ def file_visitor(written_file):
+ nonlocal visited_paths
Review comment:
Is this `nonlocal` needed?
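As far as I can tell it isn't: `append` mutates the list in place and the
name is never rebound, so a plain closure works (sketch, assuming the body
just appends the path, which isn't shown in this hunk):

```python
visited_paths = []

def file_visitor(written_file):
    # mutating the list doesn't rebind the name, so no nonlocal is needed
    visited_paths.append(written_file.path)
```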
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3095,10 +3104,30 @@ def test_write_dataset_use_threads(tempdir):
pa.schema([("part", pa.string())]), flavor="hive")
target1 = tempdir / 'partitioned1'
+ paths_written = []
+
+ def file_visitor(written_file):
+ print(f'Visiting {written_file.path}')
Review comment:
We should probably remove this print statement before merging?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]