Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3cac39674 -> 259c864a0


[AIRFLOW-781] Allow DataFlowOperators to accept jobs stored in GCS

Closes #2037 from fenglu-g/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/259c864a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/259c864a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/259c864a

Branch: refs/heads/master
Commit: 259c864a07617458841471e71a1034ed196ba3fc
Parents: 3cac396
Author: Feng Lu <fen...@fengcloud.hot.corp.google.com>
Authored: Wed Feb 1 09:36:02 2017 -0800
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Wed Feb 1 09:36:25 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/dataflow_operator.py | 53 +++++++++++++++++++++
 tests/contrib/operators/dataflow_operator.py   | 12 +++--
 2 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
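
For readers skimming the change: the sketch below shows how a DAG could point a
DataFlowPythonOperator at a GCS-hosted job file after this commit. The argument
names mirror the fixtures used in the updated test; the bucket names, project id,
and the dag object are illustrative only, not taken from the commit.

    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    # py_file may now be a gs:// path; the operator downloads it to /tmp via
    # GoogleCloudBucketHelper before handing it to the Dataflow hook.
    wordcount = DataFlowPythonOperator(
        task_id='wordcount-from-gcs',
        py_file='gs://my-bucket/jobs/wordcount.py',   # GCS object, illustrative
        py_options=['-m'],
        dataflow_default_options={
            'project': 'my-gcp-project',              # illustrative project id
            'staging_location': 'gs://my-bucket/staging',
        },
        options={'output': 'gs://my-bucket/output'},
        dag=dag,                                      # assumes an existing DAG object
    )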


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/259c864a/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index ef49eb6..c1dca24 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -14,7 +14,9 @@
 
 import copy
 import re
+import uuid
 
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -107,6 +109,9 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
+        bucket_helper = GoogleCloudBucketHelper(
+            self.gcp_conn_id, self.delegate_to)
+        self.jar = bucket_helper.google_cloud_to_local(self.jar)
         hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
 
@@ -168,6 +173,9 @@ class DataFlowPythonOperator(BaseOperator):
 
     def execute(self, context):
         """Execute the python dataflow job."""
+        bucket_helper = GoogleCloudBucketHelper(
+            self.gcp_conn_id, self.delegate_to)
+        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
         hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
         dataflow_options = self.dataflow_default_options.copy()
@@ -180,3 +188,48 @@ class DataFlowPythonOperator(BaseOperator):
         hook.start_python_dataflow(
             self.task_id, formatted_options,
             self.py_file, self.py_options)
+
+
+class GoogleCloudBucketHelper():
+    """GoogleCloudStorageHook helper class to download GCS object."""
+    GCS_PREFIX_LENGTH = 5
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        self._gcs_hook = GoogleCloudStorageHook(gcp_conn_id, delegate_to)
+
+    def google_cloud_to_local(self, file_name):
+        """
+        Checks whether the file specified by file_name is stored in Google Cloud
+        Storage (GCS). If so, downloads the file and saves it locally; the full
+        path of the saved file will be returned. Otherwise the local file_name
+        will be returned immediately.
+
+        :param file_name: The full path of input file.
+        :type file_name: string
+        :return: The full path of local file.
+        :rtype: string
+        """
+        if not file_name.startswith('gs://'):
+            return file_name
+
+        # Extracts bucket_id and object_id by first removing the 'gs://' prefix
+        # and then splitting the remainder on the path delimiter '/'.
+        path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
+        if len(path_components) < 2:
+            raise Exception(
+                'Invalid Google Cloud Storage (GCS) object path: {}.'
+                .format(file_name))
+
+        bucket_id = path_components[0]
+        object_id = '/'.join(path_components[1:])
+        local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid1())[:8],
+                                                 path_components[-1])
+        file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
+
+        if file_size > 0:
+            return local_file
+        raise Exception(
+            'Failed to download Google Cloud Storage (GCS) object: {}'
+            .format(file_name))
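
The path handling above is easy to verify in isolation. The snippet below mirrors
the parsing done by GoogleCloudBucketHelper.google_cloud_to_local, minus the actual
GoogleCloudStorageHook.download call; the helper function name split_gcs_path is
mine, for illustration only.

    import uuid

    GCS_PREFIX_LENGTH = 5  # len('gs://')

    def split_gcs_path(file_name):
        # Strip the 'gs://' prefix, take the first component as the bucket
        # and the rest as the object path, as the operator helper does.
        path_components = file_name[GCS_PREFIX_LENGTH:].split('/')
        if len(path_components) < 2:
            raise ValueError('Invalid GCS object path: {}'.format(file_name))
        bucket_id = path_components[0]
        object_id = '/'.join(path_components[1:])
        local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid1())[:8],
                                                 path_components[-1])
        return bucket_id, object_id, local_file

    # split_gcs_path('gs://my-bucket/jobs/wordcount.py')
    # -> ('my-bucket', 'jobs/wordcount.py', '/tmp/dataflowXXXXXXXX-wordcount.py')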

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/259c864a/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
index 4f887c1..5329723 100644
--- a/tests/contrib/operators/dataflow_operator.py
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -27,7 +27,7 @@ except ImportError:
 
 
 TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
+PY_FILE = 'gs://my-bucket/my-object.py'
 PY_OPTIONS = ['-m']
 DEFAULT_OPTIONS = {
     'project': 'test',
@@ -36,6 +36,7 @@ DEFAULT_OPTIONS = {
 ADDITIONAL_OPTIONS = {
     'output': 'gs://test/output'
 }
+GCS_HOOK_STRING = 'airflow.contrib.operators.dataflow_operator.{}'
 
 
 class DataFlowPythonOperatorTest(unittest.TestCase):
@@ -59,12 +60,14 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
                          ADDITIONAL_OPTIONS)
 
     @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
-    def test_exec(self, dataflow_mock):
+    @mock.patch(GCS_HOOK_STRING.format('GoogleCloudStorageHook'))
+    def test_exec(self, gcs_hook, dataflow_mock):
         """Test DataFlowHook is created and the right args are passed to
         start_python_workflow.
 
         """
         start_python_hook = dataflow_mock.return_value.start_python_dataflow
+        gcs_download_hook = gcs_hook.return_value.download
         self.dataflow.execute(None)
         assert dataflow_mock.called
         expected_options = {
@@ -72,5 +75,8 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
             'staging_location': 'gs://test/staging',
             'output': 'gs://test/output'
         }
+        gcs_download_hook.assert_called_once_with(
+            'my-bucket', 'my-object.py', mock.ANY)
         start_python_hook.assert_called_once_with(TASK_ID, expected_options,
-                                                  PY_FILE, PY_OPTIONS)
+                                                  mock.ANY, PY_OPTIONS)
+        self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
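
To exercise just this test module locally, a plain unittest invocation along these
lines should work from the repository root (assuming airflow and mock are importable
in the environment):

    python -m unittest tests.contrib.operators.dataflow_operator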
