Alexey Strokach created BEAM-7446:
-------------------------------------

             Summary: Reading from a Cloud Pub/Sub topic inside Python's 
DirectRunner results in a "too many open files" error
                 Key: BEAM-7446
                 URL: https://issues.apache.org/jira/browse/BEAM-7446
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.12.0
            Reporter: Alexey Strokach


Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
"too many open files" error within 2 minutes for a busy topic.

I am not entirely sure, but it appears that "pubsub.SubscriberClient()" creates 
a gRPC channel which must be closed explicitly once we are done pulling 
messages from it. This may be due to a change introduced in gRPC v1.12.0 
([https://github.com/grpc/grpc/releases/tag/v1.12.0]): "_a new 
grpc.Channel.close method is introduced and correct use of gRPC Python now 
requires that channels be closed after use_". If the underlying gRPC channel is 
not closed and many "pubsub.SubscriberClient" instances are created, the 
process eventually runs out of available file descriptors, resulting in "too 
many open files" errors. A similar issue is reported here: 
[https://github.com/googleapis/google-cloud-python/issues/5523].
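
For illustration, here is a minimal sketch (plain gRPC, not Beam or Pub/Sub 
client code) of the behavior described in that release note: a channel that is 
never closed keeps its sockets, i.e. file descriptors, open, so repeatedly 
creating channels without closing them exhausts the per-process limit. The 
endpoint below is only a placeholder.
{code}
# Minimal sketch of the gRPC >= 1.12 requirement that channels be closed after
# use. An unclosed channel keeps its sockets (file descriptors) open.
import grpc

# Creating a channel allocates resources that are only released by close():
channel = grpc.insecure_channel("pubsub.googleapis.com:443")
channel.close()  # releases the channel's sockets

# In recent gRPC versions the channel can also be used as a context manager:
with grpc.insecure_channel("pubsub.googleapis.com:443") as channel:
    pass  # channel.close() is called automatically on exit
{code}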

The issue can be reproduced by running the following file:
{code}
# directrunner_streaming_tmof.py
from __future__ import print_function

import multiprocessing
import subprocess
import time

import apache_beam as beam
from google.cloud import pubsub_v1


def count_open_files():
    """Count the number of files opened by current process."""
    pid = multiprocessing.current_process().pid
    lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()
    # Subtract one for the lsof header line.
    num_open_files = len(lsof_out.strip().split("\n")) - 1
    return num_open_files


def start_streaming_pipeline(project_id, subscription_path):
    """Create a simple streaming pipeline."""
    runner = beam.runners.direct.DirectRunner()
    pipeline_options = beam.pipeline.PipelineOptions(
        project=project_id, streaming=True
    )
    taxirides_pc = (
        #
        beam.Pipeline(runner=runner, options=pipeline_options)
        | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
    )
    results = taxirides_pc.pipeline.run()
    return results


def monitor():
    """Periodically print the number of open files."""
    start_time = time.time()
    for _ in range(20):
        num_open_files = count_open_files()
        time_elapsed = time.time() - start_time
        print(
            "Time elapsed: {:<3s}s, Number of open files: {}".format(
                str(round(time_elapsed, 0)), num_open_files
            )
        )
        if num_open_files > 1000:
            break
        time.sleep(5)


if __name__ == "__main__":
    project_id = "{project_id}"
    topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"

    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(
        project_id, "taxirides-realtime-sub"
    )

    subscription = client.create_subscription(subscription_path, topic_path)
    print("Subscription created: {}".format(subscription_path))
    try:
        results = start_streaming_pipeline(project_id, subscription_path)
        monitor()
    finally:
        client.delete_subscription(subscription_path)
        print("Subscription deleted: {}".format(subscription_path))
{code}
Currently, the output from running this script looks something like:
{noformat}
Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
Time elapsed: 0.0s, Number of open files: 160
Time elapsed: 5.0s, Number of open files: 179
Time elapsed: 11.0s, Number of open files: 247
Time elapsed: 16.0s, Number of open files: 339
Time elapsed: 21.0s, Number of open files: 436
Time elapsed: 26.0s, Number of open files: 523
Time elapsed: 31.0s, Number of open files: 615
Time elapsed: 36.0s, Number of open files: 713
Time elapsed: 41.0s, Number of open files: 809
Time elapsed: 46.0s, Number of open files: 903
Time elapsed: 52.0s, Number of open files: 999
Time elapsed: 57.0s, Number of open files: 1095
Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
WARNING:root:The DirectPipelineResult is being garbage-collected while the 
DirectRunner is still running the corresponding pipeline. This may lead to 
incomplete execution of the pipeline if the main thread exits before pipeline 
completion. Consider using result.wait_until_finish() to wait for completion of 
pipeline execution.
{noformat}
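
A possible mitigation, assuming the DirectRunner creates a fresh 
"pubsub.SubscriberClient()" for every pull (the helper below, "pull_once", is a 
hypothetical illustration, not the actual Beam code): close the client's 
underlying gRPC channel as soon as the pull is done, instead of leaving it to 
garbage collection.
{code}
# Hypothetical sketch of the kind of fix suggested above; not the actual Beam
# implementation. The key point is that whoever creates the SubscriberClient
# must also release its gRPC channel.
from google.cloud import pubsub_v1


def pull_once(subscription_path, batch_size=10):
    client = pubsub_v1.SubscriberClient()
    try:
        response = client.pull(subscription=subscription_path,
                               max_messages=batch_size)
        return [received.message for received in response.received_messages]
    finally:
        # Recent google-cloud-pubsub versions expose close(); older versions
        # require closing the underlying transport channel directly instead.
        client.close()
{code}
Reusing a single long-lived SubscriberClient across pulls would avoid the 
per-pull channel creation altogether.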
 


