[
https://issues.apache.org/jira/browse/BEAM-12928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419638#comment-17419638
]
Jan Lukavský commented on BEAM-12928:
-------------------------------------
I'm not sure, but at first glance the failure seems to be caused by some sort
of communication error between the Flink JobManager and TaskManager. That can
happen from time to time. How often does this test fail for this reason? The
core of the problem seems to be:
apache_beam.utils.subprocess_server:subprocess_server.py:122
b'org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy'
Changing the restart strategy might help, but FlinkRunner apparently does not
currently expose that setting:
[https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/]
CC [~dmvk] can you tell from the logs what caused the AskTimeoutException? I'm
afraid it can simply happen occasionally, because Akka has at-most-once
delivery.
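
For illustration only, here is a minimal sketch of the kind of Flink settings
the restart-strategy idea refers to. The key names (`restart-strategy`,
`restart-strategy.fixed-delay.*`, `akka.ask.timeout`) are taken from the Flink
1.x configuration docs and should be checked against the Flink version in use.
The helper names `render_flink_conf` and `write_flink_conf` and the chosen
values are hypothetical, and how such a file would be handed to FlinkRunner is
an open question here, not something this sketch answers.

```python
import tempfile
from pathlib import Path


def render_flink_conf(attempts=3, delay="10 s", ask_timeout="30 s"):
    """Return flink-conf.yaml text with a fixed-delay restart strategy.

    Key names are assumed from the Flink 1.x docs; the values are examples.
    """
    settings = {
        # Retry failed jobs instead of failing on the first error.
        "restart-strategy": "fixed-delay",
        "restart-strategy.fixed-delay.attempts": attempts,
        "restart-strategy.fixed-delay.delay": delay,
        # The failing run timed out after 10000 ms; this raises that limit.
        "akka.ask.timeout": ask_timeout,
    }
    return "".join(f"{key}: {value}\n" for key, value in settings.items())


def write_flink_conf(conf_dir, **kwargs):
    """Write the rendered settings to <conf_dir>/flink-conf.yaml."""
    path = Path(conf_dir) / "flink-conf.yaml"
    path.write_text(render_flink_conf(**kwargs))
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as conf_dir:
        print(write_flink_conf(conf_dir).read_text())
```

With a restart strategy like this, a single lost RPC reply would trigger a job
restart rather than a terminal FAILED state, which is what
NoRestartBackoffTimeStrategy currently prevents.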
> beam_PostCommit_Python36 - CrossLanguageSpannerIOTest - flakey failing
> ----------------------------------------------------------------------
>
> Key: BEAM-12928
> URL: https://issues.apache.org/jira/browse/BEAM-12928
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Alex Amato
> Assignee: Piotr Szuberski
> Priority: P1
> Labels: currently-failing, flake
>
> _Use this form to file an issue for test failure_
> _https://ci-beam.apache.org/job/beam_PostCommit_Python36/4385/_
>
> * [apache_beam.io.gcp.tests.xlang_spannerio_it_test.CrossLanguageSpannerIOTest.test_spanner_read_query|https://ci-beam.apache.org/job/beam_PostCommit_Python36/4385/testReport/junit/apache_beam.io.gcp.tests.xlang_spannerio_it_test/CrossLanguageSpannerIOTest/test_spanner_read_query/]
> * [apache_beam.io.gcp.tests.xlang_spannerio_it_test.CrossLanguageSpannerIOTest.test_spanner_read_table|https://ci-beam.apache.org/job/beam_PostCommit_Python36/4385/testReport/junit/apache_beam.io.gcp.tests.xlang_spannerio_it_test/CrossLanguageSpannerIOTest/test_spanner_read_table/]
>
> Initial investigation:
>
> Error Message
> RuntimeError: Pipeline
> BeamApp-jenkins-0921123257-8b921bc2_f20c9b92-9261-4f0b-9d6c-31732a12287b
> failed in state FAILED: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/rpc/taskmanager_0#31887928]] after [10000 ms].
> Message of type [org.apache.flink.runtime.rpc.messages.LocalRpcInvocation]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
> Stacktrace
> self =
> <apache_beam.io.gcp.tests.xlang_spannerio_it_test.CrossLanguageSpannerIOTest
> testMethod=test_spanner_read_query>
>     def test_spanner_read_query(self):
>       self.insert_read_values('query_read')
> >     self.run_read_pipeline('query_read', query=f'SELECT * FROM {self.table}')
> apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:194:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:220: in run_read_pipeline
>     SpannerTestRow(f_int64=2, f_string=f'{prefix}2', f_boolean=False),
> apache_beam/pipeline.py:590: in __exit__
> self.result = self.run()
> apache_beam/testing/test_pipeline.py:116: in run
> state = result.wait_until_finish()
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _
> self = <apache_beam.runners.portability.portable_runner.PipelineResult object
> at 0x7fa74c495860>
> duration = None
>     def wait_until_finish(self, duration=None):
>       """
>       :param duration: The maximum time in milliseconds to wait for the result of
>       the execution. If None or zero, will wait until the pipeline finishes.
>       :return: The result of the pipeline, i.e. PipelineResult.
>       """
>       def read_messages():
>         # type: () -> None
>         previous_state = -1
>         for message in self._message_stream:
>           if message.HasField('message_response'):
>             logging.log(
>                 MESSAGE_LOG_LEVELS[message.message_response.importance],
>                 "%s",
>                 message.message_response.message_text)
>           else:
>             current_state = message.state_response.state
>             if current_state != previous_state:
>               _LOGGER.info(
>                   "Job state changed to %s",
>                   self._runner_api_state_to_pipeline_state(current_state))
>               previous_state = current_state
>           self._messages.append(message)
>
>       message_thread = threading.Thread(
>           target=read_messages, name='wait_until_finish_read')
>       message_thread.daemon = True
>       message_thread.start()
>
>       if duration:
>         state_thread = threading.Thread(
>             target=functools.partial(self._observe_state, message_thread),
>             name='wait_until_finish_state_observer')
>         state_thread.daemon = True
>         state_thread.start()
>         start_time = time.time()
>         duration_secs = duration / 1000
>         while (time.time() - start_time < duration_secs and
>                state_thread.is_alive()):
>           time.sleep(1)
>       else:
>         self._observe_state(message_thread)
>
>       if self._runtime_exception:
> >       raise self._runtime_exception
> E RuntimeError: Pipeline
> BeamApp-jenkins-0921123257-8b921bc2_f20c9b92-9261-4f0b-9d6c-31732a12287b
> failed in state FAILED: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/rpc/taskmanager_0#31887928]] after [10000 ms].
> Message of type [org.apache.flink.runtime.rpc.messages.LocalRpcInvocation]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
> apache_beam/runners/portability/portable_runner.py:600: RuntimeError
>
> ----
> _After you've filled out the above details, please [assign the issue to an
> individual|https://beam.apache.org/contribute/postcommits-guides/index.html#find_specialist].
> Assignee should [treat test failures as
> high-priority|https://beam.apache.org/contribute/postcommits-policies/#assigned-failing-test],
> helping to fix the issue or find a more appropriate owner. See [Apache Beam
> Post-Commit
> Policies|https://beam.apache.org/contribute/postcommits-policies]._