Repository: beam
Updated Branches:
  refs/heads/master 3a8ae530c -> 3b890e838


[BEAM-1218] Move the dataflow specific parts from runner_test to dataflow_runner_test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8435463
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8435463
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8435463

Branch: refs/heads/master
Commit: d8435463d7e5b416b383bd4caf7752d023339494
Parents: 3a8ae53
Author: Sourabh Bajaj <[email protected]>
Authored: Thu Feb 23 14:46:02 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Thu Feb 23 14:51:01 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner_test.py    | 114 ++++++++++++++++++-
 sdks/python/apache_beam/runners/runner_test.py  | 107 -----------------
 2 files changed, 111 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
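The tests moved below depend on the GCP-only apiclient module, so the new file guards that import and skips the whole test case when the dependency is missing. A minimal, illustrative sketch of that pattern (the example test class and assertion are placeholders, not part of this commit):

    import unittest

    # Guard the GCP-only dependency so the module still imports without it.
    try:
      from apache_beam.runners.dataflow.internal import apiclient
    except ImportError:
      apiclient = None


    @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
    class ExampleGcpOnlyTest(unittest.TestCase):
      def test_apiclient_available(self):
        # Only runs when the guarded import above succeeded.
        self.assertIsNotNone(apiclient)


    if __name__ == '__main__':
      unittest.main()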


http://git-wip-us.apache.org/repos/asf/beam/blob/d8435463/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 4a538b1..4a0815a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -17,31 +17,49 @@
 
 """Unit tests for the DataflowRunner class."""
 
+import json
 import unittest
+from datetime import datetime
 
 import mock
 
+import apache_beam as beam
+import apache_beam.transforms as ptransform
+
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners import create_runner
+from apache_beam.runners import DataflowRunner
+from apache_beam.runners import TestDataflowRunner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apitools.base.py import base_api
+  from apache_beam.runners.dataflow.internal import apiclient
 except ImportError:
-  base_api = None
+  apiclient = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
 class DataflowRunnerTest(unittest.TestCase):
+  default_properties = [
+      '--dataflow_endpoint=ignored',
+      '--job_name=test-job',
+      '--project=test-project',
+      '--staging_location=ignored',
+      '--temp_location=/dev/null',
+      '--no_auth=True']
 
   def test_dataflow_runner_has_metrics(self):
     df_result = DataflowPipelineResult('somejob', 'somerunner')
     self.assertTrue(df_result.metrics())
     self.assertTrue(df_result.metrics().query())
 
-  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
   @mock.patch('time.sleep', return_value=None)
   def test_wait_until_finish(self, patched_time_sleep):
     values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
@@ -73,6 +91,96 @@ class DataflowRunnerTest(unittest.TestCase):
         succeeded_runner.job, succeeded_runner)
     succeeded_result.wait_until_finish()
 
+  def test_create_runner(self):
+    self.assertTrue(
+        isinstance(create_runner('DataflowRunner'),
+                   DataflowRunner))
+    self.assertTrue(
+        isinstance(create_runner('TestDataflowRunner'),
+                   TestDataflowRunner))
+
+  def test_remote_runner_translation(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions(self.default_properties))
+
+    (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
+     | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
+     | ptransform.GroupByKey())
+    remote_runner.job = apiclient.Job(p.options)
+    super(DataflowRunner, remote_runner).run(p)
+
+  def test_remote_runner_display_data(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions(self.default_properties))
+
+    # TODO: Should not subclass ParDo. Switch to PTransform as soon as
+    # composite transforms support display data.
+    class SpecialParDo(beam.ParDo):
+      def __init__(self, fn, now):
+        super(SpecialParDo, self).__init__(fn)
+        self.fn = fn
+        self.now = now
+
+      # Make this a list to be accessible within closure
+      def display_data(self):
+        return {'asubcomponent': self.fn,
+                'a_class': SpecialParDo,
+                'a_time': self.now}
+
+    class SpecialDoFn(beam.DoFn):
+      def display_data(self):
+        return {'dofn_value': 42}
+
+      def process(self):
+        pass
+
+    now = datetime.now()
+    # pylint: disable=expression-not-assigned
+    (p | ptransform.Create([1, 2, 3, 4, 5])
+     | 'Do' >> SpecialParDo(SpecialDoFn(), now))
+
+    remote_runner.job = apiclient.Job(p.options)
+    super(DataflowRunner, remote_runner).run(p)
+    job_dict = json.loads(str(remote_runner.job))
+    steps = [step
+             for step in job_dict['steps']
+             if len(step['properties'].get('display_data', [])) > 0]
+    step = steps[0]
+    disp_data = step['properties']['display_data']
+    disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
+    nspace = SpecialParDo.__module__+ '.'
+    expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
+                      'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
+                      'key': 'a_time'},
+                     {'type': 'STRING', 'namespace': nspace+'SpecialParDo',
+                      'value': nspace+'SpecialParDo', 'key': 'a_class',
+                      'shortValue': 'SpecialParDo'},
+                     {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
+                      'value': 42, 'key': 'dofn_value'}]
+    expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
+    self.assertEqual(len(disp_data), 3)
+    self.assertEqual(disp_data, expected_data)
+
+  def test_no_group_by_key_directly_after_bigquery(self):
+    remote_runner = DataflowRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions([
+                     '--dataflow_endpoint=ignored',
+                     '--job_name=test-job',
+                     '--project=test-project',
+                     '--staging_location=ignored',
+                     '--temp_location=/dev/null',
+                     '--no_auth=True'
+                 ]))
+    rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
+    with self.assertRaises(ValueError,
+                           msg=('Coder for the GroupByKey operation'
+                                '"GroupByKey" is not a key-value coder: '
+                                'RowAsDictJsonCoder')):
+      unused_invalid = rows | beam.GroupByKey()
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/d8435463/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index d39c6eb..b161cbb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -22,9 +22,7 @@ the other unit tests. In this file we choose to test only aspects related to
 caching and clearing values that are not tested elsewhere.
 """
 
-import json
 import unittest
-from datetime import datetime
 
 import hamcrest as hc
 
@@ -38,22 +36,10 @@ from apache_beam.metrics.metricbase import MetricName
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners import DirectRunner
 from apache_beam.runners import create_runner
-from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.utils.pipeline_options import PipelineOptions
 
-from apache_beam.runners import DataflowRunner
-from apache_beam.runners import TestDataflowRunner
-
-# Protect against environments where api client is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apache_beam.runners.dataflow.internal import apiclient
-except ImportError:
-  apiclient = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
 
 class RunnerTest(unittest.TestCase):
   default_properties = [
@@ -67,14 +53,6 @@ class RunnerTest(unittest.TestCase):
   def test_create_runner(self):
     self.assertTrue(
         isinstance(create_runner('DirectRunner'), DirectRunner))
-    if apiclient is not None:
-      self.assertTrue(
-          isinstance(create_runner('DataflowRunner'),
-                     DataflowRunner))
-    if apiclient is not None:
-      self.assertTrue(
-          isinstance(create_runner('TestDataflowRunner'),
-                     TestDataflowRunner))
     self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_create_runner_shorthand(self):
@@ -89,72 +67,6 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('Direct'), DirectRunner))
 
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_remote_runner_translation(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions(self.default_properties))
-
-    (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
-     | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
-     | ptransform.GroupByKey())
-    remote_runner.job = apiclient.Job(p.options)
-    super(DataflowRunner, remote_runner).run(p)
-
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_remote_runner_display_data(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions(self.default_properties))
-
-    # TODO: Should not subclass ParDo. Switch to PTransform as soon as
-    # composite transforms support display data.
-    class SpecialParDo(beam.ParDo):
-      def __init__(self, fn, now):
-        super(SpecialParDo, self).__init__(fn)
-        self.fn = fn
-        self.now = now
-
-      # Make this a list to be accessible within closure
-      def display_data(self):
-        return {'asubcomponent': self.fn,
-                'a_class': SpecialParDo,
-                'a_time': self.now}
-
-    class SpecialDoFn(beam.DoFn):
-      def display_data(self):
-        return {'dofn_value': 42}
-
-      def process(self):
-        pass
-
-    now = datetime.now()
-    # pylint: disable=expression-not-assigned
-    (p | ptransform.Create([1, 2, 3, 4, 5])
-     | 'Do' >> SpecialParDo(SpecialDoFn(), now))
-
-    remote_runner.job = apiclient.Job(p.options)
-    super(DataflowRunner, remote_runner).run(p)
-    job_dict = json.loads(str(remote_runner.job))
-    steps = [step
-             for step in job_dict['steps']
-             if len(step['properties'].get('display_data', [])) > 0]
-    step = steps[0]
-    disp_data = step['properties']['display_data']
-    disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
-    nspace = SpecialParDo.__module__+ '.'
-    expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
-                      'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
-                      'key': 'a_time'},
-                     {'type': 'STRING', 'namespace': nspace+'SpecialParDo',
-                      'value': nspace+'SpecialParDo', 'key': 'a_class',
-                      'shortValue': 'SpecialParDo'},
-                     {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
-                      'value': 42, 'key': 'dofn_value'}]
-    expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
-    self.assertEqual(len(disp_data), 3)
-    self.assertEqual(disp_data, expected_data)
-
   def test_direct_runner_metrics(self):
     from apache_beam.metrics.metric import Metrics
 
@@ -206,25 +118,6 @@ class RunnerTest(unittest.TestCase):
                 DistributionResult(DistributionData(15, 5, 1, 5)),
                 DistributionResult(DistributionData(15, 5, 1, 5)))))
 
-  @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
-  def test_no_group_by_key_directly_after_bigquery(self):
-    remote_runner = DataflowRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions([
-                     '--dataflow_endpoint=ignored',
-                     '--job_name=test-job',
-                     '--project=test-project',
-                     '--staging_location=ignored',
-                     '--temp_location=/dev/null',
-                     '--no_auth=True'
-                 ]))
-    rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
-    with self.assertRaises(ValueError,
-                           msg=('Coder for the GroupByKey operation'
-                                '"GroupByKey" is not a key-value coder: '
-                                'RowAsDictJsonCoder')):
-      unused_invalid = rows | beam.GroupByKey()
-
 
 if __name__ == '__main__':
   unittest.main()
