Abacn commented on issue #21121: URL: https://github.com/apache/beam/issues/21121#issuecomment-1282863115
I have actually encountered this a couple of times, most recently yesterday: https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25312/console

Note that the test was somehow not reported under "Test Result", but the logs show it failing:

```
=================================== FAILURES ===================================
_______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
[gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7

self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>

    @pytest.mark.it_postcommit
    def test_streaming_wordcount_it(self):
      # Build expected dataset.
      expected_msg = [('%d: 1' % num).encode('utf-8')
                      for num in range(DEFAULT_INPUT_NUMBERS)]

      # Set extra options to the pipeline for test purpose
      state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
      pubsub_msg_verifier = PubSubMessageMatcher(
          self.project, self.output_sub.name, expected_msg, timeout=400)
      extra_opts = {
          'input_subscription': self.input_sub.name,
          'output_topic': self.output_topic.name,
          'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
          'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
      }

      # Generate input data and inject to PubSub.
      self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

      # Get pipeline options from command argument: --test-pipeline-options,
      # and start pipeline job by calling pipeline main function.
      streaming_wordcount.run(
          self.test_pipeline.get_full_options_as_args(**extra_opts),
>         save_main_session=False)

apache_beam/examples/streaming_wordcount_it_test.py:120:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/streaming_wordcount.py:103: in run
    output | beam.io.WriteToPubSub(known_args.output_topic)
apache_beam/pipeline.py:597: in __exit__
    self.result = self.run()
apache_beam/pipeline.py:574: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f636ce38210>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7f6368208f10>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f63682084d0>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming

      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None

      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
        # print since Nose dosen't show logs in some cases.
        print('Worker logs: %s' % self.build_console_url(options))
        _LOGGER.info('Console log: ')
        _LOGGER.info(self.build_console_url(options))

      try:
        self.wait_until_in_state(PipelineState.RUNNING)

        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)

        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E       AssertionError:
E       Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
E            but: Expected 500 messages. Got 501 messages. Diffs (item, count):
E         Expected but not in actual: dict_items([])
E         Unexpected: dict_items([(b'172: 1', 1)])
E         Unexpected (with all details): [(b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 30, 114000, tzinfo=datetime.timezone.utc), ''), (b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 29, 912000, tzinfo=datetime.timezone.utc), '')]

apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
```
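Worth noting: the "Got 501 messages" diff shows the same payload `b'172: 1'` delivered twice with two different publish timestamps, which looks consistent with Pub/Sub's at-least-once delivery semantics (a redelivery) rather than the pipeline actually emitting an extra element. A minimal sketch of why an exact-count comparison is flaky here, and how a duplicate-tolerant comparison would behave (names and the set-based approach are illustrative, not the actual `PubSubMessageMatcher` code):

```python
# Illustrative sketch only: not Beam code. Shows why comparing exact
# message counts is flaky under at-least-once delivery, where a broker
# may redeliver an already-seen payload.
from collections import Counter

# The test's expected dataset: payloads '0: 1' .. '499: 1'.
expected = [('%d: 1' % n).encode('utf-8') for n in range(500)]

# Simulated pull results: all 500 expected payloads plus one redelivery
# of b'172: 1', matching the failure in the log above.
received = expected + [b'172: 1']

# Exact multiset comparison (what an "Expected 500 messages" check
# effectively amounts to): the single duplicate makes it fail.
exact_match = Counter(received) == Counter(expected)   # False, 501 != 500

# Duplicate-tolerant comparison: deduplicate payloads first, so a
# redelivered message no longer breaks the assertion.
dedup_match = set(received) == set(expected)           # True
```

Under this reading, the fix would be on the matcher side (tolerating duplicates or deduplicating by payload) rather than in the pipeline under test.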
