tvalentyn commented on issue #21121:
URL: https://github.com/apache/beam/issues/21121#issuecomment-1319154978

   Encountered this failure again:
   
   ```
   _____ StreamingWordCountIT.test_streaming_wordcount_it _______________
   [gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
   
   self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
   
       @pytest.mark.it_postcommit
       def test_streaming_wordcount_it(self):
         # Build expected dataset.
         expected_msg = [('%d: 1' % num).encode('utf-8')
                         for num in range(DEFAULT_INPUT_NUMBERS)]
       
         # Set extra options to the pipeline for test purpose
         state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
         pubsub_msg_verifier = PubSubMessageMatcher(
             self.project, self.output_sub.name, expected_msg, timeout=400)
         extra_opts = {
             'input_subscription': self.input_sub.name,
             'output_topic': self.output_topic.name,
             'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
             'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
         }
       
         # Generate input data and inject to PubSub.
         self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
       
         # Get pipeline options from command argument: --test-pipeline-options,
         # and start pipeline job by calling pipeline main function.
         streaming_wordcount.run(
             self.test_pipeline.get_full_options_as_args(**extra_opts),
   >         save_main_session=False)
   
   apache_beam/examples/streaming_wordcount_it_test.py:120: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/examples/streaming_wordcount.py:103: in run
       output | beam.io.WriteToPubSub(known_args.output_topic)
   apache_beam/pipeline.py:600: in __exit__
       self.result = self.run()
   apache_beam/pipeline.py:577: in run
       return self.runner.run_pipeline(self, self._options)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fb48857f0d0>
   pipeline = <apache_beam.pipeline.Pipeline object at 0x7fb488584490>
   options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fb4885841d0>
   
       def run_pipeline(self, pipeline, options):
         """Execute test pipeline and verify test matcher"""
         test_options = options.view_as(TestOptions)
         on_success_matcher = test_options.on_success_matcher
         wait_duration = test_options.wait_until_finish_duration
         is_streaming = options.view_as(StandardOptions).streaming
       
         # [BEAM-1889] Do not send this to remote workers also, there is no need to
         # send this option to remote executors.
         test_options.on_success_matcher = None
       
         self.result = super().run_pipeline(pipeline, options)
         if self.result.has_job:
           # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
           # print since Nose dosen't show logs in some cases.
           print('Worker logs: %s' % self.build_console_url(options))
           _LOGGER.info('Console log: ')
           _LOGGER.info(self.build_console_url(options))
       
         try:
           self.wait_until_in_state(PipelineState.RUNNING)
       
           if is_streaming and not wait_duration:
             _LOGGER.warning('Waiting indefinitely for streaming job.')
           self.result.wait_until_finish(duration=wait_duration)
       
           if on_success_matcher:
             from hamcrest import assert_that as hc_assert_that
   >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
   E         AssertionError: 
   E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
   E              but: Expected 500 messages. Got 486 messages. Diffs (item, count):
   E           Expected but not in actual: dict_items([(b'11: 1', 1), (b'159: 1', 1), (b'161: 1', 1), (b'176: 1', 1), (b'195: 1', 1), (b'202: 1', 1), (b'203: 1', 1), (b'217: 1', 1), (b'219: 1', 1), (b'277: 1', 1), (b'320: 1', 1), (b'446: 1', 1), (b'466: 1', 1), (b'485: 1', 1)])
   E           Unexpected: dict_items([])
   E           Unexpected (with all details): []
   
   apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to