Repository: incubator-airflow
Updated Branches:
  refs/heads/master eaa03dbc7 -> 3dade5413


[AIRFLOW-2559] Azure Fileshare hook

Closes #3457 from NielsZeilemaker/fileshare_hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3dade541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3dade541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3dade541

Branch: refs/heads/master
Commit: 3dade5413f0a3c2028f12fb6f9d7092b9db2eb83
Parents: eaa03db
Author: niels <[email protected]>
Authored: Mon Jun 18 22:23:53 2018 +0100
Committer: Kaxil Naik <[email protected]>
Committed: Mon Jun 18 22:23:53 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py               |   1 +
 airflow/contrib/hooks/azure_fileshare_hook.py   | 212 +++++++++++++++++++
 docs/code.rst                                   |   2 +
 docs/integration.rst                            |  13 ++
 .../contrib/hooks/test_azure_fileshare_hook.py  | 163 ++++++++++++++
 5 files changed, 391 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 29a52da..95f9892 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -60,6 +60,7 @@ _hooks = {
     'jenkins_hook': ['JenkinsHook'],
     'aws_dynamodb_hook': ['AwsDynamoDBHook'],
     'azure_data_lake_hook': ['AzureDataLakeHook'],
+    'azure_fileshare_hook': ['AzureFileShareHook'],
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/airflow/contrib/hooks/azure_fileshare_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/azure_fileshare_hook.py b/airflow/contrib/hooks/azure_fileshare_hook.py
new file mode 100644
index 0000000..edabc17
--- /dev/null
+++ b/airflow/contrib/hooks/azure_fileshare_hook.py
@@ -0,0 +1,212 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+from azure.storage.file import FileService
+
+
+class AzureFileShareHook(BaseHook):
+    """
+    Interacts with Azure FileShare Storage.
+
+    Additional options passed in the 'extra' field of the connection will be
+    passed to the `FileService()` constructor.
+
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    """
+
+    def __init__(self, wasb_conn_id='wasb_default'):
+        self.conn_id = wasb_conn_id
+        self.connection = self.get_conn()
+
+    def get_conn(self):
+        """Return the FileService object."""
+        conn = self.get_connection(self.conn_id)
+        service_options = conn.extra_dejson
+        return FileService(account_name=conn.login,
+                           account_key=conn.password, **service_options)
+
+    def check_for_directory(self, share_name, directory_name, **kwargs):
+        """
+        Check if a directory exists on Azure File Share.
+
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.exists()` takes.
+        :type kwargs: object
+        :return: True if the directory exists, False otherwise.
+        :rtype: bool
+        """
+        return self.connection.exists(share_name, directory_name,
+                                      **kwargs)
+
+    def check_for_file(self, share_name, directory_name, file_name, **kwargs):
+        """
+        Check if a file exists on Azure File Share.
+
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.exists()` takes.
+        :type kwargs: object
+        :return: True if the file exists, False otherwise.
+        :rtype: bool
+        """
+        return self.connection.exists(share_name, directory_name,
+                                      file_name, **kwargs)
+
+    def list_directories_and_files(self, share_name, directory_name=None, **kwargs):
+        """
+        Return the list of directories and files stored on an Azure File Share.
+
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.list_directories_and_files()` takes.
+        :type kwargs: object
+        :return: A list of files and directories
+        :rtype: list
+        """
+        return self.connection.list_directories_and_files(share_name,
+                                                          directory_name,
+                                                          **kwargs)
+
+    def create_directory(self, share_name, directory_name, **kwargs):
+        """
+        Create a new directory on an Azure File Share.
+
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.create_directory()` takes.
+        :type kwargs: object
+        :return: True if the directory was created, False if it already exists.
+        :rtype: bool
+        """
+        return self.connection.create_directory(share_name, directory_name, **kwargs)
+
+    def get_file(self, file_path, share_name, directory_name, file_name, **kwargs):
+        """
+        Download a file from Azure File Share.
+
+        :param file_path: Where to store the file.
+        :type file_path: str
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.get_file_to_path()` takes.
+        :type kwargs: object
+        """
+        self.connection.get_file_to_path(share_name, directory_name,
+                                         file_name, file_path, **kwargs)
+
+    def get_file_to_stream(self, stream, share_name, directory_name, file_name, **kwargs):
+        """
+        Download a file from Azure File Share into a stream.
+
+        :param stream: A filehandle to store the file to.
+        :type stream: file-like object
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.get_file_to_stream()` takes.
+        :type kwargs: object
+        """
+        self.connection.get_file_to_stream(share_name, directory_name,
+                                           file_name, stream, **kwargs)
+
+    def load_file(self, file_path, share_name, directory_name, file_name, **kwargs):
+        """
+        Upload a file to Azure File Share.
+
+        :param file_path: Path to the file to load.
+        :type file_path: str
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.create_file_from_path()` takes.
+        :type kwargs: object
+        """
+        self.connection.create_file_from_path(share_name, directory_name,
+                                              file_name, file_path, **kwargs)
+
+    def load_string(self, string_data, share_name, directory_name, file_name, **kwargs):
+        """
+        Upload a string to Azure File Share.
+
+        :param string_data: String to load.
+        :type string_data: str
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param kwargs: Optional keyword arguments that
+            `FileService.create_file_from_text()` takes.
+        :type kwargs: object
+        """
+        self.connection.create_file_from_text(share_name, directory_name,
+                                              file_name, string_data, **kwargs)
+
+    def load_stream(self, stream, share_name, directory_name, file_name, count, **kwargs):
+        """
+        Upload a stream to Azure File Share.
+
+        :param stream: Opened file/stream to upload as the file content.
+        :type stream: file-like
+        :param share_name: Name of the share.
+        :type share_name: str
+        :param directory_name: Name of the directory.
+        :type directory_name: str
+        :param file_name: Name of the file.
+        :type file_name: str
+        :param count: Size of the stream in bytes
+        :type count: int
+        :param kwargs: Optional keyword arguments that
+            `FileService.create_file_from_stream()` takes.
+        :type kwargs: object
+        """
+        self.connection.create_file_from_stream(share_name, directory_name,
+                                                file_name, stream, count, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 6b3a84a..f055fc6 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -366,6 +366,8 @@ Community contributed hooks
 .. autoclass:: airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook
 .. autoclass:: airflow.contrib.hooks.aws_hook.AwsHook
 .. autoclass:: airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook
+.. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook
+.. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook
 .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
 .. autoclass:: airflow.contrib.hooks.cassandra_hook.CassandraHook
 .. autoclass:: airflow.contrib.hooks.cloudant_hook.CloudantHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 972726b..660b216 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -115,6 +115,19 @@ WasbHook
 
 .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
 
+Azure File Share
+''''''''''''''''
+
+Cloud variant of an SMB file share. Make sure that an Airflow connection of
+type `wasb` exists. Authorization can be done by supplying a login (=Storage account name)
+and password (=Storage account key), or login and SAS token in the extra field
+(see connection `wasb_default` for an example).
+
+AzureFileShareHook
+""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook
+
 Logging
 '''''''
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/tests/contrib/hooks/test_azure_fileshare_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_azure_fileshare_hook.py b/tests/contrib/hooks/test_azure_fileshare_hook.py
new file mode 100644
index 0000000..5803cd8
--- /dev/null
+++ b/tests/contrib/hooks/test_azure_fileshare_hook.py
@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import json
+import unittest
+
+from airflow import configuration
+from airflow import models
+from airflow.contrib.hooks.azure_fileshare_hook import AzureFileShareHook
+from airflow.utils import db
+
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestAzureFileshareHook(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='wasb_test_key', conn_type='wasb',
+                login='login', password='key'
+            )
+        )
+        db.merge_conn(
+            models.Connection(
+                conn_id='wasb_test_sas_token', conn_type='wasb',
+                login='login', extra=json.dumps({'sas_token': 'token'})
+            )
+        )
+
+    def test_key(self):
+        from azure.storage.file import FileService
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_key')
+        self.assertEqual(hook.conn_id, 'wasb_test_key')
+        self.assertIsInstance(hook.connection, FileService)
+
+    def test_sas_token(self):
+        from azure.storage.file import FileService
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertEqual(hook.conn_id, 'wasb_test_sas_token')
+        self.assertIsInstance(hook.connection, FileService)
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_check_for_file(self, mock_service):
+        mock_instance = mock_service.return_value
+        mock_instance.exists.return_value = True
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertTrue(hook.check_for_file('share', 'directory', 'file', timeout=3))
+        mock_instance.exists.assert_called_once_with(
+            'share', 'directory', 'file', timeout=3
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_check_for_directory(self, mock_service):
+        mock_instance = mock_service.return_value
+        mock_instance.exists.return_value = True
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertTrue(hook.check_for_directory('share', 'directory', timeout=3))
+        mock_instance.exists.assert_called_once_with(
+            'share', 'directory', timeout=3
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_load_file(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.load_file('path', 'share', 'directory', 'file', max_connections=1)
+        mock_instance.create_file_from_path.assert_called_once_with(
+            'share', 'directory', 'file', 'path', max_connections=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_load_string(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.load_string('big string', 'share', 'directory', 'file', timeout=1)
+        mock_instance.create_file_from_text.assert_called_once_with(
+            'share', 'directory', 'file', 'big string', timeout=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_load_stream(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.load_stream('stream', 'share', 'directory', 'file', 42, timeout=1)
+        mock_instance.create_file_from_stream.assert_called_once_with(
+            'share', 'directory', 'file', 'stream', 42, timeout=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_list_directories_and_files(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.list_directories_and_files('share', 'directory', timeout=1)
+        mock_instance.list_directories_and_files.assert_called_once_with(
+            'share', 'directory', timeout=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_create_directory(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.create_directory('share', 'directory', timeout=1)
+        mock_instance.create_directory.assert_called_once_with(
+            'share', 'directory', timeout=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_get_file(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.get_file('path', 'share', 'directory', 'file', max_connections=1)
+        mock_instance.get_file_to_path.assert_called_once_with(
+            'share', 'directory', 'file', 'path', max_connections=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService',
+                autospec=True)
+    def test_get_file_to_stream(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token')
+        hook.get_file_to_stream('stream', 'share', 'directory', 'file', max_connections=1)
+        mock_instance.get_file_to_stream.assert_called_once_with(
+            'share', 'directory', 'file', 'stream', max_connections=1
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to