Valentyn Tymofieiev created BEAM-12861:
------------------------------------------

             Summary: apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT.test_create_catalog_item is flaky
                 Key: BEAM-12861
                 URL: https://issues.apache.org/jira/browse/BEAM-12861
             Project: Beam
          Issue Type: Bug
          Components: io-py-gcp
            Reporter: Valentyn Tymofieiev


The test apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT.test_create_catalog_item is flaky.

Sample error:
{noformat}

self = <apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT 
testMethod=test_create_catalog_item>

    def test_create_catalog_item(self):
    
      CATALOG_ITEM = {
          "id": str(int(random.randrange(100000))),
          "title": "Sample laptop",
          "description": "Indisputably the most fantastic laptop ever created.",
          "language_code": "en",
          "category_hierarchies": [{
              "categories": ["Electronic", "Computers"]
          }]
      }
    
      with TestPipeline(is_integration_test=True) as p:
        output = (
            p | 'Create data' >> beam.Create([CATALOG_ITEM])
            | 'Create CatalogItem' >>
            recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT)
            | beam.ParDo(extract_id) | beam.combiners.ToList())
    
>       assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))

apache_beam/ml/gcp/recommendations_ai_test_it.py:80: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:586: in __exit__
    self.result = self.run()
apache_beam/testing/test_pipeline.py:114: in run
    False if self.not_use_test_runner_api else test_runner_api))
apache_beam/pipeline.py:541: in run
    self._options).run(False)
apache_beam/pipeline.py:565: in run
    return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
    self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <DataflowPipelineResult <Job
 createTime: '2021-08-25T07:09:43.529173Z'
 currentState: CurrentStateValueValuesEnum(JOB...021-08-25T07:09:43.529173Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f74c484b6d8>
duration = None

    def wait_until_finish(self, duration=None):
      if not self.is_in_terminal_state():
        if not self.has_job:
          raise IOError('Failed to get the Dataflow job id.')
    
        thread = threading.Thread(
            target=DataflowRunner.poll_for_job_completion,
            args=(self._runner, self, duration))
    
        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        thread.start()
        while thread.is_alive():
          time.sleep(5.0)
    
        # TODO: Merge the termination code in poll_for_job_completion and
        # is_in_terminal_state.
        terminated = self.is_in_terminal_state()
        assert duration or terminated, (
            'Job did not reach to a terminal state after waiting indefinitely.')
    
        if terminated and self.state != PipelineState.DONE:
          # TODO(BEAM-1290): Consider converting this to an error log based on
          # the resolution of the issue.
          raise DataflowRuntimeException(
              'Dataflow pipeline failed. State: %s, Error:\n%s' %
              (self.state, getattr(self._runner, 'last_error_msg', None)),
>             self)
E         
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow 
pipeline failed. State: FAILED, Error:
E         Traceback (most recent call last):
E           File "apache_beam/runners/common.py", line 1232, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 752, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 877, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           File 
"/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/transforms/core.py",
 line 1560, in <lambda>
E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E           File 
"/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/testing/util.py",
 line 190, in _equal
E             raise BeamAssertException(msg)
E         apache_beam.testing.util.BeamAssertException: Failed assert: 
[['4709']] == [[]], unexpected elements [[]], missing elements [['4709']]
E         
E         During handling of the above exception, another exception occurred:
E         
E         Traceback (most recent call last):
E           File 
"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 
651, in do_work
E             work_executor.execute()
E           File 
"/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, 
in execute
E             op.start()
E           File "dataflow_worker/shuffle_operations.py", line 63, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
E           File "dataflow_worker/shuffle_operations.py", line 64, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
E           File "dataflow_worker/shuffle_operations.py", line 79, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
E           File "dataflow_worker/shuffle_operations.py", line 80, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
E           File "dataflow_worker/shuffle_operations.py", line 84, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
E           File "apache_beam/runners/worker/operations.py", line 353, in 
apache_beam.runners.worker.operations.Operation.output
E           File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
E           File "dataflow_worker/shuffle_operations.py", line 261, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
E           File "dataflow_worker/shuffle_operations.py", line 268, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
E           File "apache_beam/runners/worker/operations.py", line 353, in 
apache_beam.runners.worker.operations.Operation.output
E           File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
E           File "apache_beam/runners/worker/operations.py", line 712, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/worker/operations.py", line 713, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/common.py", line 1234, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 1299, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
E           File "apache_beam/runners/common.py", line 1232, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 571, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 1395, in 
apache_beam.runners.common._OutputProcessor.process_outputs
E           File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
E           File "apache_beam/runners/worker/operations.py", line 712, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/worker/operations.py", line 713, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/common.py", line 1234, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 1299, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
E           File "apache_beam/runners/common.py", line 1232, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 571, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 1395, in 
apache_beam.runners.common._OutputProcessor.process_outputs
E           File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
E           File "apache_beam/runners/worker/operations.py", line 712, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/worker/operations.py", line 713, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/common.py", line 1234, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 1299, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
E           File "apache_beam/runners/common.py", line 1232, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 571, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 1395, in 
apache_beam.runners.common._OutputProcessor.process_outputs
E           File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
E           File "apache_beam/runners/worker/operations.py", line 712, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/worker/operations.py", line 713, in 
apache_beam.runners.worker.operations.DoOperation.process
E           File "apache_beam/runners/common.py", line 1234, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 1315, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
E           File "apache_beam/runners/common.py", line 1232, in 
apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 752, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 877, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           File 
"/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/transforms/core.py",
 line 1560, in <lambda>
E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E           File 
"/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/testing/util.py",
 line 190, in _equal
E             raise BeamAssertException(msg)
E         apache_beam.testing.util.BeamAssertException: Failed assert: 
[['4709']] == [[]], unexpected elements [[]], missing elements [['4709']] 
[while running 'assert_that/Match']

apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException

{noformat}
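
For reference, the failure message means the pipeline reached assert_that with an empty list: beam.combiners.ToList emits exactly one element even when its input is empty, so a run in which 'Create CatalogItem' / extract_id yields nothing compares [[]] against the expected [['<id>']]. Below is a minimal DirectRunner sketch of that assertion pattern only; the FlatMap is a hypothetical stand-in for CreateCatalogItem + extract_id, not the transform under test.

{code:python}
# Minimal, self-contained sketch (DirectRunner, not the real integration test).
import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to


def run_sketch(emit_id):
  with beam.Pipeline() as p:
    output = (
        p
        | beam.Create([{'id': '4709'}])
        # Hypothetical stand-in for 'Create CatalogItem' + extract_id. With
        # emit_id=False it drops the element, mimicking the flaky run where
        # no catalog item id comes back.
        | beam.FlatMap(lambda item: [item['id']] if emit_id else [])
        | beam.combiners.ToList())
    # ToList always produces a single element (a list), so an empty upstream
    # result turns the comparison into [['4709']] == [[]], as in the log above.
    assert_that(output, equal_to([['4709']]))


run_sketch(emit_id=True)     # passes
# run_sketch(emit_id=False)  # raises BeamAssertException: [['4709']] == [[]]
{code}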



