Alexey Strokach created BEAM-7446:
-------------------------------------
Summary: Reading from a Cloud Pub/Sub topic inside Python's
DirectRunner results in a "too many open files" error
Key: BEAM-7446
URL: https://issues.apache.org/jira/browse/BEAM-7446
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.12.0
Reporter: Alexey Strokach
Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a
"too many open files" error within 2 minutes for a busy topic.
I am not exactly sure, but it appears that "pubsub.SubscriberClient()" creates
a grpc channel which must be closed explicitly once we are done pulling
messages from it. This may be due to a change introduced in grpc v1.12.0
([https://github.com/grpc/grpc/releases/tag/v1.12.0]) ("_a new
grpc.Channel.close method is introduced and correct use of gRPC Python now
requires that channels be closed after use_"). If the underlying grpc channel
is not closed and many "pubsub.SubscriberClient" instances are created, the
process eventually runs out of available file handles, resulting in "too many
open files" errors. A similar issue is reported here:
[https://github.com/googleapis/google-cloud-python/issues/5523].
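For illustration, this is the usage pattern that the grpc v1.12.0 release notes
call for (a minimal sketch, not Beam code; the target address is a
placeholder):
{code}
import grpc

# Minimal sketch of the pattern required since grpc v1.12.0: a channel owns
# OS-level resources (sockets / file handles) that are only released promptly
# when close() is called.
channel = grpc.insecure_channel("localhost:50051")  # placeholder target
try:
    pass  # ... issue RPCs over the channel ...
finally:
    channel.close()  # skipping this while creating many channels leaks handles
{code}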
The issue can be reproduced by running the following file:
{code}
# directrunner_streaming_tmof.py
from __future__ import print_function

import multiprocessing
import subprocess
import time

import apache_beam as beam
from google.cloud import pubsub_v1


def count_open_files():
    """Count the number of files opened by the current process."""
    pid = multiprocessing.current_process().pid
    # decode() so the output splits cleanly on both Python 2 and Python 3
    lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
    num_open_files = len(lsof_out.strip().split("\n")) - 1
    return num_open_files


def start_streaming_pipeline(project_id, subscription_path):
    """Create and start a simple streaming pipeline."""
    runner = beam.runners.direct.DirectRunner()
    pipeline_options = beam.pipeline.PipelineOptions(
        project=project_id, streaming=True
    )
    taxirides_pc = (
        beam.Pipeline(runner=runner, options=pipeline_options)
        | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
    )
    results = taxirides_pc.pipeline.run()
    return results


def monitor():
    """Periodically print the number of open files."""
    start_time = time.time()
    for _ in range(20):
        num_open_files = count_open_files()
        time_elapsed = time.time() - start_time
        print(
            "Time elapsed: {:<3s}s, Number of open files: {}".format(
                str(round(time_elapsed, 0)), num_open_files
            )
        )
        if num_open_files > 1000:
            break
        time.sleep(5)


if __name__ == "__main__":
    project_id = "{project_id}"
    topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(
        project_id, "taxirides-realtime-sub"
    )
    subscription = client.create_subscription(subscription_path, topic_path)
    print("Subscription created: {}".format(subscription_path))
    try:
        results = start_streaming_pipeline(project_id, subscription_path)
        monitor()
    finally:
        client.delete_subscription(subscription_path)
        print("Subscription deleted: {}".format(subscription_path))
{code}
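For reference, the 1000-file cutoff in monitor() sits just below the default
per-process soft limit of 1024 found on many Linux systems, which is why the
error shows up within a couple of minutes. The limits on a given machine can
be checked with the standard-library resource module:
{code}
import resource

# Report the per-process open-file limits. Once leaked channels push the
# count past the soft limit, further socket/file opens fail with EMFILE,
# i.e. the "too many open files" error described above.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("Open-file soft limit: {}, hard limit: {}".format(soft, hard))
{code}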
Currently, the output from running this script looks something like:
{noformat}
Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
Time elapsed: 0.0s, Number of open files: 160
Time elapsed: 5.0s, Number of open files: 179
Time elapsed: 11.0s, Number of open files: 247
Time elapsed: 16.0s, Number of open files: 339
Time elapsed: 21.0s, Number of open files: 436
Time elapsed: 26.0s, Number of open files: 523
Time elapsed: 31.0s, Number of open files: 615
Time elapsed: 36.0s, Number of open files: 713
Time elapsed: 41.0s, Number of open files: 809
Time elapsed: 46.0s, Number of open files: 903
Time elapsed: 52.0s, Number of open files: 999
Time elapsed: 57.0s, Number of open files: 1095
Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
WARNING:root:The DirectPipelineResult is being garbage-collected while the
DirectRunner is still running the corresponding pipeline. This may lead to
incomplete execution of the pipeline if the main thread exits before pipeline
completion. Consider using result.wait_until_finish() to wait for completion of
pipeline execution.
{noformat}
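A fix would presumably need ReadFromPubSub (or the DirectRunner's transform
evaluator) to clean up each client it creates, along the lines of the sketch
below. This assumes a google-cloud-pubsub release where SubscriberClient
exposes close(); on older releases the underlying grpc channel would have to
be closed directly.
{code}
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
try:
    pass  # ... pull messages ...
finally:
    # Releases the client's underlying grpc channel and its file handles.
    # Assumption: close() exists in the installed google-cloud-pubsub
    # release; older releases may need to close the grpc channel directly.
    subscriber.close()
{code}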