[ https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17015534#comment-17015534 ]
Robert Bradshaw commented on BEAM-9118: --------------------------------------- I'm not aware of anything that has changed here recently. On Tue, Jan 14, 2020 at 3:44 PM Valentyn Tymofieiev (Jira) wrote: > apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses > is flaky > ------------------------------------------------------------------------------------------------ > > Key: BEAM-9118 > URL: https://issues.apache.org/jira/browse/BEAM-9118 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core > Reporter: Valentyn Tymofieiev > Assignee: Robert Bradshaw > Priority: Major > > Sample errors: > https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373 > {noformat} > 14:30:12 self = > <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses > testMethod=test_pardo_unfusable_side_inputs> > 14:30:12 > 14:30:12 def test_pardo_unfusable_side_inputs(self): > 14:30:12 def cross_product(elem, sides): > 14:30:12 for side in sides: > 14:30:12 yield elem, side > 14:30:12 with self.create_pipeline() as p: > 14:30:12 pcoll = p | beam.Create(['a', 'b']) > 14:30:12 assert_that( > 14:30:12 pcoll | beam.FlatMap(cross_product, > beam.pvalue.AsList(pcoll)), > 14:30:12 equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', > 'b')])) > 14:30:12 > 14:30:12 with self.create_pipeline() as p: > 14:30:12 pcoll = p | beam.Create(['a', 'b']) > 14:30:12 derived = ((pcoll,) | beam.Flatten() > 14:30:12 | beam.Map(lambda x: (x, x)) > 14:30:12 | beam.GroupByKey() > 14:30:12 | 'Unkey' >> beam.Map(lambda kv: kv[0])) > 14:30:12 assert_that( > 14:30:12 pcoll | beam.FlatMap(cross_product, > beam.pvalue.AsList(derived)), > 14:30:12 > equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', > 'b')])) > 14:30:12 > 14:30:12 apache_beam/runners/portability/fn_api_runner_test.py:258: > 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ > 14:30:12 apache_beam/pipeline.py:481: in __exit__ > 14:30:12 
self.run().wait_until_finish() > 14:30:12 apache_beam/runners/portability/portable_runner.py:445: in > wait_until_finish > 14:30:12 for state_response in self._state_stream: > 14:30:12 > target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416: > in __next__ > 14:30:12 return self._next() > 14:30:12 > target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694: > in _next > 14:30:12 _common.wait(self._state.condition.wait, _response_ready) > 14:30:12 > target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140: > in wait > 14:30:12 _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb) > 14:30:12 > target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105: > in _wait_once > 14:30:12 wait_fn(timeout=timeout) > 14:30:12 /usr/lib/python3.6/threading.py:299: in wait > 14:30:12 gotit = waiter.acquire(True, timeout) > 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ > 14:30:12 > 14:30:12 signum = 14, frame = <frame object at 0x31daf68> > 14:30:12 > 14:30:12 def handler(signum, frame): > 14:30:12 msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS > 14:30:12 print('=' * 20, msg, '=' * 20) > 14:30:12 traceback.print_stack(frame) > 14:30:12 threads_by_id = {th.ident: th for th in threading.enumerate()} > 14:30:12 for thread_id, stack in sys._current_frames().items(): > 14:30:12 th = threads_by_id.get(thread_id) > 14:30:12 print() > 14:30:12 print('# Thread:', th or thread_id) > 14:30:12 traceback.print_stack(stack) > 14:30:12 > raise BaseException(msg) > 14:30:12 E BaseException: Timed out after 60 seconds. 
> 14:30:12 > 14:30:12 apache_beam/runners/portability/portable_runner_test.py:77: > BaseException > {noformat} > https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/ > {noformat} > 09:06:01 self = > <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers > testMethod=test_assert_that> > 09:06:01 > 09:06:01 def test_assert_that(self): > 09:06:01 # TODO: figure out a way for fn_api_runner to parse and raise > the > 09:06:01 # underlying exception. > 09:06:01 with self.assertRaisesRegex(Exception, 'Failed assert'): > 09:06:01 with self.create_pipeline() as p: > 09:06:01 > assert_that(p | beam.Create(['a', 'b']), equal_to(['a'])) > 09:06:01 E AssertionError: "Failed assert" does not match "Pipeline > timed out waiting for job service subprocess." > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)