This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fe1eace [BEAM-5315] Python 3 port bigquery and pubsub related io modules
new a5ed104 Merge pull request #7685 from Juta/io-gcp
fe1eace is described below
commit fe1eace21fd75e83baa6012d072a19a9c1742bb1
Author: Juta <[email protected]>
AuthorDate: Thu Jan 31 12:12:46 2019 +0100
[BEAM-5315] Python 3 port bigquery and pubsub related io modules
---
sdks/python/apache_beam/io/gcp/pubsub.py | 3 +-
sdks/python/apache_beam/io/gcp/pubsub_test.py | 34 +++++++++++-----------
.../io/gcp/tests/pubsub_matcher_test.py | 12 ++++----
sdks/python/tox.ini | 2 +-
4 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 53da3b4..0711b70 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import
import re
from builtins import object
+from future.utils import iteritems
from past.builtins import unicode
from apache_beam import coders
@@ -114,7 +115,7 @@ class PubsubMessage(object):
"""
msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.data = self.data
- for key, value in self.attributes.iteritems():
+ for key, value in iteritems(self.attributes):
msg.attributes[key] = value
return msg.SerializeToString()
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 04f8802..612ded6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -74,7 +74,7 @@ class TestPubsubMessage(unittest.TestCase):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
def test_proto_conversion(self):
- data = 'data'
+ data = b'data'
attributes = {'k1': 'v1', 'k2': 'v2'}
m = PubsubMessage(data, attributes)
m_converted = PubsubMessage._from_proto_str(m._to_proto_str())
@@ -82,25 +82,25 @@ class TestPubsubMessage(unittest.TestCase):
self.assertEqual(m_converted.attributes, attributes)
def test_eq(self):
- a = PubsubMessage('abc', {1: 2, 3: 4})
- b = PubsubMessage('abc', {1: 2, 3: 4})
- c = PubsubMessage('abc', {1: 2})
+ a = PubsubMessage(b'abc', {1: 2, 3: 4})
+ b = PubsubMessage(b'abc', {1: 2, 3: 4})
+ c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(a == b)
self.assertTrue(a != c)
self.assertTrue(b != c)
def test_hash(self):
- a = PubsubMessage('abc', {1: 2, 3: 4})
- b = PubsubMessage('abc', {1: 2, 3: 4})
- c = PubsubMessage('abc', {1: 2})
+ a = PubsubMessage(b'abc', {1: 2, 3: 4})
+ b = PubsubMessage(b'abc', {1: 2, 3: 4})
+ c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(hash(a) == hash(b))
self.assertTrue(hash(a) != hash(c))
self.assertTrue(hash(b) != hash(c))
def test_repr(self):
- a = PubsubMessage('abc', {1: 2, 3: 4})
- b = PubsubMessage('abc', {1: 2, 3: 4})
- c = PubsubMessage('abc', {1: 2})
+ a = PubsubMessage(b'abc', {1: 2, 3: 4})
+ b = PubsubMessage(b'abc', {1: 2, 3: 4})
+ c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(repr(a) == repr(b))
self.assertTrue(repr(a) != repr(c))
self.assertTrue(repr(b) != repr(c))
@@ -333,7 +333,7 @@
transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = {
class TestReadFromPubSub(unittest.TestCase):
def test_read_messages_success(self, mock_pubsub):
- data = 'data'
+ data = b'data'
publish_time_secs = 1520861821
publish_time_nanos = 234567000
attributes = {'key': 'value'}
@@ -399,7 +399,7 @@ class TestReadFromPubSub(unittest.TestCase):
mock.call(mock.ANY, [ack_id])])
def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {'time': '1337'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
@@ -429,7 +429,7 @@ class TestReadFromPubSub(unittest.TestCase):
mock.call(mock.ANY, [ack_id])])
def test_read_messages_timestamp_attribute_rfc3339_success(self,
mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
publish_time_secs = 1337000000
publish_time_nanos = 133700000
@@ -459,7 +459,7 @@ class TestReadFromPubSub(unittest.TestCase):
mock.call(mock.ANY, [ack_id])])
def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
@@ -490,7 +490,7 @@ class TestReadFromPubSub(unittest.TestCase):
mock.call(mock.ANY, [ack_id])])
def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {'time': '1337 unparseable'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
@@ -557,7 +557,7 @@ class TestWriteToPubSub(unittest.TestCase):
mock.call(mock.ANY, data)])
def test_write_messages_with_attributes_success(self, mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
@@ -589,7 +589,7 @@ class TestWriteToPubSub(unittest.TestCase):
p.run()
def test_write_messages_unsupported_features(self, mock_pubsub):
- data = 'data'
+ data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 1261aa1..bfed329 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -59,7 +59,7 @@ class PubSubMatcherTest(unittest.TestCase):
def test_message_matcher_success(self, mock_get_sub, unsued_mock):
self.init_matcher()
- self.pubsub_matcher.expected_msg = ['a', 'b']
+ self.pubsub_matcher.expected_msg = [b'a', b'b']
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {})]),
@@ -71,7 +71,7 @@ class PubSubMatcherTest(unittest.TestCase):
def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
- self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+ self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
@@ -82,7 +82,7 @@ class PubSubMatcherTest(unittest.TestCase):
def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
- self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
+ self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})]
mock_sub = mock_get_sub.return_value
# Unexpected attribute 'k'.
mock_sub.pull.side_effect = [
@@ -96,7 +96,7 @@ class PubSubMatcherTest(unittest.TestCase):
def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
- self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+ self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
@@ -108,7 +108,7 @@ class PubSubMatcherTest(unittest.TestCase):
def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
- self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+ self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
# Message is missing attribute 'timestamp'.
mock_sub.pull.side_effect = [create_pull_response([
@@ -130,7 +130,7 @@ class PubSubMatcherTest(unittest.TestCase):
with self.assertRaises(AssertionError) as error:
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
- self.assertCountEqual(['c', 'd'], self.pubsub_matcher.messages)
+ self.assertCountEqual([b'c', b'd'], self.pubsub_matcher.messages)
self.assertTrue(
'\nExpected: Expected 1 messages.\n but: Got 2 messages.'
in str(error.exception.args[0]))
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index dbccd56..fc3bad2 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -99,7 +99,7 @@ setenv =
RUN_SKIPPED_PY3_TESTS=0
extras = test,gcp
modules =
-
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i
[...]
+
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i
[...]
commands =
python --version
pip --version