[ https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-9118:
----------------------------------
Resolution: Fixed
Status: Resolved (was: Resolved)
Hello! Due to a bug in our Jira configuration, this issue had status:Resolved
but resolution:Unresolved.
I am bulk editing these issues to have resolution:Fixed.
If a different resolution is appropriate, please change it. To do this, click
the "Resolve" button (you can do this even for closed issues) and set the
Resolution field to the right value.
> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
> is flaky
> ------------------------------------------------------------------------------------------------
>
> Key: BEAM-9118
> URL: https://issues.apache.org/jira/browse/BEAM-9118
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Priority: P1
> Labels: beam-fixit, flake
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Sample errors:
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373
> {noformat}
> 14:30:12 self =
> <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
> testMethod=test_pardo_unfusable_side_inputs>
> 14:30:12
> 14:30:12 def test_pardo_unfusable_side_inputs(self):
> 14:30:12 def cross_product(elem, sides):
> 14:30:12 for side in sides:
> 14:30:12 yield elem, side
> 14:30:12 with self.create_pipeline() as p:
> 14:30:12 pcoll = p | beam.Create(['a', 'b'])
> 14:30:12 assert_that(
> 14:30:12 pcoll | beam.FlatMap(cross_product,
> beam.pvalue.AsList(pcoll)),
> 14:30:12 equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b',
> 'b')]))
> 14:30:12
> 14:30:12 with self.create_pipeline() as p:
> 14:30:12 pcoll = p | beam.Create(['a', 'b'])
> 14:30:12 derived = ((pcoll,) | beam.Flatten()
> 14:30:12 | beam.Map(lambda x: (x, x))
> 14:30:12 | beam.GroupByKey()
> 14:30:12 | 'Unkey' >> beam.Map(lambda kv: kv[0]))
> 14:30:12 assert_that(
> 14:30:12 pcoll | beam.FlatMap(cross_product,
> beam.pvalue.AsList(derived)),
> 14:30:12 > equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b',
> 'b')]))
> 14:30:12
> 14:30:12 apache_beam/runners/portability/fn_api_runner_test.py:258:
> 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _
> 14:30:12 apache_beam/pipeline.py:481: in __exit__
> 14:30:12 self.run().wait_until_finish()
> 14:30:12 apache_beam/runners/portability/portable_runner.py:445: in
> wait_until_finish
> 14:30:12 for state_response in self._state_stream:
> 14:30:12
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416:
> in __next__
> 14:30:12 return self._next()
> 14:30:12
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694:
> in _next
> 14:30:12 _common.wait(self._state.condition.wait, _response_ready)
> 14:30:12
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140:
> in wait
> 14:30:12 _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
> 14:30:12
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105:
> in _wait_once
> 14:30:12 wait_fn(timeout=timeout)
> 14:30:12 /usr/lib/python3.6/threading.py:299: in wait
> 14:30:12 gotit = waiter.acquire(True, timeout)
> 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _
> 14:30:12
> 14:30:12 signum = 14, frame = <frame object at 0x31daf68>
> 14:30:12
> 14:30:12 def handler(signum, frame):
> 14:30:12 msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
> 14:30:12 print('=' * 20, msg, '=' * 20)
> 14:30:12 traceback.print_stack(frame)
> 14:30:12 threads_by_id = {th.ident: th for th in threading.enumerate()}
> 14:30:12 for thread_id, stack in sys._current_frames().items():
> 14:30:12 th = threads_by_id.get(thread_id)
> 14:30:12 print()
> 14:30:12 print('# Thread:', th or thread_id)
> 14:30:12 traceback.print_stack(stack)
> 14:30:12 > raise BaseException(msg)
> 14:30:12 E BaseException: Timed out after 60 seconds.
> 14:30:12
> 14:30:12 apache_beam/runners/portability/portable_runner_test.py:77:
> BaseException
> {noformat}
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/
> {noformat}
> 09:06:01 self =
> <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers
> testMethod=test_assert_that>
> 09:06:01
> 09:06:01 def test_assert_that(self):
> 09:06:01 # TODO: figure out a way for fn_api_runner to parse and raise
> the
> 09:06:01 # underlying exception.
> 09:06:01 with self.assertRaisesRegex(Exception, 'Failed assert'):
> 09:06:01 with self.create_pipeline() as p:
> 09:06:01 > assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
> 09:06:01 E AssertionError: "Failed assert" does not match "Pipeline
> timed out waiting for job service subprocess."
> {noformat}
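
For readers of the first trace: signum = 14 is SIGALRM on Linux, and the handler shown there is a test watchdog that prints every thread's stack and then raises. The pattern can be sketched roughly as below. This is a minimal illustration assuming a Unix platform and a call made from the main thread; `run_with_timeout` and `TimedOut` are hypothetical names and are not taken from the Beam code base.

```python
import signal
import sys
import threading
import time
import traceback


class TimedOut(BaseException):
    """Hypothetical marker type; the trace above raises a plain BaseException.

    Deriving from BaseException means `except Exception` blocks in the
    code under test will not swallow the timeout.
    """


def run_with_timeout(fn, timeout_secs):
    """Run fn() on the main thread; raise TimedOut if it takes too long."""

    def handler(signum, frame):
        msg = 'Timed out after %s seconds.' % timeout_secs
        # Dump the stack of every live thread to stderr so a hang can be located.
        threads_by_id = {th.ident: th for th in threading.enumerate()}
        for thread_id, stack in sys._current_frames().items():
            th = threads_by_id.get(thread_id)
            print('# Thread:', th or thread_id, file=sys.stderr)
            traceback.print_stack(stack, file=sys.stderr)
        raise TimedOut(msg)

    # Install the handler and arm a one-shot real-time timer (delivers SIGALRM).
    previous = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, timeout_secs)
    try:
        return fn()
    finally:
        # Disarm the timer and restore whatever handler was there before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)
```

A call such as `run_with_timeout(lambda: time.sleep(5), 0.1)` would raise `TimedOut` after roughly 0.1 seconds, while a function that returns in time has its result passed through unchanged.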