Repository: incubator-airflow
Updated Branches:
  refs/heads/master a30f009ae -> 4e80b5f10


[AIRFLOW-2284] GCS to S3 operator

Closes #3190 from NielsZeilemaker/gcp_to_s3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4e80b5f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4e80b5f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4e80b5f1

Branch: refs/heads/master
Commit: 4e80b5f10ae892cc1a5e11ebf4c29759a9ae38e3
Parents: a30f009
Author: niels <ni...@zeilemaker.nl>
Authored: Mon Apr 9 09:59:35 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 9 09:59:35 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/gcs_to_s3.py          | 107 +++++++++++++++++++
 airflow/hooks/S3_hook.py                        |   2 +-
 .../contrib/operators/test_gcs_list_operator.py |   6 +-
 .../operators/test_gcs_to_s3_operator.py        |  69 ++++++++++++
 4 files changed, 182 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e80b5f1/airflow/contrib/operators/gcs_to_s3.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py
new file mode 100644
index 0000000..e596fa1
--- /dev/null
+++ b/airflow/contrib/operators/gcs_to_s3.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.hooks.S3_hook import S3Hook
+
+
+class GoogleCloudStorageToS3Operator(GoogleCloudStorageListOperator):
+    """
+    Synchronizes a Google Cloud Storage bucket with an S3 bucket.
+
+    :param bucket: The Google Cloud Storage bucket in which to find the objects.
+    :type bucket: string
+    :param prefix: Prefix string which filters objects whose name begins with this prefix
+    :type prefix: string
+    :param delimiter: The delimiter by which you want to filter the objects.
+        For example, to list the CSV files in a directory in GCS you would use
+        delimiter='.csv'.
+    :type delimiter: string
+    :param google_cloud_storage_conn_id: The connection ID to use when
+        connecting to Google Cloud Storage.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+    :param dest_aws_conn_id: The destination S3 connection
+    :type dest_aws_conn_id: str
+    :param dest_s3_key: The base S3 key to be used to store the files
+    :type dest_s3_key: str
+    """
+    template_fields = ('bucket', 'prefix', 'delimiter', 'dest_s3_key')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix=None,
+                 delimiter=None,
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 dest_aws_conn_id=None,
+                 dest_s3_key=None,
+                 replace=False,
+                 *args,
+                 **kwargs):
+
+        super(GoogleCloudStorageToS3Operator, self).__init__(
+            bucket=bucket,
+            prefix=prefix,
+            delimiter=delimiter,
+            google_cloud_storage_conn_id=google_cloud_storage_conn_id,
+            delegate_to=delegate_to,
+            *args,
+            **kwargs
+        )
+        self.dest_aws_conn_id = dest_aws_conn_id
+        self.dest_s3_key = dest_s3_key
+        self.replace = replace
+
+    def execute(self, context):
+        # use the superclass to list all files in the Google Cloud Storage bucket
+        files = super(GoogleCloudStorageToS3Operator, self).execute(context)
+        s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+
+        if not self.replace:
+            # if we are not replacing -> list all files in the S3 bucket
+            # and only keep those files which are present in
+            # Google Cloud Storage and not in S3
+            bucket_name, _ = S3Hook.parse_s3_url(self.dest_s3_key)
+            existing_files = s3_hook.list_keys(bucket_name)
+            files = set(files) - set(existing_files)
+
+        if files:
+            hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to
+            )
+
+            for file in files:
+                file_bytes = hook.download(self.bucket, file)
+
+                dest_key = self.dest_s3_key + file
+                self.log.info("Saving file to %s", dest_key)
+
+                s3_hook.load_bytes(file_bytes,
+                                   key=dest_key,
+                                   replace=self.replace)
+
+            self.log.info("All done, uploaded %d files to S3", len(files))
+        else:
+            self.log.info("In sync, no files needed to be uploaded to S3")
+
+        return files
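
For reference, a minimal DAG sketch wiring up the new operator could look like the
following. The dag id, schedule, bucket names and the 'aws_default' connection id are
illustrative assumptions, not part of this commit:

# Hypothetical usage sketch; bucket names, schedule and connection ids are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator

dag = DAG('gcs_to_s3_sync',
          start_date=datetime(2018, 4, 1),
          schedule_interval='@daily')

sync_gcs_to_s3 = GoogleCloudStorageToS3Operator(
    task_id='sync_gcs_to_s3',
    bucket='my-gcs-bucket',                    # source GCS bucket (assumed name)
    prefix='exports/',                         # only consider objects under this prefix
    delimiter='.csv',                          # only consider CSV files
    google_cloud_storage_conn_id='google_cloud_storage_default',
    dest_aws_conn_id='aws_default',
    dest_s3_key='s3://my-s3-bucket/exports/',  # base S3 key the files are stored under
    replace=False,                             # skip files already present in S3
    dag=dag)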

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e80b5f1/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 2431001..5fb7709 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -32,7 +32,7 @@ class S3Hook(AwsHook):
     def parse_s3_url(s3url):
         parsed_url = urlparse(s3url)
         if not parsed_url.netloc:
-            raise AirflowException('Please provide a bucket_name')
+            raise AirflowException('Please provide a bucket_name instead of "%s"' % s3url)
         else:
             bucket_name = parsed_url.netloc
             key = parsed_url.path.strip('/')
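
As a quick illustration of the helper touched here (values are hypothetical),
parse_s3_url splits a full s3:// URL into a bucket name and key, and the improved
message now echoes the offending input when no bucket can be found:

from airflow.hooks.S3_hook import S3Hook

bucket, key = S3Hook.parse_s3_url('s3://my-bucket/exports/data.csv')
# bucket == 'my-bucket', key == 'exports/data.csv'

# A key without an s3:// bucket now fails with a clearer error:
# AirflowException: Please provide a bucket_name instead of "exports/data.csv"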

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e80b5f1/tests/contrib/operators/test_gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_list_operator.py b/tests/contrib/operators/test_gcs_list_operator.py
index cc2c253..1a1797f 100644
--- a/tests/contrib/operators/test_gcs_list_operator.py
+++ b/tests/contrib/operators/test_gcs_list_operator.py
@@ -28,18 +28,22 @@ TASK_ID = 'test-gcs-list-operator'
 TEST_BUCKET = 'test-bucket'
 DELIMITER = '.csv'
 PREFIX = 'TEST'
+MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
 
 
 class GoogleCloudStorageListOperatorTest(unittest.TestCase):
 
     @mock.patch('airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageHook')
     def test_execute(self, mock_hook):
+        mock_hook.return_value.list.return_value = MOCK_FILES
+
         operator = GoogleCloudStorageListOperator(task_id=TASK_ID,
                                                   bucket=TEST_BUCKET,
                                                   prefix=PREFIX,
                                                   delimiter=DELIMITER)
 
-        operator.execute(None)
+        files = operator.execute(None)
         mock_hook.return_value.list.assert_called_once_with(
             bucket=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER
         )
+        self.assertEqual(sorted(files), sorted(MOCK_FILES))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e80b5f1/tests/contrib/operators/test_gcs_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_to_s3_operator.py b/tests/contrib/operators/test_gcs_to_s3_operator.py
new file mode 100644
index 0000000..3ca4875
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_to_s3_operator.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
+from airflow.hooks.S3_hook import S3Hook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+try:
+    from moto import mock_s3
+except ImportError:
+    mock_s3 = None
+
+TASK_ID = 'test-gcs-list-operator'
+GCS_BUCKET = 'test-bucket'
+DELIMITER = '.csv'
+PREFIX = 'TEST'
+S3_BUCKET = 's3://bucket/'
+MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
+
+
+class GoogleCloudStorageToS3OperatorTest(unittest.TestCase):
+
+    @mock_s3
+    @mock.patch('airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageHook')
+    @mock.patch('airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageHook')
+    def test_execute(self, mock_hook, mock_hook2):
+        mock_hook.return_value.list.return_value = MOCK_FILES
+        mock_hook.return_value.download.return_value = b"testing"
+        mock_hook2.return_value.list.return_value = MOCK_FILES
+
+        operator = GoogleCloudStorageToS3Operator(task_id=TASK_ID,
+                                                  bucket=GCS_BUCKET,
+                                                  prefix=PREFIX,
+                                                  delimiter=DELIMITER,
+                                                  dest_aws_conn_id=None,
+                                                  dest_s3_key=S3_BUCKET)
+        # create dest bucket
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key=MOCK_FILES[0], Body=b'testing')
+
+        # we expect MOCK_FILES[1:] to be uploaded
+        # and all MOCK_FILES to be present at the S3 bucket
+        uploaded_files = operator.execute(None)
+        self.assertEqual(sorted(MOCK_FILES[1:]),
+                         sorted(uploaded_files))
+        self.assertEqual(sorted(MOCK_FILES),
+                         sorted(hook.list_keys('bucket', delimiter='/')))
