hu6360567 opened a new issue, #36749:
URL: https://github.com/apache/arrow/issues/36749
### Describe the bug, including details regarding any error messages,
version, and platform.
The following code is randomly deadlocked.
```python
import concurrent.futures
import pyarrow as pa
import pyarrow.flight
def write(writer, prefix):
schema = pa.schema([("id", pa.utf8())])
writer.begin(schema)
for i in range(10):
print(f"{prefix} write {i}")
writer.write(
pa.record_batch([[prefix]], schema)
)
writer.close()
def read(reader, prefix):
i = 0
for rb in reader.to_reader():
print(f"{prefix} {i} [{rb.to_pylist()}]")
i += 1
print(f"{prefix} in-read done")
class ScratchServer(pa.flight.FlightServerBase):
def do_exchange(self, context, descriptor, reader:
pa.flight.MetadataRecordBatchReader,
writer: pa.flight.MetadataRecordBatchWriter):
print("do exchange")
with concurrent.futures.ThreadPoolExecutor() as executor:
write_task = executor.submit(write, writer, "server")
read_task = executor.submit(read, reader, "server")
write_task.add_done_callback(lambda _: print("server write
done"))
read_task.add_done_callback(lambda _: print("server read done"))
print("do exchange done")
if __name__ == "__main__":
server = ScratchServer(location="grpc://127.0.0.1:12345")
print(">>>>>>")
client = pa.flight.FlightClient(location="grpc://127.0.0.1:12345")
writer, reader =
client.do_exchange(pa.flight.FlightDescriptor.for_command(b""))
with concurrent.futures.ThreadPoolExecutor() as executor:
write_task = executor.submit(write, writer, "client")
read_task = executor.submit(read, reader, "client")
write_task.add_done_callback(lambda _: print("client write done"))
read_task.add_done_callback(lambda _: print("client read done"))
server.shutdown()
```
When a writer print "done", the peer reader should print "done" as well.
The snippet code may stop as expected sometims, but it may stuck when one
side write has done, but the reader stuck forever.
```
>>>>>>
do exchange
client write 0
server write 0
server write 1
client 0 [[{'id': 'server'}]]
server write 2
client 1 [[{'id': 'server'}]]
server write 3
client 2 [[{'id': 'server'}]]server write 4
server write 5
client 3 [[{'id': 'server'}]]
client 4 [[{'id': 'server'}]]
client 5 [[{'id': 'server'}]]
server write 6
server write 7
client 6 [[{'id': 'server'}]]
server write 8
client 7 [[{'id': 'server'}]]
server write 9
client write 1
client 8 [[{'id': 'server'}]]
server 0 [[{'id': 'client'}]]
server write doneclient write 2
client 9 [[{'id': 'server'}]]
server 1 [[{'id': 'client'}]]
client write 3
server 2 [[{'id': 'client'}]]
client write 4
server 3 [[{'id': 'client'}]]
client write 5
server 4 [[{'id': 'client'}]]
client write 6
server 5 [[{'id': 'client'}]]
client write 7
client write 8
server 6 [[{'id': 'client'}]]
client write 9
server 7 [[{'id': 'client'}]]
server 8 [[{'id': 'client'}]]
server 9 [[{'id': 'client'}]]
```
### Component(s)
Python
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]