tvalentyn opened a new issue, #26343: URL: https://github.com/apache/beam/issues/26343
### What happened? Sample error from: https://ci-beam.apache.org/job/beam_PostCommit_Python39/1718/testReport/junit/apache_beam.io.gcp.bigquery_read_it_test/ReadAllBQTests/test_read_queries/ ``` Error Message apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda> wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal raise BeamAssertException(msg) apache_beam.testing.util.BeamAssertException: Failed assert: [{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}, {'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 2, 'str': 'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}], unexpected elements [{'number': 10, 'str': 'abcde', 'extra': 3}], missing elements [{'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}] ... 
self = <apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests testMethod=test_read_queries> @skip(['PortableRunner', 'FlinkRunner']) @pytest.mark.it_postcommit def test_read_queries(self): # TODO(https://github.com/apache/beam/issues/20610): Remove experiment when # tests run on r_v2. args = self.args + ["--experiments=use_runner_v2"] with beam.Pipeline(argv=args) as p: result = ( p | beam.Create([ beam.io.ReadFromBigQueryRequest(query=self.query1), beam.io.ReadFromBigQueryRequest( query=self.query2, use_standard_sql=False), beam.io.ReadFromBigQueryRequest( table='%s.%s' % (self.dataset_id, self.table_name3)) ]) | beam.io.ReadAllFromBigQuery()) > assert_that( result, equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3)) apache_beam/io/gcp/bigquery_read_it_test.py:809: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/pipeline.py:600: in __exit__ self.result = self.run() apache_beam/pipeline.py:550: in run return Pipeline.from_runner_api( apache_beam/pipeline.py:577: in run return self.runner.run_pipeline(self, self._options) apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline self.result.wait_until_finish(duration=wait_duration) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <DataflowPipelineResult <Job clientRequestId: '20230419080645509292-9631' createTime: '2023-04-19T08:06:55.403621Z' ...023-04-19T08:06:55.403621Z' steps: [] tempFiles: [] type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f6d540733a0> duration = None def wait_until_finish(self, duration=None): if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') consoleUrl = ( "Console URL: https://console.cloud.google.com/" f"dataflow/jobs/<RegionId>/{self.job_id()}" "?project=<ProjectId>") thread = threading.Thread( target=DataflowRunner.poll_for_job_completion, args=(self._runner, self, duration)) 
# Mark the thread as a daemon thread so a keyboard interrupt on the main # thread will terminate everything. This is also the reason we will not # use thread.join() to wait for the polling thread. thread.daemon = True thread.start() while thread.is_alive(): time.sleep(5.0) # TODO: Merge the termination code in poll_for_job_completion and # is_in_terminal_state. terminated = self.is_in_terminal_state() assert duration or terminated, ( 'Job did not reach to a terminal state after waiting indefinitely. ' '{}'.format(consoleUrl)) # TODO(https://github.com/apache/beam/issues/21695): Also run this check # if wait_until_finish was called after the pipeline completed. if terminated and self.state != PipelineState.DONE: # TODO(BEAM-1290): Consider converting this to an error log based on # theresolution of the issue. _LOGGER.error(consoleUrl) > raise DataflowRuntimeException( 'Dataflow pipeline failed. State: %s, Error:\n%s' % (self.state, getattr(self._runner, 'last_error_msg', None)), self) E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. 
State: FAILED, Error: E Traceback (most recent call last): E File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process E File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process E File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window E File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda> E wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] E File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal E raise BeamAssertException(msg) E apache_beam.testing.util.BeamAssertException: Failed assert: [{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}, {'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 2, 'str': 'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}], unexpected elements [{'number': 10, 'str': 'abcde', 'extra': 3}], missing elements [{'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}] ... 
``` ### Issue Priority Priority: 1 (data loss / total loss of function) ### Issue Components - [X] Component: Python SDK - [ ] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [X] Component: IO connector - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [ ] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
