Repository: incubator-airflow Updated Branches: refs/heads/master eaa03dbc7 -> 3dade5413
[AIRFLOW-2559] Azure Fileshare hook Closes #3457 from NielsZeilemaker/fileshare_hook Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3dade541 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3dade541 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3dade541 Branch: refs/heads/master Commit: 3dade5413f0a3c2028f12fb6f9d7092b9db2eb83 Parents: eaa03db Author: niels <[email protected]> Authored: Mon Jun 18 22:23:53 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Mon Jun 18 22:23:53 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/__init__.py | 1 + airflow/contrib/hooks/azure_fileshare_hook.py | 212 +++++++++++++++++++ docs/code.rst | 2 + docs/integration.rst | 13 ++ .../contrib/hooks/test_azure_fileshare_hook.py | 163 ++++++++++++++ 5 files changed, 391 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/airflow/contrib/hooks/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py index 29a52da..95f9892 100644 --- a/airflow/contrib/hooks/__init__.py +++ b/airflow/contrib/hooks/__init__.py @@ -60,6 +60,7 @@ _hooks = { 'jenkins_hook': ['JenkinsHook'], 'aws_dynamodb_hook': ['AwsDynamoDBHook'], 'azure_data_lake_hook': ['AzureDataLakeHook'], + 'azure_fileshare_hook': ['AzureFileShareHook'], } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/airflow/contrib/hooks/azure_fileshare_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/azure_fileshare_hook.py b/airflow/contrib/hooks/azure_fileshare_hook.py new file mode 100644 index 0000000..edabc17 --- /dev/null +++ 
b/airflow/contrib/hooks/azure_fileshare_hook.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from airflow.hooks.base_hook import BaseHook +from azure.storage.file import FileService + + +class AzureFileShareHook(BaseHook): + """ + Interacts with Azure FileShare Storage. + + Additional options passed in the 'extra' field of the connection will be + passed to the `FileService()` constructor. + + :param wasb_conn_id: Reference to the wasb connection. + :type wasb_conn_id: str + """ + + def __init__(self, wasb_conn_id='wasb_default'): + self.conn_id = wasb_conn_id + self.connection = self.get_conn() + + def get_conn(self): + """Return the FileService object.""" + conn = self.get_connection(self.conn_id) + service_options = conn.extra_dejson + return FileService(account_name=conn.login, + account_key=conn.password, **service_options) + + def check_for_directory(self, share_name, directory_name, **kwargs): + """ + Check if a directory exists on Azure File Share. + + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param kwargs: Optional keyword arguments that + `FileService.exists()` takes.
+ :type kwargs: object + :return: True if the directory exists, False otherwise. + :rtype: bool + """ + return self.connection.exists(share_name, directory_name, + **kwargs) + + def check_for_file(self, share_name, directory_name, file_name, **kwargs): + """ + Check if a file exists on Azure File Share. + + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param kwargs: Optional keyword arguments that + `FileService.exists()` takes. + :type kwargs: object + :return: True if the file exists, False otherwise. + :rtype: bool + """ + return self.connection.exists(share_name, directory_name, + file_name, **kwargs) + + def list_directories_and_files(self, share_name, directory_name=None, **kwargs): + """ + Return the list of directories and files stored on an Azure File Share. + + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param kwargs: Optional keyword arguments that + `FileService.list_directories_and_files()` takes. + :type kwargs: object + :return: A list of files and directories + :rtype: list + """ + return self.connection.list_directories_and_files(share_name, + directory_name, + **kwargs) + + def create_directory(self, share_name, directory_name, **kwargs): + """ + Create a new directory on an Azure File Share. + + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param kwargs: Optional keyword arguments that + `FileService.create_directory()` takes.
+ :type kwargs: object + :return: The result of `FileService.create_directory()`: True if the directory was created, False otherwise. + :rtype: bool + """ + return self.connection.create_directory(share_name, directory_name, **kwargs) + + def get_file(self, file_path, share_name, directory_name, file_name, **kwargs): + """ + Download a file from Azure File Share to a local path. + + :param file_path: Where to store the file. + :type file_path: str + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param kwargs: Optional keyword arguments that + `FileService.get_file_to_path()` takes. + :type kwargs: object + """ + self.connection.get_file_to_path(share_name, directory_name, + file_name, file_path, **kwargs) + + def get_file_to_stream(self, stream, share_name, directory_name, file_name, **kwargs): + """ + Download a file from Azure File Share into a stream. + + :param stream: A filehandle to store the file to. + :type stream: file-like object + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param kwargs: Optional keyword arguments that + `FileService.get_file_to_stream()` takes. + :type kwargs: object + """ + self.connection.get_file_to_stream(share_name, directory_name, + file_name, stream, **kwargs) + + def load_file(self, file_path, share_name, directory_name, file_name, **kwargs): + """ + Upload a file to Azure File Share. + + :param file_path: Path to the file to load. + :type file_path: str + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param kwargs: Optional keyword arguments that + `FileService.create_file_from_path()` takes.
+ :type kwargs: object + """ + self.connection.create_file_from_path(share_name, directory_name, + file_name, file_path, **kwargs) + + def load_string(self, string_data, share_name, directory_name, file_name, **kwargs): + """ + Upload a string to Azure File Share. + + :param string_data: String to load. + :type string_data: str + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param kwargs: Optional keyword arguments that + `FileService.create_file_from_text()` takes. + :type kwargs: object + """ + self.connection.create_file_from_text(share_name, directory_name, + file_name, string_data, **kwargs) + + def load_stream(self, stream, share_name, directory_name, file_name, count, **kwargs): + """ + Upload a stream to Azure File Share. + + :param stream: Opened file/stream to upload as the file content. + :type stream: file-like + :param share_name: Name of the share. + :type share_name: str + :param directory_name: Name of the directory. + :type directory_name: str + :param file_name: Name of the file. + :type file_name: str + :param count: Size of the stream in bytes + :type count: int + :param kwargs: Optional keyword arguments that + `FileService.create_file_from_stream()` takes. + :type kwargs: object + """ + self.connection.create_file_from_stream(share_name, directory_name, + file_name, stream, count, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/docs/code.rst ---------------------------------------------------------------------- diff --git a/docs/code.rst b/docs/code.rst index 6b3a84a..f055fc6 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -366,6 +366,8 @@ Community contributed hooks .. autoclass:: airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook .. autoclass:: airflow.contrib.hooks.aws_hook.AwsHook ..
autoclass:: airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook +.. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook +.. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook .. autoclass:: airflow.contrib.hooks.cassandra_hook.CassandraHook .. autoclass:: airflow.contrib.hooks.cloudant_hook.CloudantHook http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/docs/integration.rst ---------------------------------------------------------------------- diff --git a/docs/integration.rst b/docs/integration.rst index 972726b..660b216 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -115,6 +115,19 @@ WasbHook .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook +Azure File Share +'''''''''''''''' + +Cloud variant of an SMB file share. Make sure that an Airflow connection of +type `wasb` exists. Authorization can be done by supplying a login (=Storage account name) +and password (=Storage account key), or login and SAS token in the extra field +(see connection `wasb_default` for an example). + +AzureFileShareHook +"""""""""""""""""" + +.. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook + Logging ''''''' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3dade541/tests/contrib/hooks/test_azure_fileshare_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_azure_fileshare_hook.py b/tests/contrib/hooks/test_azure_fileshare_hook.py new file mode 100644 index 0000000..5803cd8 --- /dev/null +++ b/tests/contrib/hooks/test_azure_fileshare_hook.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +import json +import unittest + +from airflow import configuration +from airflow import models +from airflow.contrib.hooks.azure_fileshare_hook import AzureFileShareHook +from airflow.utils import db + + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None  # NOTE(review): if this fallback is taken, every @mock.patch decorator below raises AttributeError at import time + + +class TestAzureFileshareHook(unittest.TestCase):  # unit tests for AzureFileShareHook; most tests patch FileService and assert argument forwarding + + def setUp(self):  # register one key-based and one SAS-token-based wasb connection + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='wasb_test_key', conn_type='wasb', + login='login', password='key' + ) + ) + db.merge_conn( + models.Connection( + conn_id='wasb_test_sas_token', conn_type='wasb', + login='login', extra=json.dumps({'sas_token': 'token'}) + ) + ) + + def test_key(self): + from azure.storage.file import FileService + hook = AzureFileShareHook(wasb_conn_id='wasb_test_key') + self.assertEqual(hook.conn_id, 'wasb_test_key') + self.assertIsInstance(hook.connection, FileService) + + def test_sas_token(self): + from azure.storage.file import FileService + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + self.assertEqual(hook.conn_id, 'wasb_test_sas_token') + self.assertIsInstance(hook.connection, FileService) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_check_for_file(self, mock_service): + mock_instance = mock_service.return_value + mock_instance.exists.return_value = True +
hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + self.assertTrue(hook.check_for_file('share', 'directory', 'file', timeout=3)) + mock_instance.exists.assert_called_once_with( + 'share', 'directory', 'file', timeout=3 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_check_for_directory(self, mock_service): + mock_instance = mock_service.return_value + mock_instance.exists.return_value = True + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + self.assertTrue(hook.check_for_directory('share', 'directory', timeout=3)) + mock_instance.exists.assert_called_once_with( + 'share', 'directory', timeout=3 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_load_file(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.load_file('path', 'share', 'directory', 'file', max_connections=1) + mock_instance.create_file_from_path.assert_called_once_with( + 'share', 'directory', 'file', 'path', max_connections=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_load_string(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.load_string('big string', 'share', 'directory', 'file', timeout=1) + mock_instance.create_file_from_text.assert_called_once_with( + 'share', 'directory', 'file', 'big string', timeout=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_load_stream(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.load_stream('stream', 'share', 'directory', 'file', 42, timeout=1) + mock_instance.create_file_from_stream.assert_called_once_with( + 'share', 'directory', 'file', 'stream',
42, timeout=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_list_directories_and_files(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.list_directories_and_files('share', 'directory', timeout=1) + mock_instance.list_directories_and_files.assert_called_once_with( + 'share', 'directory', timeout=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_create_directory(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.create_directory('share', 'directory', timeout=1) + mock_instance.create_directory.assert_called_once_with( + 'share', 'directory', timeout=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_get_file(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.get_file('path', 'share', 'directory', 'file', max_connections=1) + mock_instance.get_file_to_path.assert_called_once_with( + 'share', 'directory', 'file', 'path', max_connections=1 + ) + + @mock.patch('airflow.contrib.hooks.azure_fileshare_hook.FileService', + autospec=True) + def test_get_file_to_stream(self, mock_service): + mock_instance = mock_service.return_value + hook = AzureFileShareHook(wasb_conn_id='wasb_test_sas_token') + hook.get_file_to_stream('stream', 'share', 'directory', 'file', max_connections=1) + mock_instance.get_file_to_stream.assert_called_once_with( + 'share', 'directory', 'file', 'stream', max_connections=1 + ) + + +if __name__ == '__main__': + unittest.main()
