tvalentyn opened a new issue, #37779:
URL: https://github.com/apache/beam/issues/37779

   ### What happened?
   
   Saw the error below in this GitHub Actions run:
https://github.com/apache/beam/actions/runs/22721571966/job/65884874499?pr=37725
   
   
   ```
   =================================== FAILURES 
===================================
   ______ TestAnomalyDetection.test_multiple_detectors_without_aggregation_0 
______
   [gw2] linux -- Python 3.13.3 
/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/bin/python
   
   a = (<apache_beam.ml.anomaly.transforms_test.TestAnomalyDetection 
testMethod=test_multiple_detectors_without_aggregation_0>,)
   kw = {}
   
       @wraps(func)
       def standalone_func(*a, **kw):
   >       return func(*(a + p.args), **p.kwargs, **kw)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   
   
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/parameterized/parameterized.py:620:
 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/ml/anomaly/transforms_test.py:254: in 
test_multiple_detectors_without_aggregation
       with beam.Pipeline() as p:
            ^^^^^^^^^^^^^^^
   
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:655:
 in __exit__
       self.result.wait_until_finish()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.runners.portability.portable_runner.PipelineResult 
object at 0x7cf663edb380>
   duration = None
   
       def wait_until_finish(self, duration=None):
         """
         :param duration: The maximum time in milliseconds to wait for the 
result of
         the execution. If None or zero, will wait until the pipeline finishes.
         :return: The result of the pipeline, i.e. PipelineResult.
         """
         last_error_text = None
       
         def read_messages() -> None:
           nonlocal last_error_text
           previous_state = -1
           for message in self._message_stream:
             if message.HasField('message_response'):
               mr = message.message_response
               logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", 
mr.message_text)
               if mr.importance == 
beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
                 last_error_text = mr.message_text
             else:
               current_state = message.state_response.state
               if current_state != previous_state:
                 _LOGGER.info(
                     "Job state changed to %s",
                     self.runner_api_state_to_pipeline_state(current_state))
                 previous_state = current_state
             self._messages.append(message)
       
         message_thread = threading.Thread(
             target=read_messages, name='wait_until_finish_read')
         message_thread.daemon = True
         message_thread.start()
       
         if duration:
           state_thread = threading.Thread(
               target=functools.partial(self._observe_state, message_thread),
               name='wait_until_finish_state_observer')
   E         File "apache_beam/runners/common.py", line 1588, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
   E           raise exn
   E         File "apache_beam/runners/common.py", line 1498, in 
apache_beam.runners.common.DoFnRunner.process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File "apache_beam/runners/common.py", line 684, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
   E           self.output_handler.handle_process_outputs(
   E         File "apache_beam/runners/common.py", line 1683, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
   E           self._write_value_to_tag(tag, windowed_value, 
watermark_estimator)
   E         File "apache_beam/runners/common.py", line 1796, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
   E           self.main_receivers.receive(windowed_value)
   E         File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
   E           self.consumer.process(windowed_value)
   E         File "apache_beam/runners/worker/operations.py", line 954, in 
apache_beam.runners.worker.operations.DoOperation.process
   E           with self.scoped_process_state:
   E         File "apache_beam/runners/worker/operations.py", line 955, in 
apache_beam.runners.worker.operations.DoOperation.process
   E           delayed_applications = self.dofn_runner.process(o)
   E         File "apache_beam/runners/common.py", line 1500, in 
apache_beam.runners.common.DoFnRunner.process
   E           self._reraise_augmented(exn, windowed_value)
   E         File "apache_beam/runners/common.py", line 1609, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
   E           raise new_exn
   E         File "apache_beam/runners/common.py", line 1498, in 
apache_beam.runners.common.DoFnRunner.process
   E           return self.do_fn_invoker.invoke_process(windowed_value)
   E         File "apache_beam/runners/common.py", line 912, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
   E           self._invoke_process_per_window(
   E         File "apache_beam/runners/common.py", line 1057, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
   E           self.process_method(*args_for_process, **kwargs_for_process),
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/transforms/core.py",
 line 2116, in <lambda>
   E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   E                                                 ~~^^^^^^^^^^^^^^^^^^^^
   E         File 
"/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/testing/util.py",
 line 203, in _equal
   E           raise BeamAssertException(msg)
   E       apache_beam.testing.util.BeamAssertException: Failed assert: [(1, 
AnomalyResult(example=Row(x1=1, x2=4), 
predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, 
threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, 
info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, 
info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, 
x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', 
score=2.1213203435596424, label=0, threshold=3, info='', 
source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0, 
label=0, threshold=2, info='', source_predictions=None)])), (1, 
AnomalyResult(example=Row(x1=10, x2=4), 
predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, 
threshold=3, info='', 
source_predictions=None), AnomalyPrediction(model_id='zscore_x2', 
score=0.5773502691896252, label=0, threshold=2, info='', 
source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=10), 
predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, 
label=0, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, 
info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', 
score=0.16452254913212455, label=0, threshold=3, info='', 
source_predictions=None), AnomalyPrediction(model_id='zscore_x2', 
score=0.5368754921931594, label=0, threshold=2, info='', 
source_predictions=None)])), (2, AnomalyResult(example=Row(x1=100, x2=5), 
predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, 
threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, 
info='', 
source_predictions=None)]))] == [(1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=1, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, 
info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, 
x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, 
info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=10, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, 
threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, 
label=0, threshold=2, info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, 
info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, 
threshold=2, info='', source_predictions=None)])), (2, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=100, 
x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, 
threshold=2, info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, 
x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', 
score=2.1213203435596424, label=0, threshold=3, info='', 
source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.0, 
label=0, threshold=2, info='', source_predictions=None)]))], unexpected 
elements [(1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, 
x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, 
info='', source_predictions=None)])), (1, 
AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, 
label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, 
label=0, threshold=2, info='', source_predictions=None)]))], 
missing elements [(1, AnomalyResult(example=Row(x1=2, x2=10), 
predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, 
label=0, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, 
info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, 
x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', 
score=0.16452254913212455, label=0, threshold=3, info='', 
source_predictions=None), AnomalyPrediction(model_id='zscore_x2', 
score=0.5368754921931594, label=0, threshold=2, info='', 
source_predictions=None)]))] [while running 'assert_that/Match']
   ```
   
   ### Issue Failure
   
   Failure: Test is flaky
   
   ### Issue Priority
   
   Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be 
sure the product is healthy)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to