[ https://issues.apache.org/jira/browse/ARROW-15438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Li updated ARROW-15438:
-----------------------------
Description:
Found during 7.0.0 verification
{noformat}
pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED
[ 30%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

def test_write_dataset_max_open_files(tempdir):
directory = tempdir / 'ds'
file_format = "parquet"
partition_column_id = 1
column_names = ['c1', 'c2']
record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
['a', 'b', 'c', 'd', 'e', 'a']],
names=column_names)
record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
['a', 'b', 'c', 'd', 'e', 'c']],
names=column_names)
record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
['a', 'b', 'c', 'd', 'e', 'd']],
names=column_names)
record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
['a', 'b', 'c', 'd', 'e', 'b']],
names=column_names)
table = pa.Table.from_batches([record_batch_1, record_batch_2,
record_batch_3, record_batch_4])
partitioning = ds.partitioning(
pa.schema([(column_names[partition_column_id], pa.string())]),
flavor="hive")
data_source_1 = directory / "default"
ds.write_dataset(data=table, base_dir=data_source_1,
partitioning=partitioning, format=file_format)
# Here we consider the number of unique partitions created when
# partitioning column contains duplicate records.
# Returns: (number_of_files_generated, number_of_partitions)
def _get_compare_pair(data_source, record_batch, file_format, col_id):
num_of_files_generated = _get_num_of_files_generated(
base_directory=data_source, file_format=file_format)
number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
return num_of_files_generated, number_of_partitions
# CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
# In case of a writing to disk via partitioning based on a
# particular column (considering row labels in that column),
# the number of unique rows must be equal
# to the number of files generated
num_of_files_generated, number_of_partitions \
= _get_compare_pair(data_source_1, record_batch_1, file_format,
partition_column_id)
assert num_of_files_generated == number_of_partitions
# CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
# the number of files generated must be greater than the number of
# partitions
data_source_2 = directory / "max_1"
max_open_files = 3
ds.write_dataset(data=table, base_dir=data_source_2,
partitioning=partitioning, format=file_format,
max_open_files=max_open_files)
num_of_files_generated, number_of_partitions \
= _get_compare_pair(data_source_2, record_batch_1, file_format,
partition_column_id)
> assert num_of_files_generated > number_of_partitions
E       assert 5 > 5

pyarrow/tests/test_dataset.py:3807: AssertionError
{noformat}
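For reference, the behaviour the failing assertion depends on can be exercised outside the test with a minimal sketch along these lines (the table contents, column names and output path below are made up for illustration, not taken from the test): when max_open_files is smaller than the number of distinct partition values, the writer should have to close and reopen partition files, so at least one partition is expected to be split across multiple files.
{noformat}
import pyarrow as pa
import pyarrow.dataset as ds

# Hypothetical table whose 'part' column has 5 distinct values (5 partitions).
table = pa.table({
    "value": list(range(20)),
    "part": [str(i % 5) for i in range(20)],
})

partitioning = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")

# With max_open_files=3 (< 5 partitions) the writer has to recycle open file
# handles, so some partitions are expected to end up with more than one file.
# Whether that actually happens appears to depend on how rows are batched
# during the write, which would explain the flakiness reported here.
ds.write_dataset(table, "/tmp/max_open_files_demo", format="parquet",
                 partitioning=partitioning, max_open_files=3)
{noformat}
If the splitting indeed depends on row batching, that would explain why the assertion num_of_files_generated > number_of_partitions occasionally observes exactly equal counts.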
was:
Found during 7.0.0 verification
{noformat}
pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED
[ 30%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

def test_write_dataset_max_open_files(tempdir):
directory = tempdir / 'ds'
file_format = "parquet"
partition_column_id = 1
column_names = ['c1', 'c2']
record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
['a', 'b', 'c', 'd', 'e', 'a']],
names=column_names)
record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
['a', 'b', 'c', 'd', 'e', 'c']],
names=column_names)
record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
['a', 'b', 'c', 'd', 'e', 'd']],
names=column_names)
record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
['a', 'b', 'c', 'd', 'e', 'b']],
names=column_names)
table = pa.Table.from_batches([record_batch_1, record_batch_2,
record_batch_3, record_batch_4])
partitioning = ds.partitioning(
pa.schema([(column_names[partition_column_id], pa.string())]),
flavor="hive")
data_source_1 = directory / "default"
ds.write_dataset(data=table, base_dir=data_source_1,
partitioning=partitioning, format=file_format)
# Here we consider the number of unique partitions created when
# partitioning column contains duplicate records.
# Returns: (number_of_files_generated, number_of_partitions)
def _get_compare_pair(data_source, record_batch, file_format, col_id):
num_of_files_generated = _get_num_of_files_generated(
base_directory=data_source, file_format=file_format)
number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
return num_of_files_generated, number_of_partitions
# CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
# In case of a writing to disk via partitioning based on a
# particular column (considering row labels in that column),
# the number of unique rows must be equal
# to the number of files generated
num_of_files_generated, number_of_partitions \
= _get_compare_pair(data_source_1, record_batch_1, file_format,
partition_column_id)
assert num_of_files_generated == number_of_partitions
# CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
# the number of files generated must be greater than the number of
# partitions
data_source_2 = directory / "max_1"
max_open_files = 3
ds.write_dataset(data=table, base_dir=data_source_2,
partitioning=partitioning, format=file_format,
max_open_files=max_open_files)
num_of_files_generated, number_of_partitions \
= _get_compare_pair(data_source_2, record_batch_1, file_format,
partition_column_id)
> assert num_of_files_generated > number_of_partitions
E       assert 5 > 5

pyarrow/tests/test_dataset.py:3807: AssertionError

pyarrow/tests/test_flight.py::test_interrupt
> /tmp/arrow/apache-arrow-7.0.0/python/pyarrow/tests/test_flight.py(1937)test()
-> read_all()
(Pdb) c
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PDB continue (IO-capturing resumed) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
FAILED [ 35%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> captured stdout >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Program interrupted. (Use 'cont' to resume).
pyarrow.lib.ArrowCancelled: Operation cancelled. Detail: received signal 2
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

def test_interrupt():
if threading.current_thread().ident != threading.main_thread().ident:
pytest.skip("test only works from main Python thread")
# Skips test if not available
raise_signal = util.get_raise_signal()
def signal_from_thread():
time.sleep(0.5)
raise_signal(signal.SIGINT)
exc_types = (KeyboardInterrupt, pa.ArrowCancelled)
def test(read_all):
try:
try:
t = threading.Thread(target=signal_from_thread)
with pytest.raises(exc_types) as exc_info:
t.start()
read_all()
finally:
t.join()
except KeyboardInterrupt:
# In case KeyboardInterrupt didn't interrupt read_all
# above, at least prevent it from stopping the test suite
pytest.fail("KeyboardInterrupt didn't interrupt Flight
read_all")
e = exc_info.value.__context__
assert isinstance(e, pa.ArrowCancelled) or \
isinstance(e, KeyboardInterrupt)
with CancelFlightServer() as server:
client = FlightClient(("localhost", server.port))
reader = client.do_get(flight.Ticket(b""))
>       test(reader.read_all)

pyarrow/tests/test_flight.py:1952:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
read_all = <built-in method read_all of pyarrow._flight.FlightStreamReader object at 0x7fe0c37590c0>

def test(read_all):
try:
try:
t = threading.Thread(target=signal_from_thread)
with pytest.raises(exc_types) as exc_info:
t.start()
> read_all()
E       AssertionError: assert (False or False)
E        +  where False = isinstance(None, <class 'pyarrow.lib.ArrowCancelled'>)
E        +    where <class 'pyarrow.lib.ArrowCancelled'> = pa.ArrowCancelled
E        +  and   False = isinstance(None, KeyboardInterrupt)

pyarrow/tests/test_flight.py:1937: AssertionError
{noformat}
> [Python] Flaky test test_write_dataset_max_open_files
> -----------------------------------------------------
>
> Key: ARROW-15438
> URL: https://issues.apache.org/jira/browse/ARROW-15438
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Reporter: David Li
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)