[
https://issues.apache.org/jira/browse/BEAM-4536?focusedWorklogId=111166&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111166
]
ASF GitHub Bot logged work on BEAM-4536:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jun/18 17:50
Start Date: 12/Jun/18 17:50
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #5605: [BEAM-4536]
Remove with_attributes keyword from ReadFromPubSub.
URL: https://github.com/apache/beam/pull/5605
This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, and a pull request from a
fork would not otherwise display its diff here, the diff is reproduced
below for the sake of provenance:
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index e45dd23bfef..6db45bdbfa5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -108,7 +108,7 @@ class ReadFromPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.
def __init__(self, topic=None, subscription=None, id_label=None,
- with_attributes=False, timestamp_attribute=None):
+ timestamp_attribute=None):
"""Initializes ``ReadFromPubSub``.
Args:
@@ -118,12 +118,8 @@ def __init__(self, topic=None, subscription=None, id_label=None,
deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
- with_attributes:
- True - output elements will be :class:`~PubsubMessage` objects.
- False - output elements will be of type ``str`` (message payload only).
timestamp_attribute: Message value to use as element timestamp. If None,
uses message publishing time as the timestamp.
- Note that this argument doesn't require with_attributes=True.
Timestamp values should be in one of two formats:
@@ -135,12 +131,13 @@ def __init__(self, topic=None, subscription=None, id_label=None,
units smaller than milliseconds) may be ignored.
"""
super(ReadFromPubSub, self).__init__()
- self.with_attributes = with_attributes
+ # TODO(BEAM-4536): Add with_attributes to kwargs once fixed.
+ self.with_attributes = False
self._source = _PubSubSource(
topic=topic,
subscription=subscription,
id_label=id_label,
- with_attributes=with_attributes,
+ with_attributes=self.with_attributes,
timestamp_attribute=timestamp_attribute)
def expand(self, pvalue):
@@ -174,8 +171,7 @@ def __init__(self, topic=None, subscription=None, id_label=None):
def expand(self, pvalue):
p = (pvalue.pipeline
- | ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=False)
+ | ReadFromPubSub(self.topic, self.subscription, self.id_label)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
p.element_type = text_type
return p
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index f987947d454..165c072abb1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -63,7 +63,7 @@ def test_expand_with_topic(self):
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label', with_attributes=False,
+ None, 'a_label',
timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -87,7 +87,7 @@ def test_expand_with_subscription(self):
pcoll = (p
| ReadFromPubSub(
None, 'projects/fakeprj/subscriptions/a_subscription',
- 'a_label', with_attributes=False, timestamp_attribute=None)
+ 'a_label', timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -107,16 +107,17 @@ def test_expand_with_subscription(self):
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Either a topic or subscription must be provided."):
- ReadFromPubSub(None, None, 'a_label', with_attributes=False,
+ ReadFromPubSub(None, None, 'a_label',
timestamp_attribute=None)
def test_expand_with_both_topic_and_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Only one of topic or subscription should be provided."):
ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
- with_attributes=False, timestamp_attribute=None)
+ timestamp_attribute=None)
- def test_expand_with_other_options(self):
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
+ def _test_expand_with_other_options(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
@@ -291,8 +292,9 @@ def create_client_message(payload, message_id, attributes, publish_time):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSub(unittest.TestCase):
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
- def test_read_messages_success(self, mock_pubsub):
+ def _test_read_messages_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
publish_time = '2018-03-12T13:37:01.234567Z'
@@ -353,8 +355,9 @@ def test_read_payload_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data))
p.run()
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
- def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
+ def _test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337'}
@@ -380,8 +383,10 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
- def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
+ def _test_read_messages_timestamp_attribute_rfc3339_success(self,
+ mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
@@ -407,8 +412,9 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
- def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
+ def _test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337'}
@@ -428,8 +434,9 @@ def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
p.run()
+ # TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
- def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
+ def _test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337 unparseable'}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 111166)
Time Spent: 40m (was: 0.5h)
> Python SDK: Pubsub reading with_attributes broken for Dataflow
> --------------------------------------------------------------
>
> Key: BEAM-4536
> URL: https://issues.apache.org/jira/browse/BEAM-4536
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.5.0
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Blocker
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Using
> [ReadFromPubsub|https://github.com/apache/beam/blob/e30e0c807321934e862358e1e3be32dc74374aeb/sdks/python/apache_beam/io/gcp/pubsub.py#L106](with_attributes=True)
> will fail on Dataflow.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)