[ https://issues.apache.org/jira/browse/ARROW-15438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482134#comment-17482134 ]
Weston Pace commented on ARROW-15438:
-------------------------------------
I was able to reproduce fairly regularly and confirmed the issue was the order
in which the batches were delivered to the dataset writer. Unfortunately, we
were also not completely respecting the {{use_threads}} option in
{{write_dataset}}: with {{use_threads=False}} we would scan serially but
still send batches into the exec plan in parallel. I added a new PR that sets
{{use_threads=False}} (based on Vibhatha's PR) and also updates the Write
method to be serial.
I don't know if this bug should block the RC, but if we cut another RC it
would probably be nice to include the fix.
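To make the ordering issue concrete: the dataset writer keeps at most
{{max_open_files}} files open and, per the docs, closes the least recently
used one when it needs a slot, so a partition that is closed and later
revisited gets an extra file. This toy model of that eviction behavior (not
the actual C++ writer) shows how batch arrival order alone decides whether
the test below sees extra files:
{noformat}
from collections import OrderedDict

def count_files(batch_partitions, max_open_files):
    # Toy model: a new "file" is started whenever a batch's partition is
    # not currently open; the least recently used writer is closed when
    # the open-file limit is hit. Not pyarrow's real implementation.
    open_files = OrderedDict()
    files = 0
    for part in batch_partitions:
        if part in open_files:
            open_files.move_to_end(part)  # mark as most recently used
            continue
        if len(open_files) >= max_open_files:
            open_files.popitem(last=False)  # close the LRU writer
        open_files[part] = True
        files += 1
    return files

# Batches grouped by partition: one file per partition, no evictions.
print(count_files(list("aaabbbcccdddeee"), 3))  # -> 5
# Interleaved batches: every arrival misses, forcing extra files.
print(count_files(list("abcde" * 3), 3))        # -> 15
{noformat}
That is exactly the shape of the failure below: with five partitions and
{{max_open_files=3}}, a lucky (grouped) delivery order yields exactly five
files, so {{assert 5 > 5}} fails.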
> [Python] Flaky test test_write_dataset_max_open_files
> -----------------------------------------------------
>
> Key: ARROW-15438
> URL: https://issues.apache.org/jira/browse/ARROW-15438
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Reporter: David Li
> Assignee: Vibhatha Lakmal Abeykoon
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Found during 7.0.0 verification
> {noformat}
> pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED [ 30%]
>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>
> tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')
>
>     def test_write_dataset_max_open_files(tempdir):
>         directory = tempdir / 'ds'
>         file_format = "parquet"
>         partition_column_id = 1
>         column_names = ['c1', 'c2']
>         record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
>                                                ['a', 'b', 'c', 'd', 'e', 'a']],
>                                          names=column_names)
>         record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
>                                                ['a', 'b', 'c', 'd', 'e', 'c']],
>                                          names=column_names)
>         record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
>                                                ['a', 'b', 'c', 'd', 'e', 'd']],
>                                          names=column_names)
>         record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
>                                                ['a', 'b', 'c', 'd', 'e', 'b']],
>                                          names=column_names)
>
>         table = pa.Table.from_batches([record_batch_1, record_batch_2,
>                                        record_batch_3, record_batch_4])
>
>         partitioning = ds.partitioning(
>             pa.schema([(column_names[partition_column_id], pa.string())]),
>             flavor="hive")
>
>         data_source_1 = directory / "default"
>
>         ds.write_dataset(data=table, base_dir=data_source_1,
>                          partitioning=partitioning, format=file_format)
>
>         # Here we consider the number of unique partitions created when
>         # the partitioning column contains duplicate records.
>         # Returns: (number_of_files_generated, number_of_partitions)
>         def _get_compare_pair(data_source, record_batch, file_format, col_id):
>             num_of_files_generated = _get_num_of_files_generated(
>                 base_directory=data_source, file_format=file_format)
>             number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
>             return num_of_files_generated, number_of_partitions
>
>         # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
>         # In case of writing to disk via partitioning based on a
>         # particular column (considering row labels in that column),
>         # the number of unique rows must be equal
>         # to the number of files generated
>         num_of_files_generated, number_of_partitions \
>             = _get_compare_pair(data_source_1, record_batch_1, file_format,
>                                 partition_column_id)
>         assert num_of_files_generated == number_of_partitions
>
>         # CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
>         # the number of files generated must be greater than the number of
>         # partitions
>         data_source_2 = directory / "max_1"
>
>         max_open_files = 3
>
>         ds.write_dataset(data=table, base_dir=data_source_2,
>                          partitioning=partitioning, format=file_format,
>                          max_open_files=max_open_files)
>
>         num_of_files_generated, number_of_partitions \
>             = _get_compare_pair(data_source_2, record_batch_1, file_format,
>                                 partition_column_id)
> >       assert num_of_files_generated > number_of_partitions
> E       assert 5 > 5
>
> pyarrow/tests/test_dataset.py:3807: AssertionError
> {noformat}