[ https://issues.apache.org/jira/browse/ARROW-15438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Li updated ARROW-15438:
-----------------------------
    Description: 
Found during 7.0.0 verification
{noformat}
pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED [ 30%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

    def test_write_dataset_max_open_files(tempdir):
        directory = tempdir / 'ds'
        file_format = "parquet"
        partition_column_id = 1
        column_names = ['c1', 'c2']
        record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
                                               ['a', 'b', 'c', 'd', 'e', 'a']],
                                         names=column_names)
        record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'c']],
                                         names=column_names)
        record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'd']],
                                         names=column_names)
        record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'b']],
                                         names=column_names)
    
        table = pa.Table.from_batches([record_batch_1, record_batch_2,
                                       record_batch_3, record_batch_4])
    
        partitioning = ds.partitioning(
            pa.schema([(column_names[partition_column_id], pa.string())]),
            flavor="hive")
    
        data_source_1 = directory / "default"
    
        ds.write_dataset(data=table, base_dir=data_source_1,
                         partitioning=partitioning, format=file_format)
    
        # Here we consider the number of unique partitions created when
        # partitioning column contains duplicate records.
        #   Returns: (number_of_files_generated, number_of_partitions)
        def _get_compare_pair(data_source, record_batch, file_format, col_id):
            num_of_files_generated = _get_num_of_files_generated(
                base_directory=data_source, file_format=file_format)
            number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
            return num_of_files_generated, number_of_partitions
    
        # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
        #         In case of a writing to disk via partitioning based on a
        #         particular column (considering row labels in that column),
        #         the number of unique rows must be equal
        #         to the number of files generated
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_1, record_batch_1, file_format,
                                partition_column_id)
        assert num_of_files_generated == number_of_partitions
    
        # CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
        #         the number of files generated must be greater than the number of
        #         partitions
    
        data_source_2 = directory / "max_1"
    
        max_open_files = 3
    
        ds.write_dataset(data=table, base_dir=data_source_2,
                         partitioning=partitioning, format=file_format,
                         max_open_files=max_open_files)
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_2, record_batch_1, file_format,
                                partition_column_id)
>       assert num_of_files_generated > number_of_partitions
E       assert 5 > 5

pyarrow/tests/test_dataset.py:3807: AssertionError
{noformat}
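For context (not part of the pasted output above): {{max_open_files}} caps how many partition files the dataset writer keeps open at once, so when it is smaller than the number of partitions the writer has to close a partition's file and open a new one if that partition later receives more rows. Whether that actually happens depends on the order in which batches reach the writer, which is presumably why the strict {{>}} assertion flakes. A minimal standalone sketch of the scenario (illustrative only; the data and the scratch directory below are made up, not taken from the test):

{code:python}
# Minimal sketch (assumptions: pyarrow >= 7.0.0, a scratch directory under the
# system temp dir). It mirrors the failing test: 4 batches, 5 distinct values in
# the partition column, max_open_files=3.
import pathlib
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

batch = pa.record_batch(
    [[1, 2, 3, 4, 0], ["a", "b", "c", "d", "e"]], names=["c1", "c2"])
table = pa.Table.from_batches([batch] * 4)   # every batch touches all 5 partitions

partitioning = ds.partitioning(pa.schema([("c2", pa.string())]), flavor="hive")
base = pathlib.Path(tempfile.mkdtemp()) / "max_open_files_demo"

# With only 3 files allowed open at once, the writer must close some partition
# files while others are still receiving data; a closed partition that gets more
# rows later is written to an additional file.
ds.write_dataset(data=table, base_dir=base, format="parquet",
                 partitioning=partitioning, max_open_files=3)

num_files = sum(1 for _ in base.rglob("*.parquet"))
# 5 partition directories are always created; num_files is at least 5, but
# whether it exceeds 5 depends on the order batches reach the writer, which is
# what makes the strict "> number_of_partitions" assertion race-prone.
print(num_files)
{code}

One possible direction for a fix (a judgment call, not decided here) would be to relax the check to {{num_of_files_generated >= number_of_partitions}} or to force a deterministic batch order before asserting the strict inequality.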

  was:
Found during 7.0.0 verification
{noformat}
pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED [ 30%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

    def test_write_dataset_max_open_files(tempdir):
        directory = tempdir / 'ds'
        file_format = "parquet"
        partition_column_id = 1
        column_names = ['c1', 'c2']
        record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
                                               ['a', 'b', 'c', 'd', 'e', 'a']],
                                         names=column_names)
        record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'c']],
                                         names=column_names)
        record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'd']],
                                         names=column_names)
        record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'b']],
                                         names=column_names)
    
        table = pa.Table.from_batches([record_batch_1, record_batch_2,
                                       record_batch_3, record_batch_4])
    
        partitioning = ds.partitioning(
            pa.schema([(column_names[partition_column_id], pa.string())]),
            flavor="hive")
    
        data_source_1 = directory / "default"
    
        ds.write_dataset(data=table, base_dir=data_source_1,
                         partitioning=partitioning, format=file_format)
    
        # Here we consider the number of unique partitions created when
        # partitioning column contains duplicate records.
        #   Returns: (number_of_files_generated, number_of_partitions)
        def _get_compare_pair(data_source, record_batch, file_format, col_id):
            num_of_files_generated = _get_num_of_files_generated(
                base_directory=data_source, file_format=file_format)
            number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
            return num_of_files_generated, number_of_partitions
    
        # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
        #         In case of a writing to disk via partitioning based on a
        #         particular column (considering row labels in that column),
        #         the number of unique rows must be equal
        #         to the number of files generated
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_1, record_batch_1, file_format,
                                partition_column_id)
        assert num_of_files_generated == number_of_partitions
    
        # CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
        #         the number of files generated must be greater than the number of
        #         partitions
    
        data_source_2 = directory / "max_1"
    
        max_open_files = 3
    
        ds.write_dataset(data=table, base_dir=data_source_2,
                         partitioning=partitioning, format=file_format,
                         max_open_files=max_open_files)
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_2, record_batch_1, file_format,
                                partition_column_id)
>       assert num_of_files_generated > number_of_partitions
E       assert 5 > 5

pyarrow/tests/test_dataset.py:3807: AssertionError

pyarrow/tests/test_flight.py::test_interrupt 
> /tmp/arrow/apache-arrow-7.0.0/python/pyarrow/tests/test_flight.py(1937)test()
-> read_all()
(Pdb) c
>>>>>>>>>>>>>>>>>>>>>>>>>> PDB continue (IO-capturing resumed) >>>>>>>>>>>>>>>>>>>>>>>>>>
FAILED                                                                [ 35%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> captured stdout >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Program interrupted. (Use 'cont' to resume).
pyarrow.lib.ArrowCancelled: Operation cancelled. Detail: received signal 2
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

    def test_interrupt():
        if threading.current_thread().ident != threading.main_thread().ident:
            pytest.skip("test only works from main Python thread")
        # Skips test if not available
        raise_signal = util.get_raise_signal()
    
        def signal_from_thread():
            time.sleep(0.5)
            raise_signal(signal.SIGINT)
    
        exc_types = (KeyboardInterrupt, pa.ArrowCancelled)
    
        def test(read_all):
            try:
                try:
                    t = threading.Thread(target=signal_from_thread)
                    with pytest.raises(exc_types) as exc_info:
                        t.start()
                        read_all()
                finally:
                    t.join()
            except KeyboardInterrupt:
                # In case KeyboardInterrupt didn't interrupt read_all
                # above, at least prevent it from stopping the test suite
                pytest.fail("KeyboardInterrupt didn't interrupt Flight read_all")
            e = exc_info.value.__context__
            assert isinstance(e, pa.ArrowCancelled) or \
                isinstance(e, KeyboardInterrupt)
    
        with CancelFlightServer() as server:
            client = FlightClient(("localhost", server.port))
    
            reader = client.do_get(flight.Ticket(b""))
>           test(reader.read_all)

pyarrow/tests/test_flight.py:1952: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

read_all = <built-in method read_all of pyarrow._flight.FlightStreamReader object at 0x7fe0c37590c0>

    def test(read_all):
        try:
            try:
                t = threading.Thread(target=signal_from_thread)
                with pytest.raises(exc_types) as exc_info:
                    t.start()
>                   read_all()
E                   AssertionError: assert (False or False)
E                    +  where False = isinstance(None, <class 'pyarrow.lib.ArrowCancelled'>)
E                    +    where <class 'pyarrow.lib.ArrowCancelled'> = pa.ArrowCancelled
E                    +  and   False = isinstance(None, KeyboardInterrupt)

pyarrow/tests/test_flight.py:1937: AssertionError
 {noformat}


> [Python] Flaky test test_write_dataset_max_open_files
> -----------------------------------------------------
>
>                 Key: ARROW-15438
>                 URL: https://issues.apache.org/jira/browse/ARROW-15438
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: David Li
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)