tvalentyn opened a new issue, #26343:
URL: https://github.com/apache/beam/issues/26343

   ### What happened?
   
   Sample error from: 
https://ci-beam.apache.org/job/beam_PostCommit_Python39/1718/testReport/junit/apache_beam.io.gcp.bigquery_read_it_test/ReadAllBQTests/test_read_queries/
   
   ```
   Error Message
   apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda>
       wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
     File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal
       raise BeamAssertException(msg)
   apache_beam.testing.util.BeamAssertException: Failed assert: [{'number': 1, 
'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': '你好'}, 
{'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, {'number': 20, 
'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}, 
{'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 10, 'str': 'abcde', 
'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 2, 'str': 
'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': '你好'}, {'number': 4, 
'str': 'привет'}], unexpected elements [{'number': 10, 'str': 'abcde', 'extra': 
3}], missing elements [{'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 
'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}]
   
   ...
   
   self = <apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests testMethod=test_read_queries>
   
       @skip(['PortableRunner', 'FlinkRunner'])
       @pytest.mark.it_postcommit
       def test_read_queries(self):
         # TODO(https://github.com/apache/beam/issues/20610): Remove experiment when
         # tests run on r_v2.
         args = self.args + ["--experiments=use_runner_v2"]
         with beam.Pipeline(argv=args) as p:
           result = (
               p
               | beam.Create([
                   beam.io.ReadFromBigQueryRequest(query=self.query1),
                   beam.io.ReadFromBigQueryRequest(
                       query=self.query2, use_standard_sql=False),
                   beam.io.ReadFromBigQueryRequest(
                       table='%s.%s' % (self.dataset_id, self.table_name3))
               ])
               | beam.io.ReadAllFromBigQuery())
   >       assert_that(
               result,
               equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))
   
   apache_beam/io/gcp/bigquery_read_it_test.py:809: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/pipeline.py:600: in __exit__
       self.result = self.run()
   apache_beam/pipeline.py:550: in run
       return Pipeline.from_runner_api(
   apache_beam/pipeline.py:577: in run
       return self.runner.run_pipeline(self, self._options)
   apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
       self.result.wait_until_finish(duration=wait_duration)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <DataflowPipelineResult <Job
    clientRequestId: '20230419080645509292-9631'
    createTime: '2023-04-19T08:06:55.403621Z'
   ...023-04-19T08:06:55.403621Z'
    steps: []
    tempFiles: []
    type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f6d540733a0>
   duration = None
   
       def wait_until_finish(self, duration=None):
         if not self.is_in_terminal_state():
           if not self.has_job:
             raise IOError('Failed to get the Dataflow job id.')
           consoleUrl = (
               "Console URL: https://console.cloud.google.com/"
               f"dataflow/jobs/<RegionId>/{self.job_id()}"
               "?project=<ProjectId>")
           thread = threading.Thread(
               target=DataflowRunner.poll_for_job_completion,
               args=(self._runner, self, duration))
       
           # Mark the thread as a daemon thread so a keyboard interrupt on the main
           # thread will terminate everything. This is also the reason we will not
           # use thread.join() to wait for the polling thread.
           thread.daemon = True
           thread.start()
           while thread.is_alive():
             time.sleep(5.0)
       
           # TODO: Merge the termination code in poll_for_job_completion and
           # is_in_terminal_state.
           terminated = self.is_in_terminal_state()
           assert duration or terminated, (
               'Job did not reach to a terminal state after waiting indefinitely. '
               '{}'.format(consoleUrl))
       
           # TODO(https://github.com/apache/beam/issues/21695): Also run this check
           # if wait_until_finish was called after the pipeline completed.
           if terminated and self.state != PipelineState.DONE:
             # TODO(BEAM-1290): Consider converting this to an error log based on
             # the resolution of the issue.
             _LOGGER.error(consoleUrl)
   >         raise DataflowRuntimeException(
                 'Dataflow pipeline failed. State: %s, Error:\n%s' %
                 (self.state, getattr(self._runner, 'last_error_msg', None)),
                 self)
   E         apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
   E         Traceback (most recent call last):
   E           File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
   E           File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
   E           File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
   E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda>
   E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal
   E             raise BeamAssertException(msg)
   E         apache_beam.testing.util.BeamAssertException: Failed assert: 
[{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': 
'你好'}, {'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, 
{'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 
'str': 'привет'}, {'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 
10, 'str': 'abcde', 'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, 
{'number': 2, 'str': 'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': 
'你好'}, {'number': 4, 'str': 'привет'}], unexpected elements [{'number': 10, 
'str': 'abcde', 'extra': 3}], missing elements [{'number': 10, 'str': 'abcd'}, 
{'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 
'str': 'привет'}]
   ...
   ```
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to