[ 
https://issues.apache.org/jira/browse/BEAM-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Strokach closed BEAM-7446.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 2.14.0

> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error
> --------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7446
>                 URL: https://issues.apache.org/jira/browse/BEAM-7446
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.12.0
>            Reporter: Alexey Strokach
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 2.14.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error within 2 minutes for a busy topic.
> I am not exactly sure, but it appears that "pubsub.SubscriberClient()" 
> creates a grpc channel which must be closed explicitly after we are done 
> pulling messages from it. This may be due to a change introduced in [grpc 
> v1.12.0|https://github.com/grpc/grpc/releases/tag/v1.12.0] ("_a new 
> grpc.Channel.close method is introduced and correct use of gRPC Python now 
> requires that channels be closed after use_"). If the underlying grpc channel 
> is not closed and many "pubsub.SubscriberClient" instances are created, the 
> system may run out of available file handles, resulting in "too many open 
> files" errors. A similar issue is reported here: 
> [https://github.com/googleapis/google-cloud-python/issues/5523].
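The failure mode described above can be sketched with standard-library sockets standing in for gRPC channels (a minimal illustration of leaking vs. releasing file descriptors, not the actual Beam fix):

```python
# Illustrative sketch: objects backed by OS resources (sockets here,
# gRPC channels in the report above) must be closed explicitly, or each
# new instance consumes another file descriptor until the process hits
# its limit and raises "too many open files".
import socket


def leaky(n):
    # Each socket() call consumes a file descriptor; none are released
    # until the caller closes them or the objects are garbage-collected.
    return [socket.socket() for _ in range(n)]


def tidy(n):
    fds = []
    for _ in range(n):
        s = socket.socket()
        fds.append(s.fileno())
        s.close()  # release the descriptor immediately, as Channel.close() would
    return fds
```

In the `leaky` pattern the descriptor count grows with every call, which is the same shape as the open-file growth shown in the output below.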
> The issue can be reproduced by running the following file:
> {code:python}
> # directrunner_streaming_tmof.py
> from __future__ import print_function
> 
> import multiprocessing
> import subprocess
> import time
> 
> import apache_beam as beam
> from google.cloud import pubsub_v1
> 
> 
> def count_open_files():
>     """Count the number of files opened by the current process."""
>     pid = multiprocessing.current_process().pid
>     lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
>     num_open_files = len(lsof_out.strip().split("\n")) - 1
>     return num_open_files
> 
> 
> def start_streaming_pipeline(project_id, subscription_path):
>     """Create and start a simple streaming pipeline."""
>     runner = beam.runners.direct.DirectRunner()
>     pipeline_options = beam.pipeline.PipelineOptions(
>         project=project_id, streaming=True)
>     taxirides_pc = (
>         beam.Pipeline(runner=runner, options=pipeline_options)
>         | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
>     )
>     results = taxirides_pc.pipeline.run()
>     return results
> 
> 
> def monitor():
>     """Periodically print the number of open files."""
>     start_time = time.time()
>     for _ in range(20):
>         num_open_files = count_open_files()
>         time_elapsed = time.time() - start_time
>         print(
>             "Time elapsed: {:<3s}s, Number of open files: {}".format(
>                 str(round(time_elapsed, 0)), num_open_files
>             )
>         )
>         if num_open_files > 1000:
>             break
>         time.sleep(5)
> 
> 
> if __name__ == "__main__":
>     project_id = "{project_id}"
>     topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
>     client = pubsub_v1.SubscriberClient()
>     subscription_path = client.subscription_path(
>         project_id, "taxirides-realtime-sub")
>     subscription = client.create_subscription(subscription_path, topic_path)
>     print("Subscription created: {}".format(subscription_path))
>     try:
>         results = start_streaming_pipeline(project_id, subscription_path)
>         monitor()
>     finally:
>         client.delete_subscription(subscription_path)
>         print("Subscription deleted: {}".format(subscription_path))
> {code}
> Currently, the output from running this script looks something like:
> {noformat}
> Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
> Time elapsed: 0.0s, Number of open files: 160
> Time elapsed: 5.0s, Number of open files: 179
> Time elapsed: 11.0s, Number of open files: 247
> Time elapsed: 16.0s, Number of open files: 339
> Time elapsed: 21.0s, Number of open files: 436
> Time elapsed: 26.0s, Number of open files: 523
> Time elapsed: 31.0s, Number of open files: 615
> Time elapsed: 36.0s, Number of open files: 713
> Time elapsed: 41.0s, Number of open files: 809
> Time elapsed: 46.0s, Number of open files: 903
> Time elapsed: 52.0s, Number of open files: 999
> Time elapsed: 57.0s, Number of open files: 1095
> Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
> WARNING:root:The DirectPipelineResult is being garbage-collected while the 
> DirectRunner is still running the corresponding pipeline. This may lead to 
> incomplete execution of the pipeline if the main thread exits before pipeline 
> completion. Consider using result.wait_until_finish() to wait for completion 
> of pipeline execution.
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)