Abacn commented on issue #21121:
URL: https://github.com/apache/beam/issues/21121#issuecomment-1282863115

   I have actually encountered it a couple of times, most recently yesterday: 
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25312/console
   
   Note that this test is somehow not reported in "Test Result", but the logs show 
it is failing:
   ```
   =================================== FAILURES ===================================
   01:47:54 _______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
   01:47:54 [gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
   01:47:54 
   01:47:54 self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
   01:47:54 
   01:47:54     @pytest.mark.it_postcommit
   01:47:54     def test_streaming_wordcount_it(self):
   01:47:54       # Build expected dataset.
   01:47:54       expected_msg = [('%d: 1' % num).encode('utf-8')
   01:47:54                       for num in range(DEFAULT_INPUT_NUMBERS)]
   01:47:54     
   01:47:54       # Set extra options to the pipeline for test purpose
   01:47:54       state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
   01:47:54       pubsub_msg_verifier = PubSubMessageMatcher(
   01:47:54           self.project, self.output_sub.name, expected_msg, timeout=400)
   01:47:54       extra_opts = {
   01:47:54           'input_subscription': self.input_sub.name,
   01:47:54           'output_topic': self.output_topic.name,
   01:47:54           'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
   01:47:54           'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
   01:47:54       }
   01:47:54     
   01:47:54       # Generate input data and inject to PubSub.
   01:47:54       self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
   01:47:54     
   01:47:54       # Get pipeline options from command argument: --test-pipeline-options,
   01:47:54       # and start pipeline job by calling pipeline main function.
   01:47:54       streaming_wordcount.run(
   01:47:54           self.test_pipeline.get_full_options_as_args(**extra_opts),
   01:47:54 >         save_main_session=False)
   01:47:54 
   01:47:54 apache_beam/examples/streaming_wordcount_it_test.py:120: 
   01:47:54 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   01:47:54 apache_beam/examples/streaming_wordcount.py:103: in run
   01:47:54     output | beam.io.WriteToPubSub(known_args.output_topic)
   01:47:54 apache_beam/pipeline.py:597: in __exit__
   01:47:54     self.result = self.run()
   01:47:54 apache_beam/pipeline.py:574: in run
   01:47:54     return self.runner.run_pipeline(self, self._options)
   01:47:54 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   01:47:54 
   01:47:54 self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f636ce38210>
   01:47:54 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f6368208f10>
   01:47:54 options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f63682084d0>
   01:47:54 
   01:47:54     def run_pipeline(self, pipeline, options):
   01:47:54       """Execute test pipeline and verify test matcher"""
   01:47:54       test_options = options.view_as(TestOptions)
   01:47:54       on_success_matcher = test_options.on_success_matcher
   01:47:54       wait_duration = test_options.wait_until_finish_duration
   01:47:54       is_streaming = options.view_as(StandardOptions).streaming
   01:47:54     
   01:47:54       # [BEAM-1889] Do not send this to remote workers also, there is no need to
   01:47:54       # send this option to remote executors.
   01:47:54       test_options.on_success_matcher = None
   01:47:54     
   01:47:54       self.result = super().run_pipeline(pipeline, options)
   01:47:54       if self.result.has_job:
   01:47:54         # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
   01:47:54         # print since Nose dosen't show logs in some cases.
   01:47:54         print('Worker logs: %s' % self.build_console_url(options))
   01:47:54         _LOGGER.info('Console log: ')
   01:47:54         _LOGGER.info(self.build_console_url(options))
   01:47:54     
   01:47:54       try:
   01:47:54         self.wait_until_in_state(PipelineState.RUNNING)
   01:47:54     
   01:47:54         if is_streaming and not wait_duration:
   01:47:54           _LOGGER.warning('Waiting indefinitely for streaming job.')
   01:47:54         self.result.wait_until_finish(duration=wait_duration)
   01:47:54     
   01:47:54         if on_success_matcher:
   01:47:54           from hamcrest import assert_that as hc_assert_that
   01:47:54 >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
   01:47:54 E         AssertionError: 
   01:47:54 E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
   01:47:54 E              but: Expected 500 messages. Got 501 messages. Diffs (item, count):
   01:47:54 E           Expected but not in actual: dict_items([])
   01:47:54 E           Unexpected: dict_items([(b'172: 1', 1)])
   01:47:54 E           Unexpected (with all details): [(b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 30, 114000, tzinfo=datetime.timezone.utc), ''), (b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 29, 912000, tzinfo=datetime.timezone.utc), '')]
   01:47:54 
   01:47:54 apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
   ```
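
   The diff above shows `b'172: 1'` delivered twice, which is consistent with Pub/Sub's at-least-once delivery redelivering a single message rather than the pipeline producing a wrong result. As a minimal sketch (not Beam's actual matcher code), here is how a count-based comparison like the one in the failure message surfaces such a duplicate:

```python
from collections import Counter

DEFAULT_INPUT_NUMBERS = 500

# The expected dataset, built the same way as in the test above.
expected = [('%d: 1' % n).encode('utf-8') for n in range(DEFAULT_INPUT_NUMBERS)]

# Hypothetical received set: one message (b'172: 1') redelivered by Pub/Sub.
actual = expected + [b'172: 1']

# Counter subtraction keeps only positive differences, mirroring the
# "Expected but not in actual" / "Unexpected" lines in the assertion error.
missing = Counter(expected) - Counter(actual)     # empty here
unexpected = Counter(actual) - Counter(expected)  # the duplicate

print(dict(missing))     # {}
print(dict(unexpected))  # {b'172: 1': 1}
```

   A matcher tolerant of at-least-once delivery could deduplicate by message ID (or compare sets instead of counts) before asserting, though that trades away the ability to catch genuine double-processing bugs.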
   
   

