This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b11a4c  [BEAM-7630] add ITs for writing and reading bytes from pubsub
     new 2b5e30d  Merge pull request #8985 from Juta/bytes
3b11a4c is described below

commit 3b11a4cf491ef49ea090c26ad4a2ebd72a94ab8c
Author: Juta <[email protected]>
AuthorDate: Mon Jul 8 10:48:43 2019 +0200

    [BEAM-7630] add ITs for writing and reading bytes from pubsub
---
 .../examples/streaming_wordcount_it_test.py        |  3 ++-
 .../apache_beam/io/gcp/pubsub_integration_test.py  | 24 ++++++++++++++++++++--
 .../apache_beam/io/gcp/tests/pubsub_matcher.py     |  2 +-
 .../io/gcp/tests/pubsub_matcher_test.py            |  6 +++---
 4 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index eddca52..c194d52 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -82,7 +82,8 @@ class StreamingWordCountIT(unittest.TestCase):
   @attr('IT')
   def test_streaming_wordcount_it(self):
     # Build expected dataset.
-    expected_msg = [('%d: 1' % num) for num in range(DEFAULT_INPUT_NUMBERS)]
+    expected_msg = [('%d: 1' % num).encode('utf-8')
+                    for num in range(DEFAULT_INPUT_NUMBERS)]
 
     # Set extra options to the pipeline for test purpose
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index c8a743e..2c43786 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -63,6 +63,10 @@ class PubSubIntegrationTest(unittest.TestCase):
           PubsubMessage(b'data002', {
               TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
           }),
+          PubsubMessage(b'data003\xab\xac', {}),
+          PubsubMessage(b'data004\xab\xac', {
+              TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
+          })
       ],
       'TestDataflowRunner': [
           # Use ID_LABEL attribute to deduplicate messages with the same ID.
@@ -74,6 +78,12 @@ class PubSubIntegrationTest(unittest.TestCase):
           # by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
           PubsubMessage(b'data002', {
               TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
+          }),
+          PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
+          PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
+          PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
+          PubsubMessage(b'data004\xab\xac', {
+              TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
           })
       ],
   }
@@ -85,6 +95,12 @@ class PubSubIntegrationTest(unittest.TestCase):
               TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
               'processed': 'IT',
           }),
+          PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}),
+          PubsubMessage(b'data004\xab\xac-seen', {
+              TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
+              TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
+              'processed': 'IT',
+          })
       ],
       'TestDataflowRunner': [
           PubsubMessage(b'data001-seen', {'processed': 'IT'}),
@@ -92,6 +108,11 @@ class PubSubIntegrationTest(unittest.TestCase):
               TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
               'processed': 'IT',
           }),
+          PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}),
+          PubsubMessage(b'data004\xab\xac-seen', {
+              TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
+              'processed': 'IT',
+          })
       ],
   }
 
@@ -139,8 +160,7 @@ class PubSubIntegrationTest(unittest.TestCase):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
     expected_messages = self.EXPECTED_OUTPUT_MESSAGES[self.runner_name]
     if not with_attributes:
-      expected_messages = [pubsub_msg.data.decode('utf-8')
-                           for pubsub_msg in expected_messages]
+      expected_messages = [pubsub_msg.data for pubsub_msg in expected_messages]
     if self.runner_name == 'TestDirectRunner':
       strip_attributes = None
     else:
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index ba7a674..7a0b5c8 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -103,7 +103,7 @@ class PubSubMessageMatcher(BaseMatcher):
       for rm in response.received_messages:
         msg = PubsubMessage._from_message(rm.message)
         if not self.with_attributes:
-          total_messages.append(msg.data.decode('utf-8'))
+          total_messages.append(msg.data)
           continue
 
         if self.strip_attributes:
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 6a58ddf..cb9fbb9 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -59,7 +59,7 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     self.init_matcher()
-    self.pubsub_matcher.expected_msg = ['a', 'b']
+    self.pubsub_matcher.expected_msg = [b'a', b'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
         create_pull_response([PullResponseMessage(b'a', {})]),
@@ -121,7 +121,7 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
     self.init_matcher()
-    self.pubsub_matcher.expected_msg = ['a']
+    self.pubsub_matcher.expected_msg = [b'a']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
         create_pull_response([PullResponseMessage(b'c', {}),
@@ -130,7 +130,7 @@ class PubSubMatcherTest(unittest.TestCase):
     with self.assertRaises(AssertionError) as error:
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
-    self.assertCountEqual(['c', 'd'], self.pubsub_matcher.messages)
+    self.assertCountEqual([b'c', b'd'], self.pubsub_matcher.messages)
     self.assertTrue(
         '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
         in str(error.exception.args[0]))

Reply via email to