[ 
https://issues.apache.org/jira/browse/BEAM-9767?focusedWorklogId=423660&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423660
 ]

ASF GitHub Bot logged work on BEAM-9767:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Apr/20 19:04
            Start Date: 16/Apr/20 19:04
    Worklog Time Spent: 10m 
      Work Description: rohdesamuel commented on pull request #11440: 
[BEAM-9767] Add a timeout to the TestStream GRPC and fix the Streaming cache 
timeout
URL: https://github.com/apache/beam/pull/11440
 
 
   Change-Id: I33908eab8313a90829a2115029f87b7f2f454f1b
   
   The TestStream method that reads from gRPC had no timeout and could hang 
indefinitely. This adds a 30s inactivity timeout. It also fixes the 
StreamingCache timeout for waiting on a file, which did not work.
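
As a rough illustration of what an inactivity timeout on a blocking stream read can look like, the sketch below fails if no event arrives within the timeout and restarts the timer after every event. It is not the code from this pull request: `read_with_inactivity_timeout`, `InactivityTimeout`, and the queue-based approach are assumptions made for the example.

```python
import queue
import threading


class InactivityTimeout(Exception):
  """Raised when the source yields nothing for `timeout_secs` seconds."""


_DONE = object()  # Sentinel marking the end of the source iterator.


def read_with_inactivity_timeout(source, timeout_secs=30.0):
  """Yields items from `source`, failing if it goes quiet for too long.

  `source` is any (possibly blocking) iterator; this helper is hypothetical
  and only illustrates the technique.
  """
  buf = queue.Queue()

  def pump():
    # Drain the source in a daemon thread so that a blocked read cannot keep
    # the process alive. Errors are forwarded to the consumer.
    try:
      for item in source:
        buf.put((item, None))
      buf.put((_DONE, None))
    except Exception as e:  # pylint: disable=broad-except
      buf.put((_DONE, e))

  threading.Thread(target=pump, daemon=True).start()

  while True:
    try:
      # Each call gets a fresh timeout, so the clock resets on every item.
      item, err = buf.get(timeout=timeout_secs)
    except queue.Empty:
      raise InactivityTimeout(
          'no event received for %.1f seconds' % timeout_secs)
    if item is _DONE:
      if err is not None:
        raise err
      return
    yield item
```

Because every `get` call receives a fresh timeout, a slow but steadily producing stream is not interrupted; only a gap longer than `timeout_secs` raises.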
   
   This doesn't necessarily fix BEAM-9767, because that looks more like a 
problem interacting with the test environment. With this change, the test 
times out after 30s instead of 600s.
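
The StreamingCache file-wait fix mentioned in the description is not shown in this message. As a generic sketch of waiting for a file with a deadline that does fire, assuming a hypothetical helper `wait_for_file` (its name, parameters, and polling approach are not taken from Beam):

```python
import os
import time


def wait_for_file(path, timeout_secs=30.0, poll_secs=0.1):
  """Returns True once `path` exists, or False if the deadline passes first."""
  # Compute the deadline once, up front. Resetting the start time on each
  # pass through the loop is a common way for such a timeout to never fire.
  deadline = time.monotonic() + timeout_secs
  while not os.path.exists(path):
    remaining = deadline - time.monotonic()
    if remaining <= 0:
      return False
    # Never sleep past the deadline.
    time.sleep(min(poll_secs, remaining))
  return True
```

Using `time.monotonic()` rather than wall-clock time keeps the deadline unaffected by system clock adjustments.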
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 423660)
    Remaining Estimate: 0h
            Time Spent: 10m

> test_streaming_wordcount flaky timeouts
> ---------------------------------------
>
>                 Key: BEAM-9767
>                 URL: https://issues.apache.org/jira/browse/BEAM-9767
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core, test-failures
>            Reporter: Udi Meiri
>            Assignee: Sam Rohde
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Timed out after 600s, typically completes in 2.8s on my workstation.
> https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/
> {code}
> self = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>
>     @unittest.skipIf(
>         sys.version_info < (3, 5, 3),
>         'The tests require at least Python 3.6 to work.')
>     def test_streaming_wordcount(self):
>       class WordExtractingDoFn(beam.DoFn):
>         def process(self, element):
>           text_line = element.strip()
>           words = text_line.split()
>           return words
>     
>       # Add the TestStream so that it can be cached.
>       ib.options.capturable_sources.add(TestStream)
>       ib.options.capture_duration = timedelta(seconds=5)
>     
>       p = beam.Pipeline(
>           runner=interactive_runner.InteractiveRunner(),
>           options=StandardOptions(streaming=True))
>     
>       data = (
>           p
>           | TestStream()
>               .advance_watermark_to(0)
>               .advance_processing_time(1)
>               .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
>               .advance_watermark_to(20)
>               .advance_processing_time(1)
>               .add_elements(['that', 'is', 'the', 'question'])
>           | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
>     
>       counts = (
>           data
>           | 'split' >> beam.ParDo(WordExtractingDoFn())
>           | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
>           | 'group' >> beam.GroupByKey()
>           | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
>     
>       # Watch the local scope for Interactive Beam so that referenced PCollections
>       # will be cached.
>       ib.watch(locals())
>     
>       # This is normally done in the interactive_utils when a transform is
>       # applied but needs an IPython environment. So we manually run this here.
>       ie.current_env().track_user_pipelines()
>     
>       # Create a fake limiter that cancels the BCJ once the main job receives the
>       # expected amount of results.
>       class FakeLimiter:
>         def __init__(self, p, pcoll):
>           self.p = p
>           self.pcoll = pcoll
>     
>         def is_triggered(self):
>           result = ie.current_env().pipeline_result(self.p)
>           if result:
>             try:
>               results = result.get(self.pcoll)
>             except ValueError:
>               return False
>             return len(results) >= 10
>           return False
>     
>       # This sets the limiters to stop reading when the test receives 10 elements
>       # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
>       ie.current_env().options.capture_control.set_limiters_for_test(
>           [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
>     
>       # This tests that the data was correctly cached.
>       pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
>       expected_data_df = pd.DataFrame([
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('or', 0, [IntervalWindow(0, 10)], pane_info),
>           ('not', 0, [IntervalWindow(0, 10)], pane_info),
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
>       ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
>     
> >     data_df = ib.collect(data, include_window_info=True)
> apache_beam/runners/interactive/interactive_runner_test.py:237: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> apache_beam/runners/interactive/interactive_beam.py:451: in collect
>     return head(pcoll, n=-1, include_window_info=include_window_info)
> apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator
>     return func(*args, **kwargs)
> apache_beam/runners/interactive/interactive_beam.py:515: in head
>     result.wait_until_finish()
> apache_beam/runners/interactive/interactive_runner.py:250: in wait_until_finish
>     self._underlying_result.wait_until_finish()
> apache_beam/runners/direct/direct_runner.py:455: in wait_until_finish
>     self._executor.await_completion()
> apache_beam/runners/direct/executor.py:439: in await_completion
>     self._executor.await_completion()
> apache_beam/runners/direct/executor.py:484: in await_completion
>     update = self.visible_updates.take()
> apache_beam/runners/direct/executor.py:557: in take
>     item = self._queue.get(timeout=1)
> /usr/lib/python3.6/queue.py:173: in get
>     self.not_empty.wait(remaining)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <Condition(<unlocked _thread.lock object at 0x7f2f85244198>, 0)>
> timeout = 0.9999979329295456
>     def wait(self, timeout=None):
>         """Wait until notified or until a timeout occurs.
>     
>         If the calling thread has not acquired the lock when this method is
>         called, a RuntimeError is raised.
>     
>         This method releases the underlying lock, and then blocks until it is
>         awakened by a notify() or notify_all() call for the same condition
>         variable in another thread, or until the optional timeout occurs. Once
>         awakened or timed out, it re-acquires the lock and returns.
>     
>         When the timeout argument is present and not None, it should be a
>         floating point number specifying a timeout for the operation in seconds
>         (or fractions thereof).
>     
>         When the underlying lock is an RLock, it is not released using its
>         release() method, since this may not actually unlock the lock when it
>         was acquired multiple times recursively. Instead, an internal interface
>         of the RLock class is used, which really unlocks it even when it has
>         been recursively acquired several times. Another internal interface is
>         then used to restore the recursion level when the lock is reacquired.
>     
>         """
>         if not self._is_owned():
>             raise RuntimeError("cannot wait on un-acquired lock")
>         waiter = _allocate_lock()
>         waiter.acquire()
>         self._waiters.append(waiter)
>         saved_state = self._release_save()
>         gotit = False
>         try:    # restore state no matter what (e.g., KeyboardInterrupt)
>             if timeout is None:
>                 waiter.acquire()
>                 gotit = True
>             else:
>                 if timeout > 0:
> >                   gotit = waiter.acquire(True, timeout)
> E                   Failed: Timeout >600.0s
> /usr/lib/python3.6/threading.py:299: Failed
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
