Martin Thøgersen created ARROW-16029:
----------------------------------------

             Summary: [Python] Runaway process with generator in 
"write_dataset()"
                 Key: ARROW-16029
                 URL: https://issues.apache.org/jira/browse/ARROW-16029
             Project: Apache Arrow
          Issue Type: Bug
            Reporter: Martin Thøgersen


We have a complex containerized data pipeline that keeps running even when the 
main process fails, so we have to stop the containers manually. The issue boils 
down to the following:

The method {{pyarrow.dataset.write_dataset()}} accepts an [iterable of 
RecordBatch|https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html].
This means that generators should also work, since [a generator is an iterator, 
which is an 
iterable.|https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators]
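As a quick sanity check (not part of the report itself), the generator/iterator/iterable relationship can be verified directly with the standard-library {{collections.abc}} ABCs:
{code:python}
from collections.abc import Iterable, Iterator


def gen():
    yield 1


g = gen()
# Every generator is an iterator, and every iterator is an iterable,
# so a generator should satisfy "iterable of RecordBatch":
print(isinstance(g, Iterator))  # True
print(isinstance(g, Iterable))  # True
{code}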

The following minimal example can't be stopped with Ctrl-C/KeyboardInterrupt 
(SIGINT, signal 2). We need to run at minimum {{killall -3 python}} (SIGQUIT) to 
close the process.
{code:python}
from time import sleep
import pyarrow as pa
import pyarrow.dataset as ds


def mygenerator():
    i = 0
    while True:
        sleep(0.1)
        i = i + 1
        print(i)
        yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)


schema = pa.schema([("mycol", pa.string())])

# Does NOT respect KeyboardInterrupt:
ds.write_dataset(data=mygenerator(),
                 schema=schema,
                 base_dir="mydir",
                 format="parquet",
                 existing_data_behavior="overwrite_or_ignore",
                 )
{code}
In practice the generator is not infinite, but represents a series of API calls 
that can't be held in memory.
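One possible Python-level mitigation (a sketch only, with hypothetical names; whether it actually helps depends on whether the Python-level signal handler ever gets to run while {{write_dataset()}} blocks in native code, which is exactly what this issue is about) is to wrap the source generator so it can be cancelled via a {{threading.Event}} that a SIGINT handler could set:
{code:python}
import threading
from collections.abc import Iterator

# Hypothetical workaround sketch: a cancellation flag that a SIGINT
# handler (not shown) could set from the main thread.
stop = threading.Event()


def cancellable(batches: Iterator):
    """Stop yielding cleanly once the event is set, so the consumer
    sees a normally finished iterator instead of an infinite one."""
    for batch in batches:
        if stop.is_set():
            return
        yield batch


# Demonstration with plain integers instead of RecordBatches:
def numbers():
    i = 0
    while True:
        i += 1
        yield i


gen = cancellable(numbers())
first = [next(gen) for _ in range(3)]  # [1, 2, 3]
stop.set()
remaining = list(gen)                  # [] -- the stream ends after the flag is set
{code}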

The following examples show that generators work well elsewhere, e.g. with 
plain iteration and {{pa.Table.from_batches()}}. So the issue could be limited 
to the Dataset API?
{code:python}
# Respects KeyboardInterrupt:
for i in mygenerator():
    pass
{code}
{code:python}
# Respects KeyboardInterrupt:
table = pa.Table.from_batches(mygenerator(), schema)
{code}
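As a blunt stopgap (an assumption on our side, not a fix), restoring the OS default SIGINT disposition makes Ctrl-C terminate the process immediately, bypassing the Python-level KeyboardInterrupt machinery that never runs while {{write_dataset()}} blocks in native code. The trade-off is that no Python cleanup ({{finally}} blocks, {{atexit}} handlers) runs:
{code:python}
import signal

# Let the OS kill the process on Ctrl-C instead of relying on Python's
# KeyboardInterrupt, which is never delivered while blocked in C++:
signal.signal(signal.SIGINT, signal.SIG_DFL)
{code}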
OS: Ubuntu 20.04.3 LTS
python: 3.8
pyarrow: 7.0.0



--
This message was sent by Atlassian Jira
(v8.20.1#820001)