shunping opened a new pull request, #39132: URL: https://github.com/apache/beam/pull/39132
Flaky test: https://github.com/apache/beam/actions/runs/28270530208/job/83766814632?pr=39130 ``` _______________ ApproximateQuantilesTest.test_batched_quantiles ________________ [gw3] linux -- Python 3.10.20 /runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloudcoverage/py310-cloudcoverage/bin/python self = <apache_beam.transforms.stats_test.ApproximateQuantilesTest testMethod=test_batched_quantiles> def test_batched_quantiles(self): > with TestPipeline() as p: apache_beam/transforms/stats_test.py:482: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/pipeline.py:652: in __exit__ self.result = self.run() apache_beam/testing/test_pipeline.py:123: in run state = result.wait_until_finish(duration=self.timeout) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7bd4656ec820> duration = None def wait_until_finish(self, duration=None): """ :param duration: The maximum time in milliseconds to wait for the result of the execution. If None or zero, will wait until the pipeline finishes. :return: The result of the pipeline, i.e. PipelineResult. """ last_error_text = None def read_messages() -> None: nonlocal last_error_text previous_state = -1 for message in self._message_stream: if message.HasField('message_response'): mr = message.message_response logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text) if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR: last_error_text = mr.message_text else: current_state = message.state_response.state if current_state != previous_state: _LOGGER.info( "Job state changed to %s", self.runner_api_state_to_pipeline_state(current_state)) previous_state = current_state self._messages.append(message) message_thread = threading.Thread( target=read_messages, name='wait_until_finish_read') message_thread.daemon = True message_thread.start() if duration: state_thread = threading.Thread( target=functools.partial(self._observe_state, message_thread), name='wait_until_finish_state_observer') state_thread.daemon = True state_thread.start() start_time = time.time() duration_secs = duration / 1000 while (time.time() - start_time < duration_secs and E self._write_value_to_tag(tag, windowed_value, watermark_estimator) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag E self.main_receivers.receive(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive E self.consumer.process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process E delayed_applications = self.dofn_runner.process(o) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process E self._reraise_augmented(exn, windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1589, in _reraise_augmented E raise exn E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process E return self.do_fn_invoker.invoke_process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 685, in invoke_process E self.output_handler.handle_process_outputs( E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1684, in handle_process_outputs E self._write_value_to_tag(tag, windowed_value, watermark_estimator) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag E self.main_receivers.receive(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive E self.consumer.process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process E delayed_applications = self.dofn_runner.process(o) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process E self._reraise_augmented(exn, windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1589, in _reraise_augmented E raise exn E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process E return self.do_fn_invoker.invoke_process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 685, in invoke_process E self.output_handler.handle_process_outputs( E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1684, in handle_process_outputs E self._write_value_to_tag(tag, windowed_value, watermark_estimator) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1797, in _write_value_to_tag E self.main_receivers.receive(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 264, in receive E self.consumer.process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py", line 956, in process E delayed_applications = self.dofn_runner.process(o) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1501, in process E self._reraise_augmented(exn, windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1610, in _reraise_augmented E raise new_exn E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1499, in process E return self.do_fn_invoker.invoke_process(windowed_value) E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 913, in invoke_process E self._invoke_process_per_window( E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py", line 1058, in _invoke_process_per_window E self.process_method(*args_for_process, **kwargs_for_process), E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/transforms/core.py", line 2126, in <lambda> E wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/testing/util.py", line 202, in _equal E raise BeamAssertException(msg) E apache_beam.testing.util.BeamAssertException: Failed assert: [[(99.9, 499), (72.5, 225), (50.0, 0)]] == [[(99.9, 499), (22.5, 275), (50.0, 0)]], unexpected elements [[(99.9, 499), (22.5, 275), (50.0, 0)]], missing elements [[(99.9, 499), (72.5, 225), (50.0, 0)]] [while running 'checkGloballyWithKeyAndReversed/Match'] apache_beam/runners/portability/portable_runner.py:572: RuntimeError ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
