diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py 
index a95e5fa8f53..04e6e4e7370 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,13 +23,17 @@
 Currently, this test blocks until the job is manually terminated.
+import datetime
 import logging
+import random
 import unittest
+import uuid
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 from apache_beam.examples import streaming_wordcount
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing import test_utils
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -47,17 +51,16 @@ class StreamingWordCountIT(unittest.TestCase):
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.uuid = str(uuid.uuid4())
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(
-        project=self.test_pipeline.get_option('project'))
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
-    self._cleanup_pubsub()
+    self.pubsub_client = pubsub.Client(project=self.project)
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
@@ -65,6 +68,11 @@ def setUp(self):
+  def _generate_identifier(self):
+    """Return a string made of the current local time and a random number.
+
+    The time is formatted as month, day, hour, minute and second
+    (``%m%d%H%M%S``) and is followed by a random integer in [0, 999].
+    """
+    random_part = random.randint(0, 999)
+    time_part = datetime.datetime.now().strftime('%m%d%H%M%S')
+    return '%s%d' % (time_part, random_part)
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
     logging.debug('Injecting %d numbers to topic %s',
@@ -79,13 +87,21 @@ def _cleanup_pubsub(self):
   def tearDown(self):
-  @attr('developing_test')
+  @attr('IT')
   def test_streaming_wordcount_it(self):
+    # Build expected dataset.
+    expected_msg = [('%d: 1' % num) for num in range(DEFAULT_INPUT_NUMBERS)]
     # Set extra options to the pipeline for test purpose
-    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
-    extra_opts = {'input_sub': self.input_sub.full_name,
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+    pubsub_msg_verifier = PubSubMessageMatcher(self.project,
+                                               OUTPUT_SUB + self.uuid,
+                                               expected_msg,
+                                               timeout=400)
+    extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
-                  'on_success_matcher': all_of(*pipeline_verifiers)}
+                  'on_success_matcher': all_of(state_verifier,
+                                               pubsub_msg_verifier)}
     # Generate input data and inject to PubSub.
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py 
new file mode 100644
index 00000000000..8fb687908d8
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#    http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""PubSub verifier used for end-to-end test."""
+import logging
+import time
+from collections import Counter
+from hamcrest.core.base_matcher import BaseMatcher
+__all__ = ['PubSubMessageMatcher']
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from the given
+  subscription until all expected messages have arrived or the timeout
+  expires.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages to appear.
+
+    Raises:
+      ImportError: If the PubSub client library could not be imported.
+      ValueError: If project or sub_name is empty, or if expected_msg is not
+        a list.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    # The offending value is wrapped in a one-element tuple so that a tuple
+    # argument is reported in the message instead of breaking the formatting.
+    if not project:
+      raise ValueError('Invalid project %s.' % (project,))
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % (sub_name,))
+    if not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % (expected_msg,))
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    # None means "nothing pulled yet"; a list (possibly empty) is the cached
+    # result of the one and only pull round.
+    self.messages = None
+
+  def _matches(self, _):
+    """Pull messages (once) and compare them with the expected messages.
+
+    The matched item is ignored. Returns True when the pulled message data
+    and the expected messages are equal as multisets.
+    """
+    # Test against None rather than truthiness: hamcrest may evaluate a
+    # matcher more than once (for example while describing a mismatch inside
+    # all_of), and an empty result after a timeout must not start a second
+    # blocking pull.
+    if self.messages is None:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription.
+
+    Args:
+      subscription: Subscription object to pull from and acknowledge on.
+      expected_num: Number of messages after which pulling stops.
+      timeout: Maximum time in seconds to keep pulling.
+
+    Returns:
+      A list with the data of every message pulled; it may hold fewer than
+      expected_num entries if the timeout was reached.
+    """
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      ack_ids = []
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        ack_ids.append(ack_id)
+      # Acknowledge the whole batch with a single request, and skip the call
+      # entirely when the pull returned nothing.
+      if ack_ids:
+        subscription.acknowledge(ack_ids)
+      if len(total_messages) >= expected_num:
+        return total_messages
+      time.sleep(1)
+
+    logging.error('Timeout after %d sec. Received %d messages from %s.',
+                  timeout, len(total_messages), subscription.full_name)
+    return total_messages
+
+  def describe_to(self, description):
+    """Append the number of expected messages to the description."""
+    description.append_text(
+        'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+    """Append the received count and the expected/actual differences."""
+    # Tolerate being called before any pull has happened.
+    actual_msg = self.messages or []
+    c_expected = Counter(self.expected_msg)
+    c_actual = Counter(actual_msg)
+    # Union minus intersection keeps the messages that are missing from, or
+    # surplus in, either side.
+    diff = (c_expected | c_actual) - (c_expected & c_actual)
+    mismatch_description.append_text(
+        "Got %d messages. Diffs: %s." %
+        (len(actual_msg), list(diff.elements())))
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py 
new file mode 100644
index 00000000000..a7fd310c7dd
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#    http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit test for PubSub verifier."""
+import logging
+import unittest
+import mock
+from hamcrest import assert_that as hc_assert_that
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+@unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
+class PubSubMatcherTest(unittest.TestCase):
+  """Unit tests for PubSubMessageMatcher with the subscription mocked out."""
+
+  def setUp(self):
+    # The matched item is ignored by the matcher, so any object will do.
+    self.mock_presult = mock.MagicMock()
+    self.pubsub_matcher = PubSubMessageMatcher('mock_project',
+                                               'mock_sub_name',
+                                               ['mock_expected_msg'])
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud.pubsub.Client.subscription')
+  def test_message_matcher_success(self, mock_sub_cls, unused_mock):
+    """Expected messages arriving over two pulls satisfy the matcher."""
+    self.pubsub_matcher.expected_msg = ['a', 'b']
+    mock_sub = mock_sub_cls.return_value
+    mock_sub.pull.side_effect = [
+        [(1, pubsub.message.Message(b'a', 'unused_id'))],
+        [(2, pubsub.message.Message(b'b', 'unused_id'))],
+    ]
+    hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 2)
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud.pubsub.Client.subscription')
+  def test_message_matcher_mismatch(self, mock_sub_cls, unused_mock):
+    """Unexpected message data makes the assertion fail after one pull."""
+    self.pubsub_matcher.expected_msg = ['a']
+    mock_sub = mock_sub_cls.return_value
+    mock_sub.pull.return_value = [
+        (1, pubsub.message.Message(b'c', 'unused_id')),
+        (1, pubsub.message.Message(b'd', 'unused_id')),
+    ]
+    with self.assertRaises(AssertionError) as error:
+      hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertItemsEqual(['c', 'd'], self.pubsub_matcher.messages)
+    self.assertTrue(
+        '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
+        in str(error.exception.args[0]))
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud.pubsub.Client.subscription')
+  def test_message_metcher_timeout(self, mock_sub_cls, unused_mock):
+    """No messages before the timeout makes the assertion fail."""
+    mock_sub = mock_sub_cls.return_value
+    # The matcher reads full_name as a plain attribute of the subscription
+    # object, so it is set on the mocked subscription itself.
+    mock_sub.full_name = 'mock_sub'
+    self.pubsub_matcher.timeout = 0.1
+    with self.assertRaises(AssertionError) as error:
+      hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertTrue(mock_sub.pull.called)
+    self.assertEqual(
+        '\nExpected: Expected %d messages.\n     but: Got %d messages. Diffs: '
+        '%s.\n' % (1, 0, ['mock_expected_msg']), str(error.exception.args[0]))
+# When this file is executed directly, set the root logger to INFO and run
+# the tests through unittest's command-line entry point.
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py 
index 4fb5982ec4e..50390b94dba 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
     if not options.view_as(StandardOptions).streaming:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
+      self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
     return self.result
-  def wait_until_running(self):
+  def build_console_url(self, options):
+    """Return the Cloud Console URL of this runner's Dataflow job.
+
+    Args:
+      options: Pipeline options; the project and region are read from their
+        GoogleCloudOptions view.
+
+    Returns:
+      A URL string built from the region, the job id reported by
+      self.result, and the project.
+    """
+    url_template = (
+        'https://console.cloud.google.com/dataflow/jobsDetail/locations'
+        '/%s/jobs/%s?project=%s')
+    gcp_project = options.view_as(GoogleCloudOptions).project
+    gcp_region = options.view_as(GoogleCloudOptions).region
+    return url_template % (gcp_region, self.result.job_id(), gcp_project)
+  def wait_until_in_state(self, state, timeout=WAIT_TIMEOUT):
     """Wait until Dataflow pipeline terminate or enter RUNNING state."""
     if not self.result.has_job:
       raise IOError('Failed to get the Dataflow job id.')
     start_time = time.time()
-    while time.time() - start_time <= WAIT_TIMEOUT:
+    while time.time() - start_time <= timeout:
       job_state = self.result.state
       if (self.result.is_in_terminal_state() or
           job_state == PipelineState.RUNNING):


