[ https://issues.apache.org/jira/browse/BEAM-7446?focusedWorklogId=249720&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-249720 ]

ASF GitHub Bot logged work on BEAM-7446:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/May/19 23:49
            Start Date: 28/May/19 23:49
    Worklog Time Spent: 10m 
      Work Description: ostrokach commented on pull request #8708: [BEAM-7446] Close grpc channel after pulling messages from a Cloud Pub/Sub topic
URL: https://github.com/apache/beam/pull/8708
 
 
   It appears that `pubsub.SubscriberClient()` creates a gRPC channel that 
must be closed explicitly once we are done pulling messages from it. This may 
be due to a change introduced in [gRPC 
v1.12.0](https://github.com/grpc/grpc/releases/tag/v1.12.0) ("*a new 
grpc.Channel.close method is introduced and correct use of gRPC Python now 
requires that channels be closed after use.*"). If the underlying gRPC channel 
is not closed and many `pubsub.SubscriberClient` instances are created, the 
system may run out of available file handles, resulting in "too many open 
files" errors.
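To illustrate how unclosed channels exhaust file handles, here is a minimal sketch that uses plain socket pairs as a hypothetical stand-in for gRPC channels (no Pub/Sub or gRPC calls are made). It counts descriptors through `/proc/self/fd` or `/dev/fd`, so it assumes a Linux or macOS host; `count_open_fds` and `leak_demo` are illustrative names, not part of Beam or google-cloud-pubsub.

```python
import os
import socket


def count_open_fds():
    """Return the number of file descriptors open in this process.

    Assumes a Unix-like OS exposing /proc/self/fd (Linux) or /dev/fd (macOS).
    Listing the directory briefly opens one extra descriptor; that constant
    offset cancels out when two counts are compared.
    """
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    return len(os.listdir(fd_dir))


def leak_demo(n):
    """Open n socket pairs (stand-ins for channels) and return the fd growth."""
    before = count_open_fds()
    # Each socketpair() holds two descriptors until it is closed.
    leaked = [socket.socketpair() for _ in range(n)]
    after = count_open_fds()
    # Clean up so that the demo itself does not leak.
    for a, b in leaked:
        a.close()
        b.close()
    return after - before
```

Every unclosed "channel" keeps its descriptors alive, so the count grows linearly with the number of clients created.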
   
   A similar issue has been reported in googleapis/google-cloud-python#5523, 
and the solution there appears to be to explicitly close the gRPC transport 
channel once the subscription is no longer needed.
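A minimal sketch of the fix pattern, under stated assumptions: create the client, pull once, and close it in a `finally` block so the channel is released even if the pull raises. `FakeSubscriberClient` and `read_once` are hypothetical stand-ins (a socket pair plays the role of the gRPC channel); the exact close call on the real client depends on the google-cloud-pubsub version, and recent releases expose `SubscriberClient.close()`.

```python
import socket


class FakeSubscriberClient(object):
    """Hypothetical stand-in for pubsub.SubscriberClient.

    Like the real client, it acquires an OS resource as soon as it is
    constructed; here a socket pair plays the role of the gRPC channel.
    """

    def __init__(self):
        self._sock_a, self._sock_b = socket.socketpair()

    def pull(self, subscription, max_messages):
        # A real client would send a Pull RPC here; the stub returns nothing.
        return []

    def close(self):
        # Releases both descriptors, like closing the gRPC channel.
        self._sock_a.close()
        self._sock_b.close()

    @property
    def closed(self):
        # socket.fileno() returns -1 once the socket has been closed.
        return self._sock_a.fileno() == -1 and self._sock_b.fileno() == -1


def read_once(subscription, client_factory=FakeSubscriberClient):
    """Pull once with a fresh client and always release its channel."""
    client = client_factory()
    try:
        return client.pull(subscription, max_messages=10)
    finally:
        # Runs whether pull() returns or raises, so nothing is leaked.
        client.close()
```

Because the close happens in `finally`, the descriptor count stays flat no matter how many pulls a runner performs.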
   
   R: @charlesccychen @aaltay 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 


Issue Time Tracking
-------------------

            Worklog Id:     (was: 249720)
            Time Spent: 10m
    Remaining Estimate: 0h

> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error
> --------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7446
>                 URL: https://issues.apache.org/jira/browse/BEAM-7446
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.12.0
>            Reporter: Alexey Strokach
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a 
> "too many open files" error within 2 minutes for a busy topic.
> I am not exactly sure, but it appears that "pubsub.SubscriberClient()" 
> creates a gRPC channel that must be closed explicitly once we are done 
> pulling messages from it. This may be due to a change introduced in [gRPC 
> v1.12.0|https://github.com/grpc/grpc/releases/tag/v1.12.0] ("_a new 
> grpc.Channel.close method is introduced and correct use of gRPC Python now 
> requires that channels be closed after use_"). If the underlying gRPC channel 
> is not closed and many "pubsub.SubscriberClient" instances are created, the 
> system may run out of available file handles, resulting in "too many open 
> files" errors. A similar issue is reported here: 
> [https://github.com/googleapis/google-cloud-python/issues/5523].
> The issue can be reproduced by running the following file:
> {code:python}
> # directrunner_streaming_tmof.py
> from __future__ import print_function
>
> import os
> import subprocess
> import time
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from google.cloud import pubsub_v1
>
>
> def count_open_files():
>     """Count the files opened by the current process, as listed by lsof."""
>     lsof_out = subprocess.check_output(["lsof", "-p", str(os.getpid())])
>     # check_output returns bytes on Python 3, so decode before splitting.
>     # Subtract 1 for the header line that lsof prints.
>     return len(lsof_out.decode("utf-8", "replace").strip().split("\n")) - 1
>
>
> def start_streaming_pipeline(project_id, subscription_path):
>     """Create and start a simple streaming pipeline that reads from Pub/Sub."""
>     runner = beam.runners.direct.DirectRunner()
>     pipeline_options = PipelineOptions(project=project_id, streaming=True)
>     pipeline = beam.Pipeline(runner=runner, options=pipeline_options)
>     _ = pipeline | "Read" >> beam.io.ReadFromPubSub(
>         subscription=subscription_path
>     )
>     return pipeline.run()
>
>
> def monitor():
>     """Print the number of open files every 5 seconds (at most 20 samples)."""
>     start_time = time.time()
>     for _ in range(20):
>         num_open_files = count_open_files()
>         time_elapsed = time.time() - start_time
>         print(
>             "Time elapsed: {:<3s}s, Number of open files: {}".format(
>                 str(round(time_elapsed, 0)), num_open_files
>             )
>         )
>         if num_open_files > 1000:
>             break
>         time.sleep(5)
>
>
> if __name__ == "__main__":
>     project_id = "{project_id}"  # Replace with your GCP project ID.
>     topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
>     client = pubsub_v1.SubscriberClient()
>     subscription_path = client.subscription_path(
>         project_id, "taxirides-realtime-sub"
>     )
>     client.create_subscription(name=subscription_path, topic=topic_path)
>     print("Subscription created: {}".format(subscription_path))
>     try:
>         results = start_streaming_pipeline(project_id, subscription_path)
>         monitor()
>     finally:
>         client.delete_subscription(subscription=subscription_path)
>         print("Subscription deleted: {}".format(subscription_path))
> {code}
> Currently, the output from running this script looks something like:
> {noformat}
> Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
> Time elapsed: 0.0s, Number of open files: 160
> Time elapsed: 5.0s, Number of open files: 179
> Time elapsed: 11.0s, Number of open files: 247
> Time elapsed: 16.0s, Number of open files: 339
> Time elapsed: 21.0s, Number of open files: 436
> Time elapsed: 26.0s, Number of open files: 523
> Time elapsed: 31.0s, Number of open files: 615
> Time elapsed: 36.0s, Number of open files: 713
> Time elapsed: 41.0s, Number of open files: 809
> Time elapsed: 46.0s, Number of open files: 903
> Time elapsed: 52.0s, Number of open files: 999
> Time elapsed: 57.0s, Number of open files: 1095
> Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
> WARNING:root:The DirectPipelineResult is being garbage-collected while the 
> DirectRunner is still running the corresponding pipeline. This may lead to 
> incomplete execution of the pipeline if the main thread exits before pipeline 
> completion. Consider using result.wait_until_finish() to wait for completion 
> of pipeline execution.
> {noformat}
>  
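The log above can be turned into a leak rate. This sketch fits an ordinary least-squares line to the (elapsed seconds, open files) pairs copied from that output. Note that `lsof -p` also lists non-descriptor entries such as the working directory and memory-mapped libraries, so the absolute counts overstate the number of descriptors; the slope still shows how fast entries accumulate. `least_squares_slope` is an illustrative helper, not part of Beam.

```python
def least_squares_slope(xs, ys):
    """Ordinary least-squares slope of ys regressed on xs."""
    n = len(xs)
    mean_x = sum(xs) / float(n)
    mean_y = sum(ys) / float(n)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    return sxy / sxx


# (elapsed seconds, open files) copied from the script output above.
ELAPSED = [0, 5, 11, 16, 21, 26, 31, 36, 41, 46, 52, 57]
OPEN_FILES = [160, 179, 247, 339, 436, 523, 615, 713, 809, 903, 999, 1095]

slope = least_squares_slope(ELAPSED, OPEN_FILES)
print("Leak rate: {:.1f} open files per second".format(slope))
# prints: Leak rate: 17.4 open files per second
```

At roughly 17 new entries per second, the process gains about a thousand entries per minute, which is consistent with hitting "too many open files" within a couple of minutes on a busy topic.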



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
