[
https://issues.apache.org/jira/browse/BEAM-7446?focusedWorklogId=249720&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-249720
]
ASF GitHub Bot logged work on BEAM-7446:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/May/19 23:49
Start Date: 28/May/19 23:49
Worklog Time Spent: 10m
Work Description: ostrokach commented on pull request #8708: [BEAM-7446]
Close grpc channel after pulling messages from a Cloud Pub/Sub topic
URL: https://github.com/apache/beam/pull/8708
It appears that `pubsub.SubscriberClient()` creates a grpc channel which
must be closed explicitly once we are done pulling messages through it.
This may be due to a change introduced in [grpc
v1.12.0](https://github.com/grpc/grpc/releases/tag/v1.12.0) ("*a new
grpc.Channel.close method is introduced and correct use of gRPC Python now
requires that channels be closed after use.*"). If the underlying grpc channel
is not closed and many `pubsub.SubscriberClient` instances are created, the
system may run out of available file handles, resulting in "too many open
files" errors.
A similar issue has been reported here: googleapis/google-cloud-python#5523,
and it appears that the solution is to explicitly close the grpc transport
channel after the subscription is no longer needed.
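A minimal sketch of that pattern, for illustration only and not necessarily
the exact change in this pull request. The helper name `closing_channel` is
hypothetical, and the attribute path `client.api.transport.channel` is an
assumption about how `pubsub.SubscriberClient` exposed its channel in
google-cloud-pubsub releases of that period; check the installed version
before relying on it. The stand-in classes only exist so the sketch runs
without network access.

```python
import contextlib
import types


@contextlib.contextmanager
def closing_channel(client):
    """Yield `client`, then close its underlying gRPC channel, even on error."""
    try:
        yield client
    finally:
        # Assumption: the channel is reachable at client.api.transport.channel.
        client.api.transport.channel.close()


class FakeChannel(object):
    """Stand-in for a grpc.Channel that only records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def make_fake_client(channel):
    """Build an object with the same (assumed) attribute path as the client."""
    transport = types.SimpleNamespace(channel=channel)
    api = types.SimpleNamespace(transport=transport)
    return types.SimpleNamespace(api=api)
```

With a real client, the same helper would wrap the pull, for example
`with closing_channel(pubsub.SubscriberClient()) as sub_client:` followed by
the pull call, so that the channel is released whether or not the pull raises.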
R: @charlesccychen @aaltay
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 249720)
Time Spent: 10m
Remaining Estimate: 0h
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a
> "too many open files" error
> --------------------------------------------------------------------------------------------------------
>
> Key: BEAM-7446
> URL: https://issues.apache.org/jira/browse/BEAM-7446
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.12.0
> Reporter: Alexey Strokach
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a
> "too many open files" error within 2 minutes for a busy topic.
> I am not exactly sure, but it appears that "pubsub.SubscriberClient()"
> creates a grpc channel which must be closed explicitly after we are done
> pulling messages from that channel. This may be due to a change that was
> introduced in [grpc
> v1.12.0|https://github.com/grpc/grpc/releases/tag/v1.12.0] ("_a new
> grpc.Channel.close method is introduced and correct use of gRPC Python now
> requires that channels be closed after use_"). If the underlying grpc channel
> is not closed and many "pubsub.SubscriberClient" instances are created, the
> system may run out of available file handles, resulting in "too many open
> files" errors. A similar issue is reported here:
> [https://github.com/googleapis/google-cloud-python/issues/5523].
> The issue can be reproduced by running the following file:
> {code:python}
> # directrunner_streaming_tmof.py
> from __future__ import print_function
>
> import multiprocessing
> import os
> import subprocess
> import time
>
> import apache_beam as beam
> from google.cloud import pubsub_v1
>
>
> def count_open_files():
>     """Count the number of files opened by current process."""
>     pid = multiprocessing.current_process().pid
>     lsof_out = subprocess.check_output(["lsof", "-p", str(pid)])
>     # check_output returns bytes on Python 3, so decode before splitting.
>     lines = lsof_out.decode("utf-8", "replace").strip().split("\n")
>     # Subtract one for the lsof header row.
>     num_open_files = len(lines) - 1
>     return num_open_files
>
>
> def start_streaming_pipeline(project_id, subscription_path):
>     """Create a simple streaming pipeline."""
>     runner = beam.runners.direct.DirectRunner()
>     pipeline_options = beam.pipeline.PipelineOptions(
>         project=project_id, streaming=True
>     )
>     taxirides_pc = (
>         beam.Pipeline(runner=runner, options=pipeline_options)
>         | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
>     )
>     results = taxirides_pc.pipeline.run()
>     return results
>
>
> def monitor():
>     """Periodically print the number of open files."""
>     start_time = time.time()
>     for _ in range(20):
>         num_open_files = count_open_files()
>         time_elapsed = time.time() - start_time
>         print(
>             "Time elapsed: {:<3s}s, Number of open files: {}".format(
>                 str(round(time_elapsed, 0)), num_open_files
>             )
>         )
>         # Stop early once the leak is obvious.
>         if num_open_files > 1000:
>             break
>         time.sleep(5)
>
>
> if __name__ == "__main__":
>     project_id = "{project_id}"
>     topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
>     client = pubsub_v1.SubscriberClient()
>     subscription_path = client.subscription_path(
>         project_id, "taxirides-realtime-sub"
>     )
>     subscription = client.create_subscription(subscription_path, topic_path)
>     print("Subscription created: {}".format(subscription_path))
>     try:
>         results = start_streaming_pipeline(project_id, subscription_path)
>         monitor()
>     finally:
>         # Always clean up the temporary subscription.
>         client.delete_subscription(subscription_path)
>         print("Subscription deleted: {}".format(subscription_path))
> {code}
> Currently, the output from running this script looks something like:
> {noformat}
> Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
> Time elapsed: 0.0s, Number of open files: 160
> Time elapsed: 5.0s, Number of open files: 179
> Time elapsed: 11.0s, Number of open files: 247
> Time elapsed: 16.0s, Number of open files: 339
> Time elapsed: 21.0s, Number of open files: 436
> Time elapsed: 26.0s, Number of open files: 523
> Time elapsed: 31.0s, Number of open files: 615
> Time elapsed: 36.0s, Number of open files: 713
> Time elapsed: 41.0s, Number of open files: 809
> Time elapsed: 46.0s, Number of open files: 903
> Time elapsed: 52.0s, Number of open files: 999
> Time elapsed: 57.0s, Number of open files: 1095
> Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
> WARNING:root:The DirectPipelineResult is being garbage-collected while the
> DirectRunner is still running the corresponding pipeline. This may lead to
> incomplete execution of the pipeline if the main thread exits before pipeline
> completion. Consider using result.wait_until_finish() to wait for completion
> of pipeline execution.
> {noformat}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)