[ 
https://issues.apache.org/jira/browse/BEAM-13218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ananda Prasad Inguva updated BEAM-13218:
----------------------------------------
    Description: 
One of the integration tests is failing in PostCommits: 

[https://ci-beam.apache.org/job/beam_PostCommit_Python37/4496/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/]

 

[https://ci-beam.apache.org/job/beam_PostCommit_Python38/1893/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/]
{code:java}
Error MessageAssertionError:  Expected: (Test pipeline expected terminated in 
state: RUNNING and Expected 4 messages.)      but: Expected 4 messages. Got 4 
messages. Diffs (item, count):   Expected but not in actual: 
dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])   
Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 
'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 
'PubSubMessageMatcher error: expected attribute not found.'}), 1)])   Stripped 
attributes: ['id', 'timestamp']Stacktraceself = 
<apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest 
testMethod=test_streaming_with_attributes>

    @pytest.mark.it_postcommit
    def test_streaming_with_attributes(self):
>     self._test_streaming(with_attributes=True)

apache_beam/io/gcp/pubsub_integration_test.py:213: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
    timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
    result = p.run()
apache_beam/pipeline.py:573: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner 
object at 0x7fa1d1561950>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 
0x7fa1d15614d0>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming
    
      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None
    
      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
        # in some cases.
        print('Worker logs: %s' % self.build_console_url(options))
    
      try:
        self.wait_until_in_state(PipelineState.RUNNING)
    
        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)
    
        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E         AssertionError: 
E         Expected: (Test pipeline expected terminated in state: RUNNING and 
Expected 4 messages.)
E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
E           Expected but not in actual: 
dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', 
{'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', 
{'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
attribute not found.'}), 1)])
E           Stripped attributes: ['id', 'timestamp']

apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError {code}

  was:
One of the integration tests is failing: 

[https://ci-beam.apache.org/job/beam_PostCommit_Python37/4496/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/]

 

https://ci-beam.apache.org/job/beam_PostCommit_Python38/1893/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/
{code:java}
Error MessageAssertionError:  Expected: (Test pipeline expected terminated in 
state: RUNNING and Expected 4 messages.)      but: Expected 4 messages. Got 4 
messages. Diffs (item, count):   Expected but not in actual: 
dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])   
Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 
'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 
'PubSubMessageMatcher error: expected attribute not found.'}), 1)])   Stripped 
attributes: ['id', 'timestamp']Stacktraceself = 
<apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest 
testMethod=test_streaming_with_attributes>

    @pytest.mark.it_postcommit
    def test_streaming_with_attributes(self):
>     self._test_streaming(with_attributes=True)

apache_beam/io/gcp/pubsub_integration_test.py:213: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
    timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
    result = p.run()
apache_beam/pipeline.py:573: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner 
object at 0x7fa1d1561950>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 
0x7fa1d15614d0>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming
    
      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None
    
      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
        # in some cases.
        print('Worker logs: %s' % self.build_console_url(options))
    
      try:
        self.wait_until_in_state(PipelineState.RUNNING)
    
        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)
    
        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E         AssertionError: 
E         Expected: (Test pipeline expected terminated in state: RUNNING and 
Expected 4 messages.)
E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
E           Expected but not in actual: 
dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
(PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', 
{'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', 
{'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
attribute not found.'}), 1)])
E           Stripped attributes: ['id', 'timestamp']

apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError {code}


> apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_with_attributes
>  is failing
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-13218
>                 URL: https://issues.apache.org/jira/browse/BEAM-13218
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Ananda Prasad Inguva
>            Priority: P1
>              Labels: currently-failing, flaky
>
> One of the integration tests is failing in PostCommits: 
> [https://ci-beam.apache.org/job/beam_PostCommit_Python37/4496/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/]
>  
> [https://ci-beam.apache.org/job/beam_PostCommit_Python38/1893/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/]
> {code:java}
> Error MessageAssertionError:  Expected: (Test pipeline expected terminated in 
> state: RUNNING and Expected 4 messages.)      but: Expected 4 messages. Got 4 
> messages. Diffs (item, count):   Expected but not in actual: 
> dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
> (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])   
> Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 
> 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 
> 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 
> 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])   
> Stripped attributes: ['id', 'timestamp']Stacktraceself = 
> <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest 
> testMethod=test_streaming_with_attributes>
>     @pytest.mark.it_postcommit
>     def test_streaming_with_attributes(self):
> >     self._test_streaming(with_attributes=True)
> apache_beam/io/gcp/pubsub_integration_test.py:213: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
>     timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
> apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
>     result = p.run()
> apache_beam/pipeline.py:573: in run
>     return self.runner.run_pipeline(self, self._options)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner 
> object at 0x7fa1d1561950>
> pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
> options = <apache_beam.options.pipeline_options.PipelineOptions object at 
> 0x7fa1d15614d0>
>     def run_pipeline(self, pipeline, options):
>       """Execute test pipeline and verify test matcher"""
>       test_options = options.view_as(TestOptions)
>       on_success_matcher = test_options.on_success_matcher
>       wait_duration = test_options.wait_until_finish_duration
>       is_streaming = options.view_as(StandardOptions).streaming
>     
>       # [BEAM-1889] Do not send this to remote workers also, there is no need 
> to
>       # send this option to remote executors.
>       test_options.on_success_matcher = None
>     
>       self.result = super().run_pipeline(pipeline, options)
>       if self.result.has_job:
>         # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
>         # in some cases.
>         print('Worker logs: %s' % self.build_console_url(options))
>     
>       try:
>         self.wait_until_in_state(PipelineState.RUNNING)
>     
>         if is_streaming and not wait_duration:
>           _LOGGER.warning('Waiting indefinitely for streaming job.')
>         self.result.wait_until_finish(duration=wait_duration)
>     
>         if on_success_matcher:
>           from hamcrest import assert_that as hc_assert_that
> >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
> E         AssertionError: 
> E         Expected: (Test pipeline expected terminated in state: RUNNING and 
> Expected 4 messages.)
> E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
> E           Expected but not in actual: 
> dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), 
> (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
> E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', 
> {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
> attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', 
> {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected 
> attribute not found.'}), 1)])
> E           Stripped attributes: ['id', 'timestamp']
> apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to