shunping opened a new pull request, #39132:
URL: https://github.com/apache/beam/pull/39132

   Flaky test: https://github.com/apache/beam/actions/runs/28270530208/job/83766814632?pr=39130
   
   ```
   _______________ ApproximateQuantilesTest.test_batched_quantiles ________________
   [gw3] linux -- Python 3.10.20 
/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloudcoverage/py310-cloudcoverage/bin/python
   
   self = <apache_beam.transforms.stats_test.ApproximateQuantilesTest 
testMethod=test_batched_quantiles>
   
       def test_batched_quantiles(self):
   >     with TestPipeline() as p:
   
   apache_beam/transforms/stats_test.py:482: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/pipeline.py:652: in __exit__
       self.result = self.run()
   apache_beam/testing/test_pipeline.py:123: in run
       state = result.wait_until_finish(duration=self.timeout)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.runners.portability.portable_runner.PipelineResult 
object at 0x7bd4656ec820>
   duration = None
   
       def wait_until_finish(self, duration=None):
         """
         :param duration: The maximum time in milliseconds to wait for the 
result of
         the execution. If None or zero, will wait until the pipeline finishes.
         :return: The result of the pipeline, i.e. PipelineResult.
         """
         last_error_text = None
       
         def read_messages() -> None:
           nonlocal last_error_text
           previous_state = -1
           for message in self._message_stream:
             if message.HasField('message_response'):
               mr = message.message_response
               logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", 
mr.message_text)
               if mr.importance == 
beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
                 last_error_text = mr.message_text
             else:
               current_state = message.state_response.state
               if current_state != previous_state:
                 _LOGGER.info(
                     "Job state changed to %s",
                     self.runner_api_state_to_pipeline_state(current_state))
                 previous_state = current_state
             self._messages.append(message)
       
         message_thread = threading.Thread(
             target=read_messages, name='wait_until_finish_read')
         message_thread.daemon = True
         message_thread.start()
       
         if duration:
           state_thread = threading.Thread(
               target=functools.partial(self._observe_state, message_thread),
               name='wait_until_finish_state_observer')
           state_thread.daemon = True
           state_thread.start()
           start_time = time.time()
           duration_secs = duration / 1000
           while (time.time() - start_time < duration_secs and
   E           self._write_value_to_tag(tag, windowed_value, 
watermark_estimator)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1797, in _write_value_to_tag
   E           self.main_receivers.receive(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 264, in receive
   E           self.consumer.process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 956, in process
   E           delayed_applications = self.dofn_runner.process(o)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1501, in process
   E           self._reraise_augmented(exn, windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1589, in _reraise_augmented
   E           raise exn
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1499, in process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 685, in invoke_process
   E           self.output_handler.handle_process_outputs(
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1684, in handle_process_outputs
   E           self._write_value_to_tag(tag, windowed_value, 
watermark_estimator)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1797, in _write_value_to_tag
   E           self.main_receivers.receive(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 264, in receive
   E           self.consumer.process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 956, in process
   E           delayed_applications = self.dofn_runner.process(o)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1501, in process
   E           self._reraise_augmented(exn, windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1589, in _reraise_augmented
   E           raise exn
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1499, in process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 685, in invoke_process
   E           self.output_handler.handle_process_outputs(
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1684, in handle_process_outputs
   E           self._write_value_to_tag(tag, windowed_value, 
watermark_estimator)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1797, in _write_value_to_tag
   E           self.main_receivers.receive(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 264, in receive
   E           self.consumer.process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/worker/operations.py",
 line 956, in process
   E           delayed_applications = self.dofn_runner.process(o)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1501, in process
   E           self._reraise_augmented(exn, windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1610, in _reraise_augmented
   E           raise new_exn
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1499, in process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 913, in invoke_process
   E           self._invoke_process_per_window(
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/runners/common.py",
 line 1058, in _invoke_process_per_window
   E           self.process_method(*args_for_process, **kwargs_for_process),
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/transforms/core.py",
 line 2126, in <lambda>
   E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/testing/util.py",
 line 202, in _equal
   E           raise BeamAssertException(msg)
   E       apache_beam.testing.util.BeamAssertException: Failed assert: [[(99.9, 499), (72.5, 225), (50.0, 0)]] == [[(99.9, 499), (22.5, 275), (50.0, 0)]], unexpected elements [[(99.9, 499), (22.5, 275), (50.0, 0)]], missing elements [[(99.9, 499), (72.5, 225), (50.0, 0)]] [while running 'checkGloballyWithKeyAndReversed/Match']
   
   apache_beam/runners/portability/portable_runner.py:572: RuntimeError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to