Repository: incubator-airflow Updated Branches: refs/heads/master f5462c78f -> f1bc5f38a
[AIRFLOW-1065] Add functionality for Azure Blob Storage over wasb:// This PR implements a hook to interface with Azure storage over wasb:// via azure-storage; adds sensors to check for blobs or prefixes; and adds an operator to transfer a local file to the Blob Storage. Design is similar to that of the S3Hook in airflow.operators.S3_hook. Closes #2216 from hgrif/AIRFLOW-1065 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f1bc5f38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f1bc5f38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f1bc5f38 Branch: refs/heads/master Commit: f1bc5f38ac88054270e8b1863d48e953b58a7c74 Parents: f5462c7 Author: Henk Griffioen <[email protected]> Authored: Wed Apr 5 09:56:23 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Apr 5 09:56:23 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/__init__.py | 3 +- airflow/contrib/hooks/wasb_hook.py | 94 +++++++++++++++ airflow/contrib/operators/__init__.py | 1 + airflow/contrib/operators/file_to_wasb.py | 61 ++++++++++ airflow/contrib/sensors/wasb_sensor.py | 97 +++++++++++++++ airflow/models.py | 4 + airflow/utils/db.py | 4 + docs/code.rst | 1 + docs/integration.rst | 50 ++++++++ scripts/ci/requirements.txt | 1 + setup.py | 2 + tests/contrib/hooks/test_wasb_hook.py | 113 ++++++++++++++++++ tests/contrib/operators/test_file_to_wasb.py | 88 ++++++++++++++ tests/contrib/sensors/test_wasb_sensor.py | 138 ++++++++++++++++++++++ tests/core.py | 1 + 15 files changed, 657 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/__init__.py 
b/airflow/contrib/hooks/__init__.py index 19fc2b4..182a49f 100644 --- a/airflow/contrib/hooks/__init__.py +++ b/airflow/contrib/hooks/__init__.py @@ -44,7 +44,8 @@ _hooks = { 'gcp_dataflow_hook': ['DataFlowHook'], 'spark_submit_operator': ['SparkSubmitOperator'], 'cloudant_hook': ['CloudantHook'], - 'fs_hook': ['FSHook'] + 'fs_hook': ['FSHook'], + 'wasb_hook': ['WasbHook'] } import os as _os http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/wasb_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/wasb_hook.py b/airflow/contrib/hooks/wasb_hook.py new file mode 100644 index 0000000..89eaa5b --- /dev/null +++ b/airflow/contrib/hooks/wasb_hook.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from airflow.hooks.base_hook import BaseHook + +from azure.storage.blob import BlockBlobService + + +class WasbHook(BaseHook): + """ + Interacts with Azure Blob Storage through the wasb:// protocol. + + Additional options passed in the 'extra' field of the connection will be + passed to the `BlockBlobService()` constructor. For example, authenticate + using a SAS token by adding {"sas_token": "YOUR_TOKEN"}. + + :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str + """ + + def __init__(self, wasb_conn_id='wasb_default'): + self.conn_id = wasb_conn_id + self.connection = self.get_conn() + + def get_conn(self): + """Return the BlockBlobService object.""" + conn = self.get_connection(self.conn_id) + service_options = conn.extra_dejson + return BlockBlobService(account_name=conn.login, + account_key=conn.password, **service_options) + + def check_for_blob(self, container_name, blob_name, **kwargs): + """ + Check if a blob exists on Azure Blob Storage. + + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param kwargs: Optional keyword arguments that + `BlockBlobService.exists()` takes. + :type kwargs: object + :return: True if the blob exists, False otherwise. + :rtype: bool + """ + return self.connection.exists(container_name, blob_name, **kwargs) + + def check_for_prefix(self, container_name, prefix, **kwargs): + """ + Check if a prefix exists on Azure Blob storage. + + :param container_name: Name of the container. + :type container_name: str + :param prefix: Prefix of the blob. + :type prefix: str + :param kwargs: Optional keyword arguments that + `BlockBlobService.list_blobs()` takes. + :type kwargs: object + :return: True if blobs matching the prefix exist, False otherwise. + :rtype: bool + """ + matches = self.connection.list_blobs(container_name, prefix, + num_results=1, **kwargs) + return len(list(matches)) > 0 + + def load_file(self, file_path, container_name, blob_name, **kwargs): + """ + Upload a file to Azure Blob Storage. + + :param file_path: Path to the file to load. + :type file_path: str + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param kwargs: Optional keyword arguments that + `BlockBlobService.create_blob_from_path()` takes.
+ :type kwargs: object + """ + # Reorder the argument order from airflow.hooks.S3_hook.load_file. + self.connection.create_blob_from_path(container_name, blob_name, + file_path, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py index bef3433..4ea6c17 100644 --- a/airflow/contrib/operators/__init__.py +++ b/airflow/contrib/operators/__init__.py @@ -37,6 +37,7 @@ _operators = { 'vertica_to_hive': ['VerticaToHiveTransfer'], 'qubole_operator': ['QuboleOperator'], 'spark_submit_operator': ['SparkSubmitOperator'], + 'file_to_wasb': ['FileToWasbOperator'], 'fs_operator': ['FileSensor'] } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/file_to_wasb.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py new file mode 100644 index 0000000..32e6b29 --- /dev/null +++ b/airflow/contrib/operators/file_to_wasb.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +import logging + +from airflow.contrib.hooks.wasb_hook import WasbHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class FileToWasbOperator(BaseOperator): + """ + Uploads a file to Azure Blob Storage. + + :param file_path: Path to the file to load. + :type file_path: str + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param wasb_conn_id: Reference to the wasb connection. + :type wasb_conn_id: str + :param load_options: Optional keyword arguments that + `WasbHook.load_file()` takes. + :type load_options: dict + """ + template_fields = ('file_path', 'container_name', 'blob_name') + + @apply_defaults + def __init__(self, file_path, container_name, blob_name, + wasb_conn_id='wasb_default', load_options=None, *args, + **kwargs): + super(FileToWasbOperator, self).__init__(*args, **kwargs) + if load_options is None: + load_options = {} + self.file_path = file_path + self.container_name = container_name + self.blob_name = blob_name + self.wasb_conn_id = wasb_conn_id + self.load_options = load_options + + def execute(self, context): + """Upload a file to Azure Blob Storage.""" + hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + logging.info( + 'Uploading {self.file_path} to wasb://{self.container_name} as ' + '{self.blob_name}'.format(**locals())) + hook.load_file(self.file_path, self.container_name, self.blob_name, + **self.load_options) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/sensors/wasb_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py new file mode 100644 index 0000000..3f3d56c --- /dev/null +++ b/airflow/contrib/sensors/wasb_sensor.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# 
you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +from airflow.contrib.hooks.wasb_hook import WasbHook +from airflow.operators.sensors import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class WasbBlobSensor(BaseSensorOperator): + """ + Waits for a blob to arrive on Azure Blob Storage. + + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param wasb_conn_id: Reference to the wasb connection. + :type wasb_conn_id: str + :param check_options: Optional keyword arguments that + `WasbHook.check_for_blob()` takes. + :type check_options: dict + """ + + template_fields = ('container_name', 'blob_name') + + @apply_defaults + def __init__(self, container_name, blob_name, + wasb_conn_id='wasb_default', check_options=None, *args, + **kwargs): + super(WasbBlobSensor, self).__init__(*args, **kwargs) + if check_options is None: + check_options = {} + self.wasb_conn_id = wasb_conn_id + self.container_name = container_name + self.blob_name = blob_name + self.check_options = check_options + + def poke(self, context): + logging.info( + 'Poking for blob: {self.blob_name}\n' + 'in wasb://{self.container_name}'.format(**locals()) + ) + hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + return hook.check_for_blob(self.container_name, self.blob_name, + **self.check_options) + + +class WasbPrefixSensor(BaseSensorOperator): + """ + Waits for blobs matching a prefix to arrive on Azure Blob Storage. 
+ + :param container_name: Name of the container. + :type container_name: str + :param prefix: Prefix of the blob. + :type prefix: str + :param wasb_conn_id: Reference to the wasb connection. + :type wasb_conn_id: str + :param check_options: Optional keyword arguments that + `WasbHook.check_for_prefix()` takes. + :type check_options: dict + """ + + template_fields = ('container_name', 'prefix') + + @apply_defaults + def __init__(self, container_name, prefix, wasb_conn_id='wasb_default', + check_options=None, *args, **kwargs): + super(WasbPrefixSensor, self).__init__(*args, **kwargs) + if check_options is None: + check_options = {} + self.wasb_conn_id = wasb_conn_id + self.container_name = container_name + self.prefix = prefix + self.check_options = check_options + + def poke(self, context): + logging.info( + 'Poking for prefix: {self.prefix}\n' + 'in wasb://{self.container_name}'.format(**locals()) + ) + hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + return hook.check_for_prefix(self.container_name, self.prefix, + **self.check_options) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 7171c05..8628100 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -542,6 +542,7 @@ class Connection(Base): ('mesos_framework-id', 'Mesos Framework ID'), ('jira', 'JIRA',), ('redis', 'Redis',), + ('wasb', 'Azure Blob Storage'), ] def __init__( @@ -674,6 +675,9 @@ class Connection(Base): elif self.conn_type == 'redis': from airflow.contrib.hooks.redis_hook import RedisHook return RedisHook(redis_conn_id=self.conn_id) + elif self.conn_type == 'wasb': + from airflow.contrib.hooks.wasb_hook import WasbHook + return WasbHook(wasb_conn_id=self.conn_id) except: pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/utils/db.py 
---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index a619a41..7da9217 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -171,6 +171,10 @@ def initdb(): host='localhost', port=5433)) merge_conn( models.Connection( + conn_id='wasb_default', conn_type='wasb', + extra='{"sas_token": null}')) + merge_conn( + models.Connection( conn_id='webhdfs_default', conn_type='hdfs', host='localhost', port=50070)) merge_conn( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/code.rst ---------------------------------------------------------------------- diff --git a/docs/code.rst b/docs/code.rst index fabe6db..683e85f 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -98,6 +98,7 @@ Community-contributed Operators .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator .. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator +.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator .. autoclass:: airflow.contrib.operators.QuboleOperator .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/integration.rst ---------------------------------------------------------------------- diff --git a/docs/integration.rst b/docs/integration.rst index 10bc038..4a6b676 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -1,9 +1,59 @@ Integration =========== +- :ref:`Azure` - :ref:`AWS` - :ref:`GCP` +.. _Azure: + +Azure: Microsoft Azure +---------------------- + +Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob +Storage. Note that the Hook, Sensor and Operator are in the contrib section. 
+ +Azure Blob Storage +'''''''''''''''''' + +All classes communicate via the Windows Azure Storage Blob protocol. Make sure that an +Airflow connection of type `wasb` exists. Authorization can be done by supplying a +login (=Storage account name) and password (=Storage account key), or login and SAS token in the extra +field (see connection `wasb_default` for an example). + +- :ref:`WasbBlobSensor`: Checks if a blob is present on Azure Blob storage. +- :ref:`WasbPrefixSensor`: Checks if blobs matching a prefix are present on Azure Blob storage. +- :ref:`FileToWasbOperator`: Uploads a local file to a container as a blob. +- :ref:`WasbHook`: Interface with Azure Blob Storage. + +.. _WasbBlobSensor: + +WasbBlobSensor +""""""""""""""" + +.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor + +.. _WasbPrefixSensor: + +WasbPrefixSensor +""""""""""""""""" + +.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor + +.. _FileToWasbOperator: + +FileToWasbOperator +""""""""""""""""""" + +.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator + +.. _WasbHook: + +WasbHook +""""""""" + +.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook + ..
_AWS: AWS: Amazon Webservices http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/scripts/ci/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index d206f16..1905398 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -1,4 +1,5 @@ alembic +azure-storage>=0.34.0 bcrypt bleach boto http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index a582499..ea60dca 100644 --- a/setup.py +++ b/setup.py @@ -104,6 +104,7 @@ async = [ 'eventlet>= 0.9.7', 'gevent>=0.13' ] +azure = ['azure-storage>=0.34.0'] celery = [ 'celery>=3.1.17', 'flower>=0.7.3' @@ -237,6 +238,7 @@ def do_setup(): 'all': devel_all, 'all_dbs': all_dbs, 'async': async, + 'azure': azure, 'celery': celery, 'cgroups': cgroups, 'cloudant': cloudant, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/hooks/test_wasb_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_wasb_hook.py b/tests/contrib/hooks/test_wasb_hook.py new file mode 100644 index 0000000..aa92937 --- /dev/null +++ b/tests/contrib/hooks/test_wasb_hook.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +import json +import unittest + +from airflow import configuration +from airflow import models +from airflow.contrib.hooks.wasb_hook import WasbHook +from airflow.utils import db + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestWasbHook(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='wasb_test_key', conn_type='wasb', + login='login', password='key' + ) + ) + db.merge_conn( + models.Connection( + conn_id='wasb_test_sas_token', conn_type='wasb', + login='login', extra=json.dumps({'sas_token': 'token'}) + ) + ) + + def test_key(self): + from azure.storage.blob import BlockBlobService + hook = WasbHook(wasb_conn_id='wasb_test_key') + self.assertEqual(hook.conn_id, 'wasb_test_key') + self.assertIsInstance(hook.connection, BlockBlobService) + + def test_sas_token(self): + from azure.storage.blob import BlockBlobService + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + self.assertEqual(hook.conn_id, 'wasb_test_sas_token') + self.assertIsInstance(hook.connection, BlockBlobService) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_check_for_blob(self, mock_service): + mock_instance = mock_service.return_value + mock_instance.exists.return_value = True + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + self.assertTrue(hook.check_for_blob('container', 'blob', timeout=3)) + mock_instance.exists.assert_called_once_with( + 'container', 'blob', timeout=3 + ) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_check_for_blob_empty(self, mock_service): + mock_service.return_value.exists.return_value = False + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + self.assertFalse(hook.check_for_blob('container', 'blob')) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def
test_check_for_prefix(self, mock_service): + mock_instance = mock_service.return_value + mock_instance.list_blobs.return_value = iter(['blob_1']) + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + self.assertTrue(hook.check_for_prefix('container', 'prefix', + timeout=3)) + mock_instance.list_blobs.assert_called_once_with( + 'container', 'prefix', num_results=1, timeout=3 + ) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_check_for_prefix_empty(self, mock_service): + mock_instance = mock_service.return_value + mock_instance.list_blobs.return_value = iter([]) + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + self.assertFalse(hook.check_for_prefix('container', 'prefix')) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_load_file(self, mock_service): + mock_instance = mock_service.return_value + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + hook.load_file('path', 'container', 'blob', max_connections=1) + mock_instance.create_blob_from_path.assert_called_once_with( + 'container', 'blob', 'path', max_connections=1 + ) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/operators/test_file_to_wasb.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_file_to_wasb.py b/tests/contrib/operators/test_file_to_wasb.py new file mode 100644 index 0000000..bdaeb79 --- /dev/null +++ b/tests/contrib/operators/test_file_to_wasb.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import unittest + +from airflow import DAG, configuration +from airflow.contrib.operators.file_to_wasb import FileToWasbOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestFileToWasbOperator(unittest.TestCase): + + _config = { + 'file_path': 'file', + 'container_name': 'container', + 'blob_name': 'blob', + 'wasb_conn_id': 'wasb_default', + 'retries': 3, + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2017, 1, 1) + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_init(self): + operator = FileToWasbOperator( + task_id='wasb_operator', + dag=self.dag, + **self._config + ) + self.assertEqual(operator.file_path, self._config['file_path']) + self.assertEqual(operator.container_name, + self._config['container_name']) + self.assertEqual(operator.blob_name, self._config['blob_name']) + self.assertEqual(operator.wasb_conn_id, self._config['wasb_conn_id']) + self.assertEqual(operator.load_options, {}) + self.assertEqual(operator.retries, self._config['retries']) + + operator = FileToWasbOperator( + task_id='wasb_operator', + dag=self.dag, + load_options={'timeout': 2}, + **self._config + ) + self.assertEqual(operator.load_options, {'timeout': 2}) + + @mock.patch('airflow.contrib.operators.file_to_wasb.WasbHook', + autospec=True) + def test_execute(self, mock_hook): + mock_instance = mock_hook.return_value + operator = FileToWasbOperator( + task_id='wasb_sensor', + 
dag=self.dag, + load_options={'timeout': 2}, + **self._config + ) + operator.execute(None) + mock_instance.load_file.assert_called_once_with( + 'file', 'container', 'blob', timeout=2 + ) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/sensors/test_wasb_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_wasb_sensor.py b/tests/contrib/sensors/test_wasb_sensor.py new file mode 100644 index 0000000..a26ba2d --- /dev/null +++ b/tests/contrib/sensors/test_wasb_sensor.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import datetime +import unittest + +from airflow import DAG, configuration +from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor +from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestWasbBlobSensor(unittest.TestCase): + + _config = { + 'container_name': 'container', + 'blob_name': 'blob', + 'wasb_conn_id': 'conn_id', + 'timeout': 100, + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2017, 1, 1) + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_init(self): + sensor = WasbBlobSensor( + task_id='wasb_sensor', + dag=self.dag, + **self._config + ) + self.assertEqual(sensor.container_name, self._config['container_name']) + self.assertEqual(sensor.blob_name, self._config['blob_name']) + self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id']) + self.assertEqual(sensor.check_options, {}) + self.assertEqual(sensor.timeout, self._config['timeout']) + + sensor = WasbBlobSensor( + task_id='wasb_sensor', + dag=self.dag, + check_options={'timeout': 2}, + **self._config + ) + self.assertEqual(sensor.check_options, {'timeout': 2}) + + @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook', + autospec=True) + def test_poke(self, mock_hook): + mock_instance = mock_hook.return_value + sensor = WasbBlobSensor( + task_id='wasb_sensor', + dag=self.dag, + check_options={'timeout': 2}, + **self._config + ) + sensor.poke(None) + mock_instance.check_for_blob.assert_called_once_with( + 'container', 'blob', timeout=2 + ) + + +class TestWasbPrefixSensor(unittest.TestCase): + + _config = { + 'container_name': 'container', + 'prefix': 'prefix', + 'wasb_conn_id': 'conn_id', + 'timeout': 100, + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2017, 1, 1) + } + 
self.dag = DAG('test_dag_id', default_args=args) + + def test_init(self): + sensor = WasbPrefixSensor( + task_id='wasb_sensor', + dag=self.dag, + **self._config + ) + self.assertEqual(sensor.container_name, self._config['container_name']) + self.assertEqual(sensor.prefix, self._config['prefix']) + self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id']) + self.assertEqual(sensor.check_options, {}) + self.assertEqual(sensor.timeout, self._config['timeout']) + + sensor = WasbPrefixSensor( + task_id='wasb_sensor', + dag=self.dag, + check_options={'timeout': 2}, + **self._config + ) + self.assertEqual(sensor.check_options, {'timeout': 2}) + + @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook', + autospec=True) + def test_poke(self, mock_hook): + mock_instance = mock_hook.return_value + sensor = WasbPrefixSensor( + task_id='wasb_sensor', + dag=self.dag, + check_options={'timeout': 2}, + **self._config + ) + sensor.poke(None) + mock_instance.check_for_prefix.assert_called_once_with( + 'container', 'prefix', timeout=2 + ) + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 8b3d1b8..a6bf613 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1111,6 +1111,7 @@ class CliTests(unittest.TestCase): self.assertIn(['mssql_default', 'mssql'], conns) self.assertIn(['mysql_default', 'mysql'], conns) self.assertIn(['postgres_default', 'postgres'], conns) + self.assertIn(['wasb_default', 'wasb'], conns) # Attempt to list connections with invalid cli args with mock.patch('sys.stdout',
