[ 
https://issues.apache.org/jira/browse/BEAM-5513?focusedWorklogId=151917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151917
 ]

ASF GitHub Bot logged work on BEAM-5513:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/18 23:11
            Start Date: 05/Oct/18 23:11
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #6564: [BEAM-5513] 
Upgrade Python SDK to PubSub 0.35.4
URL: https://github.com/apache/beam/pull/6564
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py 
b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 6dc60d0a807..2fc19daa1af 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -72,15 +72,15 @@ def setUp(self):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + 
_unique_id))
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -95,14 +95,15 @@ def _inject_pubsub_game_events(self, topic, message_count):
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -123,9 +124,9 @@ def test_game_stats_it(self):
 
     # TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'fixed_window_duration': 1,
                   'user_activity_window_duration': 1,
                   'wait_until_finish_duration':
@@ -143,8 +144,6 @@ def test_game_stats_it(self):
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git 
a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py 
b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index ab109425eb6..e0e309b1265 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -73,15 +73,16 @@ def setUp(self):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + 
_unique_id))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -96,14 +97,15 @@ def _inject_pubsub_game_events(self, topic, message_count):
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -131,9 +133,9 @@ def test_leader_board_it(self):
                                         teams_query,
                                         self.DEFAULT_EXPECTED_CHECKSUM)
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'team_window_duration': 1,
                   'wait_until_finish_duration':
                       self.WAIT_UNTIL_FINISH_DURATION,
@@ -151,8 +153,6 @@ def test_leader_board_it(self):
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py 
b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 3c0cfa93a2c..78e89a1e184 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -52,31 +52,31 @@ def setUp(self):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
-
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + 
self.uuid),
+        self.output_topic.name)
 
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
-    logging.debug('Injecting %d numbers to topic %s',
-                  num_messages, topic.full_name)
+    logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
     for n in range(num_messages):
-      topic.publish(str(n))
-
-  def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+      self.pub_client.publish(self.input_topic.name, str(n))
 
   def tearDown(self):
-    self._cleanup_pubsub()
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   @attr('IT')
   def test_streaming_wordcount_it(self):
@@ -86,17 +86,16 @@ def test_streaming_wordcount_it(self):
     # Set extra options to the pipeline for test purpose
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
     pubsub_msg_verifier = PubSubMessageMatcher(self.project,
-                                               OUTPUT_SUB + self.uuid,
+                                               self.output_sub.name,
                                                expected_msg,
                                                timeout=400)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index 241419425e1..a1644abaf11 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,11 +38,10 @@
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.annotations import deprecated
 
-# The protobuf library is only used for running on Dataflow.
 try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
+  from google.cloud import pubsub
 except ImportError:
-  pubsub_pb2 = None
+  pubsub = None
 
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub', 'WriteToPubSub']
@@ -92,7 +91,7 @@ def _from_proto_str(proto_msg):
     Returns:
       A new PubsubMessage object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
@@ -109,7 +108,7 @@ def _to_proto_str(self):
       
https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
       containing the payload of this object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.data = self.data
     for key, value in self.attributes.iteritems():
       msg.attributes[key] = value
@@ -117,9 +116,9 @@ def _to_proto_str(self):
 
   @staticmethod
   def _from_message(msg):
-    """Construct from ``google.cloud.pubsub.message.Message``.
+    """Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.
 
-    
https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html
+    
https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 9bb81fc645f..5b060e5af4f 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -100,21 +100,25 @@ def setUp(self):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
-
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + 
self.uuid),
+        self.output_topic.name)
 
   def tearDown(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   def _test_streaming(self, with_attributes):
     """Runs IT pipeline with message verifier.
@@ -139,21 +143,20 @@ def _test_streaming(self, with_attributes):
       strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
     pubsub_msg_verifier = PubSubMessageMatcher(
         self.project,
-        OUTPUT_SUB + self.uuid,
+        self.output_sub.name,
         expected_messages,
         timeout=MESSAGE_MATCHER_TIMEOUT_S,
         with_attributes=with_attributes,
         strip_attributes=strip_attributes)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     for msg in self.INPUT_MESSAGES[self.runner_name]:
-      self.input_topic.publish(msg.data, **msg.attributes)
+      self.pub_client.publish(self.input_topic.name, msg.data, 
**msg.attributes)
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 6e19950777f..a95ffc6e542 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -20,12 +20,9 @@
 
 from __future__ import absolute_import
 
-import functools
 import logging
 import unittest
 from builtins import object
-from builtins import range
-from builtins import zip
 
 import hamcrest as hc
 import mock
@@ -43,6 +40,7 @@
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
 from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
+from apache_beam.testing import test_utils
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import TestWindowedValue
 from apache_beam.testing.util import assert_that
@@ -54,18 +52,10 @@
 from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-# The protobuf library is only used for running on Dataflow.
-try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
-except ImportError:
-  pubsub_pb2 = None
 
 
 class TestPubsubMessage(unittest.TestCase):
@@ -81,8 +71,7 @@ def test_payload_invalid(self):
     with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
       _ = PubsubMessage(None, {})
 
-  @unittest.skipIf(pubsub_pb2 is None,
-                   'PubSub proto dependencies are not installed')
+  @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
   def test_proto_conversion(self):
     data = 'data'
     attributes = {'k1': 'v1', 'k2': 'v2'}
@@ -220,7 +209,7 @@ def test_expand_deprecated(self):
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
 
   def test_expand(self):
     p = TestPipeline()
@@ -240,7 +229,7 @@ def test_expand(self):
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
     self.assertEqual(True, write_transform.dofn.with_attributes)
     # TODO(BEAM-4275): These properties aren't supported yet in direct runner.
     self.assertEqual(None, write_transform.dofn.id_label)
@@ -333,118 +322,25 @@ def finish_bundle(self):
 }
 
 
-class FakePubsubTopic(object):
-
-  def __init__(self, name, client):
-    self.name = name
-    self.client = client
-
-  def subscription(self, name):
-    return FakePubsubSubscription(name, self.name, self.client)
-
-  def batch(self):
-    if self.client.batch is None:
-      self.client.batch = FakeBatch(self.client)
-    return self.client.batch
-
-
-class FakePubsubSubscription(object):
-
-  def __init__(self, name, topic, client):
-    self.name = name
-    self.topic = topic
-    self.client = client
-
-  def create(self):
-    pass
-
-
-class FakeAutoAck(object):
-
-  def __init__(self, sub, **unused_kwargs):
-    self.sub = sub
-
-  def __enter__(self):
-    messages = self.sub.client.messages_read
-    self.ack_id_to_msg = dict(zip(range(len(messages)), messages))
-    return self.ack_id_to_msg
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    pass
-
-
-class FakeBatch(object):
-  """Context manager that accept Pubsub client writes via publish().
-
-  Verifies writes on exit.
-  """
-
-  def __init__(self, client):
-    self.client = client
-    self.published = []
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    if exc_type is not None:
-      return  # Exception will be raised.
-    hc.assert_that(self.published,
-                   hc.only_contains(*self.client.messages_write))
-
-  def publish(self, message, **attrs):
-    self.published.append([message, attrs])
-
-
-class FakePubsubClient(object):
-
-  def __init__(self, messages_read=None, messages_write=None, project=None,
-               **unused_kwargs):
-    """Creates a Pubsub client fake.
-
-    Args:
-      messages_read: List of PubsubMessage objects to return.
-      messages_write: List of [data, attributes] pairs, corresponding to
-        messages expected to be written to the client.
-      project: Name of GCP project.
-    """
-    self.messages_read = messages_read
-    self.messages_write = messages_write
-    self.project = project
-    self.batch = None
-
-  def topic(self, name):
-    return FakePubsubTopic(name, self)
-
-
-def create_client_message(data, message_id, attributes, publish_time):
-  """Returns a message as it would be returned from Cloud Pub/Sub client.
-
-  This is what the reader sees.
-  """
-  msg = pubsub.message.Message(data, message_id, attributes)
-  msg._service_timestamp = publish_time
-  return msg
-
-
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class TestReadFromPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
-    publish_time = '2018-03-12T13:37:01.234567Z'
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     attributes = {'key': 'value'}
-    payloads = [create_client_message(
-        data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(PubsubMessage(data, attributes),
                           timestamp.Timestamp(1520861821.234567),
                           [window.GlobalWindow()])]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -453,17 +349,18 @@ def test_read_messages_success(self, mock_pubsub):
                               None, None, with_attributes=True))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_strings_success(self, mock_pubsub):
     data = u'🤷 ¯\\_(ツ)_/¯'
     data_encoded = data.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)
+    ])
     expected_elements = [data]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -472,16 +369,16 @@ def test_read_strings_success(self, mock_pubsub):
                                      None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_data_success(self, mock_pubsub):
     data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
     expected_elements = [data_encoded]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -489,24 +386,26 @@ def test_read_data_success(self, mock_pubsub):
              | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp(micros=int(attributes['time']) * 1000),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -516,24 +415,26 @@ def 
test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_rfc3339_success(self, 
mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1337000000
+    publish_time_nanos = 133700000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(attributes['time']),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -543,24 +444,27 @@ def 
test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {}
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(publish_time),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -570,18 +474,20 @@ def test_read_messages_timestamp_attribute_missing(self, 
mock_pubsub):
                  with_attributes=True, timestamp_attribute='nonexistent'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -591,20 +497,10 @@ def 
test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
              with_attributes=True, timestamp_attribute='time'))
     with self.assertRaisesRegexp(ValueError, r'parse'):
       p.run()
+    mock_pubsub.return_value.acknowledge.assert_not_called()
 
-  @mock.patch('google.cloud.pubsub')
-  def test_read_message_id_label_unsupported(self, mock_pubsub):
+  def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
-    data = 'data'
-    message_id = 'message_id'
-    attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 
'a_label'))
@@ -614,16 +510,12 @@ def test_read_message_id_label_unsupported(self, 
mock_pubsub):
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.PublisherClient')
 class TestWriteToPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_success(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -632,15 +524,12 @@ def test_write_messages_success(self, mock_pubsub):
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=False))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_deprecated(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -648,16 +537,13 @@ def test_write_messages_deprecated(self, mock_pubsub):
          | Create(payloads)
          | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_success(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -666,15 +552,14 @@ def test_write_messages_with_attributes_success(self, 
mock_pubsub):
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=True))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data, **attributes)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_error(self, mock_pubsub):
     data = 'data'
     # Sending raw data when WriteToPubSub expects a PubsubMessage object.
     payloads = [data]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient)
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p
@@ -685,15 +570,10 @@ def test_write_messages_with_attributes_error(self, 
mock_pubsub):
                                  r'str.*has no attribute.*data'):
       p.run()
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_unsupported_features(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py 
b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 6217faf569d..7a0b5c828e9 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -31,12 +31,10 @@
 
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 DEFAULT_TIMEOUT = 5 * 60
 MAX_MESSAGES_IN_ONE_PULL = 50
@@ -49,8 +47,9 @@ class PubSubMessageMatcher(BaseMatcher):
   subscription until all expected messages are shown or timeout.
   """
 
-  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT,
-               with_attributes=False, strip_attributes=None):
+  def __init__(self, project, sub_name, expected_msg,
+               timeout=DEFAULT_TIMEOUT, with_attributes=False,
+               strip_attributes=None):
     """Initialize PubSubMessageMatcher object.
 
     Args:
@@ -59,8 +58,9 @@ def __init__(self, project, sub_name, expected_msg, 
timeout=DEFAULT_TIMEOUT,
       expected_msg: A string list that contains expected message data pulled
         from the subscription. See also: with_attributes.
       timeout: Timeout in seconds to wait for all expected messages appears.
-      with_attributes: Whether expected_msg is a list of
-        ``PubsubMessage`` objects.
+      with_attributes: If True, will match against both message data and
+        attributes. If True, expected_msg should be a list of ``PubsubMessage``
+        objects. Otherwise, it should be a list of ``bytes``.
       strip_attributes: List of strings. If with_attributes==True, strip the
         attributes keyed by these values from incoming messages.
         If a key is missing, will add an attribute with an error message as
@@ -86,28 +86,26 @@ def __init__(self, project, sub_name, expected_msg, 
timeout=DEFAULT_TIMEOUT,
 
   def _matches(self, _):
     if self.messages is None:
-      self.messages = self._wait_for_messages(self._get_subscription(),
-                                              len(self.expected_msg),
+      self.messages = self._wait_for_messages(len(self.expected_msg),
                                               self.timeout)
     return Counter(self.messages) == Counter(self.expected_msg)
 
-  def _get_subscription(self):
-    return pubsub.Client(project=self.project).subscription(self.sub_name)
-
-  def _wait_for_messages(self, subscription, expected_num, timeout):
+  def _wait_for_messages(self, expected_num, timeout):
     """Wait for messages from given subscription."""
-    logging.debug('Start pulling messages from %s', subscription.full_name)
     total_messages = []
+
+    sub_client = pubsub.SubscriberClient()
     start_time = time.time()
     while time.time() - start_time <= timeout:
-      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
-      for ack_id, message in pulled:
-        subscription.acknowledge([ack_id])
+      response = sub_client.pull(self.sub_name,
+                                 max_messages=MAX_MESSAGES_IN_ONE_PULL,
+                                 return_immediately=True)
+      for rm in response.received_messages:
+        msg = PubsubMessage._from_message(rm.message)
         if not self.with_attributes:
-          total_messages.append(message.data)
+          total_messages.append(msg.data)
           continue
 
-        msg = PubsubMessage._from_message(message)
         if self.strip_attributes:
           for attr in self.strip_attributes:
             try:
@@ -117,12 +115,16 @@ def _wait_for_messages(self, subscription, expected_num, 
timeout):
                                       'expected attribute not found.')
         total_messages.append(msg)
 
+      ack_ids = [rm.ack_id for rm in response.received_messages]
+      if ack_ids:
+        sub_client.acknowledge(self.sub_name, ack_ids)
       if len(total_messages) >= expected_num:
-        return total_messages
+        break
       time.sleep(1)
 
-    logging.error('Timeout after %d sec. Received %d messages from %s.',
-                  timeout, len(total_messages), subscription.full_name)
+    if time.time() - start_time > timeout:
+      logging.error('Timeout after %d sec. Received %d messages from %s.',
+                    timeout, len(total_messages), self.sub_name)
     return total_messages
 
   def describe_to(self, description):
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py 
b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 0e5948163f9..0f9351d346d 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -27,17 +27,19 @@
 
 from apache_beam.io.gcp.pubsub import PubsubMessage
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.testing.test_utils import PullResponseMessage
+from apache_beam.testing.test_utils import create_pull_response
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 
 @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
+@mock.patch('time.sleep', return_value=None)
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class PubSubMatcherTest(unittest.TestCase):
 
   def setUp(self):
@@ -48,90 +50,75 @@ def init_matcher(self, with_attributes=False, 
strip_attributes=None):
         'mock_project', 'mock_sub_name', ['mock_expected_msg'],
         with_attributes=with_attributes, strip_attributes=strip_attributes)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a', 'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
-        [(1, pubsub.message.Message(b'a', 'unused_id'))],
-        [(2, pubsub.message.Message(b'b', 'unused_id'))],
+        create_pull_response([PullResponseMessage(b'a', {})]),
+        create_pull_response([PullResponseMessage(b'b', {})]),
     ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 2)
+    self.assertEqual(mock_sub.acknowledge.call_count, 2)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'  # Unexpected.
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Unexpected attribute 'k'.
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['timestamp'] = 'bar'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
+    ])]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    # msg_a is missing attribute 'timestamp'.
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Message is missing attribute 'timestamp'.
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
+    ])]
     with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a']
     mock_sub = mock_get_sub.return_value
-    mock_sub.pull.return_value = [
-        (1, pubsub.message.Message(b'c', 'unused_id')),
-        (1, pubsub.message.Message(b'd', 'unused_id')),
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'c', {}),
+                              PullResponseMessage(b'd', {})]),
     ]
     with self.assertRaises(AssertionError) as error:
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
@@ -140,10 +127,9 @@ def test_message_matcher_mismatch(self, mock_get_sub, 
unused_mock):
     self.assertTrue(
         '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
         in str(error.exception.args[0]))
+    self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
     self.init_matcher()
     mock_sub = mock_get_sub.return_value
@@ -152,6 +138,7 @@ def test_message_matcher_timeout(self, mock_get_sub, 
unused_mock):
     with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertTrue(mock_sub.pull.called)
+    self.assertEqual(mock_sub.acknowledge.call_count, 0)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py 
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 00e37f3c9fd..d410992ab1d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -25,6 +25,7 @@
 
 import itertools
 import logging
+import time
 
 from google.protobuf import wrappers_pb2
 
@@ -264,11 +265,12 @@ def expand(self, pvalue):
 
 
 class _DirectWriteToPubSubFn(DoFn):
-  _topic = None
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
 
   def __init__(self, sink):
     self.project = sink.project
-    self.topic_name = sink.topic_name
+    self.short_topic_name = sink.topic_name
     self.id_label = sink.id_label
     self.timestamp_attribute = sink.timestamp_attribute
     self.with_attributes = sink.with_attributes
@@ -282,30 +284,33 @@ def __init__(self, sink):
                                 'supported for PubSub writes')
 
   def start_bundle(self):
-    from google.cloud import pubsub
-
-    if self._topic is None:
-      self._topic = pubsub.Client(project=self.project).topic(
-          self.topic_name)
     self._buffer = []
 
   def process(self, elem):
     self._buffer.append(elem)
-    if len(self._buffer) >= 100:
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
       self._flush()
 
   def finish_bundle(self):
     self._flush()
 
   def _flush(self):
-    if self._buffer:
-      with self._topic.batch() as batch:
-        for elem in self._buffer:
-          if self.with_attributes:
-            batch.publish(elem.data, **elem.attributes)
-          else:
-            batch.publish(elem)
-      self._buffer = []
+    from google.cloud import pubsub
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    if self.with_attributes:
+      futures = [pub_client.publish(topic, elem.data, **elem.attributes)
+                 for elem in self._buffer]
+    else:
+      futures = [pub_client.publish(topic, elem)
+                 for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)
+    self._buffer = []
 
 
 def _get_pubsub_transform_overrides(pipeline_options):
diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py 
b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
index 8facca8edc8..23dfeabc2ab 100644
--- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
@@ -52,5 +52,6 @@ def run_pipeline(self, pipeline):
     finally:
       if not PipelineState.is_terminal(self.result.state):
         self.result.cancel()
+        self.result.wait_until_finish()
 
     return self.result
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py 
b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ef12e2cb02f..fad0704bd3e 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -377,20 +377,44 @@ def finish_bundle(self):
 
 
 class _PubSubSubscriptionWrapper(object):
-  """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+  """Wrapper for managing temporary PubSub subscriptions."""
 
-  def __init__(self, subscription, should_cleanup):
-    self.subscription = subscription
-    self.should_cleanup = should_cleanup
+  def __init__(self, project, short_topic_name, short_sub_name):
+    """Initialize subscription wrapper.
+
+    If sub_name is None, will create a temporary subscription to topic_name.
+
+    Args:
+      project: GCP project name for topic and subscription. May be None.
+        Required if sub_name is None.
+      short_topic_name: Valid topic name without
+        'projects/{project}/topics/' prefix. May be None.
+        Required if sub_name is None.
+      short_sub_name: Valid subscription name without
+        'projects/{project}/subscriptions/' prefix. May be None.
+    """
+    from google.cloud import pubsub
+    self.sub_client = pubsub.SubscriberClient()
+
+    if short_sub_name is None:
+      self.sub_name = self.sub_client.subscription_path(
+          project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 
32)))
+      topic_name = self.sub_client.topic_path(project, short_topic_name)
+      self.sub_client.create_subscription(self.sub_name, topic_name)
+      self._should_cleanup = True
+    else:
+      self.sub_name = self.sub_client.subscription_path(project, 
short_sub_name)
+      self._should_cleanup = False
 
   def __del__(self):
-    if self.should_cleanup:
-      self.subscription.delete()
+    if self._should_cleanup:
+      self.sub_client.delete_subscription(self.sub_name)
 
 
 class _PubSubReadEvaluator(_TransformEvaluator):
   """TransformEvaluator for PubSub read."""
 
+  # A mapping of transform to _PubSubSubscriptionWrapper.
   _subscription_cache = {}
 
   def __init__(self, evaluation_context, applied_ptransform,
@@ -404,26 +428,16 @@ def __init__(self, evaluation_context, applied_ptransform,
     if self.source.id_label:
       raise NotImplementedError(
           'DirectRunner: id_label is not supported for PubSub reads')
-    self._subscription = _PubSubReadEvaluator.get_subscription(
+    self._sub_name = _PubSubReadEvaluator.get_subscription(
         self._applied_ptransform, self.source.project, self.source.topic_name,
         self.source.subscription_name)
 
   @classmethod
-  def get_subscription(cls, transform, project, topic, subscription_name):
+  def get_subscription(cls, transform, project, topic, short_sub_name):
     if transform not in cls._subscription_cache:
-      from google.cloud import pubsub
-      should_create = not subscription_name
-      if should_create:
-        subscription_name = 'beam_%d_%x' % (
-            int(time.time()), random.randrange(1 << 32))
-      wrapper = _PubSubSubscriptionWrapper(
-          pubsub.Client(project=project).topic(topic).subscription(
-              subscription_name),
-          should_create)
-      if should_create:
-        wrapper.subscription.create()
+      wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
       cls._subscription_cache[transform] = wrapper
-    return cls._subscription_cache[transform].subscription
+    return cls._subscription_cache[transform].sub_name
 
   def start_bundle(self):
     pass
@@ -438,28 +452,34 @@ def _read_from_pubsub(self, timestamp_attribute):
     # evaluator fails with an exception before emitting a bundle. However,
     # the DirectRunner currently doesn't retry work items anyway, so the
     # pipeline would enter an inconsistent state on any error.
-    with pubsub.subscription.AutoAck(
-        self._subscription, return_immediately=True,
-        max_messages=10) as results:
-      def _get_element(message):
-        parsed_message = PubsubMessage._from_message(message)
-        if (timestamp_attribute and
-            timestamp_attribute in parsed_message.attributes):
-          rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+    sub_client = pubsub.SubscriberClient()
+    response = sub_client.pull(self._sub_name, max_messages=10,
+                               return_immediately=True)
+
+    def _get_element(message):
+      parsed_message = PubsubMessage._from_message(message)
+      if (timestamp_attribute and
+          timestamp_attribute in parsed_message.attributes):
+        rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+        try:
+          timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+        except ValueError:
           try:
-            timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
-          except ValueError:
-            try:
-              timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
-            except ValueError as e:
-              raise ValueError('Bad timestamp value: %s' % e)
-        else:
-          timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+            timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+          except ValueError as e:
+            raise ValueError('Bad timestamp value: %s' % e)
+      else:
+        timestamp = Timestamp(message.publish_time.seconds,
+                              message.publish_time.nanos // 1000)
+
+      return timestamp, parsed_message
 
-        return timestamp, parsed_message
+    results = [_get_element(rm.message) for rm in response.received_messages]
+    ack_ids = [rm.ack_id for rm in response.received_messages]
+    if ack_ids:
+      sub_client.acknowledge(self._sub_name, ack_ids)
 
-      return [_get_element(message)
-              for unused_ack_id, message in iteritems(results)]
+    return results
 
   def finish_bundle(self):
     data = self._read_from_pubsub(self.source.timestamp_attribute)
diff --git a/sdks/python/apache_beam/testing/test_utils.py 
b/sdks/python/apache_beam/testing/test_utils.py
index 6ca75e65102..50dd7b4bb17 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -24,11 +24,9 @@
 
 import hashlib
 import imp
-import logging
 import os
 import shutil
 import tempfile
-import time
 from builtins import object
 
 from mock import Mock
@@ -136,46 +134,61 @@ def delete_files(file_paths):
   FileSystems.delete(file_paths)
 
 
-def wait_for_subscriptions_created(subs, timeout=60):
-  """Wait for all PubSub subscriptions are created."""
-  return _wait_until_all_exist(subs, timeout)
-
+def cleanup_subscriptions(sub_client, subs):
+  """Cleanup PubSub subscriptions if exist."""
+  for sub in subs:
+    sub_client.delete_subscription(sub.name)
 
-def wait_for_topics_created(topics, timeout=60):
-  """Wait for all PubSub topics are created."""
-  return _wait_until_all_exist(topics, timeout)
 
+def cleanup_topics(pub_client, topics):
+  """Cleanup PubSub topics if exist."""
+  for topic in topics:
+    pub_client.delete_topic(topic.name)
 
-def _wait_until_all_exist(components, timeout):
-  unchecked_components = set(components)
-  start_time = time.time()
-  while time.time() - start_time <= timeout:
-    unchecked_components = set(
-        [c for c in unchecked_components if not c.exists()])
-    if len(unchecked_components) == 0:
-      return True
-    time.sleep(2)
 
-  raise RuntimeError(
-      'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
-      'They are %s.' % (timeout, len(unchecked_components),
-                        len(components), list(unchecked_components)))
+class PullResponseMessage(object):
+  """Data representing a pull request response.
 
+  Utility class for ``create_pull_response``.
+  """
+  def __init__(self, data, attributes=None,
+               publish_time_secs=None, publish_time_nanos=None, ack_id=None):
+    self.data = data
+    self.attributes = attributes
+    self.publish_time_secs = publish_time_secs
+    self.publish_time_nanos = publish_time_nanos
+    self.ack_id = ack_id
 
-def cleanup_subscriptions(subs):
-  """Cleanup PubSub subscriptions if exist."""
-  _cleanup_pubsub(subs)
 
+def create_pull_response(responses):
+  """Create an instance of ``google.cloud.pubsub.types.ReceivedMessage``.
 
-def cleanup_topics(topics):
-  """Cleanup PubSub topics if exist."""
-  _cleanup_pubsub(topics)
+  Used to simulate the response from pubsub.SubscriberClient().pull().
 
+  Args:
+    responses: list of ``PullResponseMessage``
 
-def _cleanup_pubsub(components):
-  for c in components:
-    if c.exists():
-      c.delete()
-    else:
-      logging.debug('Cannot delete topic/subscription. %s does not exist.',
-                    c.full_name)
+  Returns:
+    An instance of ``google.cloud.pubsub.types.PullResponse`` populated with
+    responses.
+  """
+  from google.cloud import pubsub
+
+  res = pubsub.types.PullResponse()
+  for response in responses:
+    received_message = res.received_messages.add()
+
+    message = received_message.message
+    message.data = response.data
+    if response.attributes is not None:
+      for k, v in response.attributes.items():
+        message.attributes[k] = v
+    if response.publish_time_secs is not None:
+      message.publish_time.seconds = response.publish_time_secs
+    if response.publish_time_nanos is not None:
+      message.publish_time.nanos = response.publish_time_nanos
+
+    if response.ack_id is not None:
+      received_message.ack_id = response.ack_id
+
+  return res
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py 
b/sdks/python/apache_beam/testing/test_utils_test.py
index 56df9fe091a..cd22df0b252 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -82,56 +82,19 @@ def test_temp_file_field_correct(self):
         self.assertEqual(f.readline(), 'line2\n')
         self.assertEqual(f.readline(), 'line3\n')
 
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    sub2 = mock.MagicMock()
-    sub2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
-    self.assertTrue(sub1.exists.called)
-    self.assertTrue(sub2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_fails(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    topic2 = mock.MagicMock()
-    topic2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
-    self.assertTrue(topic1.exists.called)
-    self.assertTrue(topic2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([topic1], timeout=0.1))
-    self.assertTrue(topic1.exists.called)
-
   def test_cleanup_subscriptions(self):
-    mock_sub = mock.MagicMock()
-    mock_sub.exist.return_value = True
-    utils.cleanup_subscriptions([mock_sub])
-    self.assertTrue(mock_sub.delete.called)
+    sub_client = mock.Mock()
+    sub = mock.Mock()
+    sub.name = 'test_sub'
+    utils.cleanup_subscriptions(sub_client, [sub])
+    sub_client.delete_subscription.assert_called_with(sub.name)
 
   def test_cleanup_topics(self):
-    mock_topics = mock.MagicMock()
-    mock_topics.exist.return_value = True
-    utils.cleanup_subscriptions([mock_topics])
-    self.assertTrue(mock_topics.delete.called)
+    pub_client = mock.Mock()
+    topic = mock.Mock()
+    topic.name = 'test_topic'
+    utils.cleanup_topics(pub_client, [topic])
+    pub_client.delete_topic.assert_called_with(topic.name)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/container/base_image_requirements.txt 
b/sdks/python/container/base_image_requirements.txt
index 13a93b5f3e3..095dc6d6383 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -45,10 +45,9 @@ nose==1.3.7
 # GCP extra features
 google-apitools==0.5.20
 googledatastore==7.0.1
-google-cloud-pubsub==0.26.0
+google-cloud-pubsub==0.35.4
 google-cloud-bigquery==0.25.0
 proto-google-cloud-datastore-v1==0.90.4
-proto-google-cloud-pubsub-v1==0.15.4
 
 # Optional packages
 cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 254135b2efb..e7eb836e6bd 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -141,8 +141,7 @@ def get_version():
     'google-apitools>=0.5.18,<=0.5.20',
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
     'googledatastore==7.0.1',
-    'google-cloud-pubsub==0.26.0',
-    'proto-google-cloud-pubsub-v1==0.15.4',
+    'google-cloud-pubsub==0.35.4',
     # GCP packages required by tests
     'google-cloud-bigquery==0.25.0',
 ]


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 151917)
    Time Spent: 1h 20m  (was: 1h 10m)

> Upgrade google-cloud-pubsub to 0.35.4
> -------------------------------------
>
>                 Key: BEAM-5513
>                 URL: https://issues.apache.org/jira/browse/BEAM-5513
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Version 0.35.4 is not the latest version out, but it is the latest supported 
> on Dataflow runner.
> [~markflyhigh]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to