[
https://issues.apache.org/jira/browse/BEAM-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexey Strokach closed BEAM-7446.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.14.0
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a
> "too many open files" error
> --------------------------------------------------------------------------------------------------------
>
> Key: BEAM-7446
> URL: https://issues.apache.org/jira/browse/BEAM-7446
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.12.0
> Reporter: Alexey Strokach
> Priority: Minor
> Labels: pull-request-available
> Fix For: 2.14.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a
> "too many open files" error within 2 minutes for a busy topic.
> I am not entirely sure, but it appears that "pubsub.SubscriberClient()"
> creates a gRPC channel which must be closed explicitly after we are done
> pulling messages. This may be due to a change introduced in [grpc
> v1.12.0|https://github.com/grpc/grpc/releases/tag/v1.12.0] ("_a new
> grpc.Channel.close method is introduced and correct use of gRPC Python now
> requires that channels be closed after use_"). If the underlying gRPC channel
> is not closed and many "pubsub.SubscriberClient" instances are created, the
> process may run out of available file handles, resulting in "too many open
> files" errors. A similar issue is reported here:
> [https://github.com/googleapis/google-cloud-python/issues/5523].
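The failure mode can be demonstrated without Pub/Sub at all: any object that holds an open file descriptor and is never closed pins that descriptor until the per-process limit is hit. A minimal sketch, using plain TCP sockets as a stand-in for never-closed gRPC channels (Linux-only, since it counts entries in /proc/self/fd):

```python
import os
import socket

def open_fd_count():
    """Count file descriptors held by this process (Linux-specific)."""
    return len(os.listdir("/proc/self/fd"))

before = open_fd_count()

# Stand-in for repeatedly constructed, never-closed gRPC channels:
# each socket pins one file descriptor for as long as it stays open.
leaked = [socket.socket() for _ in range(50)]
grown = open_fd_count()

# Closing each "channel" releases its descriptor again.
for s in leaked:
    s.close()
after = open_fd_count()

print(grown - before, after - before)
```

Creating fifty sockets raises the descriptor count by fifty; closing them brings it back down. The repro script below shows the same growth curve, except that the unclosed channels are created inside the DirectRunner and nothing ever closes them.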
> The issue can be reproduced by running the following file:
> {code:python}
> # directrunner_streaming_tmof.py
> from __future__ import print_function
>
> import multiprocessing
> import os
> import subprocess
> import time
>
> import apache_beam as beam
> from google.cloud import pubsub_v1
>
>
> def count_open_files():
>     """Count the number of files opened by the current process."""
>     pid = multiprocessing.current_process().pid
>     lsof_out = subprocess.check_output(["lsof", "-p", str(pid)])
>     num_open_files = len(lsof_out.strip().split("\n")) - 1
>     return num_open_files
>
>
> def start_streaming_pipeline(project_id, subscription_path):
>     """Create a simple streaming pipeline."""
>     runner = beam.runners.direct.DirectRunner()
>     pipeline_options = beam.pipeline.PipelineOptions(
>         project=project_id, streaming=True
>     )
>     taxirides_pc = (
>         beam.Pipeline(runner=runner, options=pipeline_options)
>         | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
>     )
>     results = taxirides_pc.pipeline.run()
>     return results
>
>
> def monitor():
>     """Periodically print the number of open files."""
>     start_time = time.time()
>     for _ in range(20):
>         num_open_files = count_open_files()
>         time_elapsed = time.time() - start_time
>         print(
>             "Time elapsed: {:<3s}s, Number of open files: {}".format(
>                 str(round(time_elapsed, 0)), num_open_files
>             )
>         )
>         if num_open_files > 1000:
>             break
>         time.sleep(5)
>
>
> if __name__ == "__main__":
>     project_id = "{project_id}"
>     topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
>     client = pubsub_v1.SubscriberClient()
>     subscription_path = client.subscription_path(
>         project_id, "taxirides-realtime-sub"
>     )
>     subscription = client.create_subscription(subscription_path, topic_path)
>     print("Subscription created: {}".format(subscription_path))
>     try:
>         results = start_streaming_pipeline(project_id, subscription_path)
>         monitor()
>     finally:
>         client.delete_subscription(subscription_path)
>         print("Subscription deleted: {}".format(subscription_path))
> {code}
> Currently, the output from running this script looks something like:
> {noformat}
> Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
> Time elapsed: 0.0s, Number of open files: 160
> Time elapsed: 5.0s, Number of open files: 179
> Time elapsed: 11.0s, Number of open files: 247
> Time elapsed: 16.0s, Number of open files: 339
> Time elapsed: 21.0s, Number of open files: 436
> Time elapsed: 26.0s, Number of open files: 523
> Time elapsed: 31.0s, Number of open files: 615
> Time elapsed: 36.0s, Number of open files: 713
> Time elapsed: 41.0s, Number of open files: 809
> Time elapsed: 46.0s, Number of open files: 903
> Time elapsed: 52.0s, Number of open files: 999
> Time elapsed: 57.0s, Number of open files: 1095
> Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
> WARNING:root:The DirectPipelineResult is being garbage-collected while the
> DirectRunner is still running the corresponding pipeline. This may lead to
> incomplete execution of the pipeline if the main thread exits before pipeline
> completion. Consider using result.wait_until_finish() to wait for completion
> of pipeline execution.
> {noformat}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)