[ 
https://issues.apache.org/jira/browse/AIRFLOW-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683045#comment-16683045
 ] 

ASF GitHub Bot commented on AIRFLOW-3315:
-----------------------------------------

kaxil closed pull request #4161: [AIRFLOW-3315] Add ImapAttachmentSensor
URL: https://github.com/apache/incubator-airflow/pull/4161
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/imap_hook.py b/airflow/contrib/hooks/imap_hook.py
index c0b3126a8f..f028d3dda0 100644
--- a/airflow/contrib/hooks/imap_hook.py
+++ b/airflow/contrib/hooks/imap_hook.py
@@ -1,16 +1,21 @@
 # -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import email
 import imaplib
diff --git a/airflow/contrib/sensors/imap_attachment_sensor.py b/airflow/contrib/sensors/imap_attachment_sensor.py
new file mode 100644
index 0000000000..c0eb9b6cd2
--- /dev/null
+++ b/airflow/contrib/sensors/imap_attachment_sensor.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.imap_hook import ImapHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class ImapAttachmentSensor(BaseSensorOperator):
+    """
+    Waits for a specific attachment on a mail server.
+
+    :param attachment_name: The name of the attachment that will be checked.
+    :type attachment_name: str
+    :param check_regex: If set to True the attachment's name will be parsed as regular expression.
+                        Through this you can get a broader set of attachments
+                        that it will look for than just only the equality of the attachment name.
+                        The default value is False.
+    :type check_regex: bool
+    :param mail_folder: The mail folder in where to search for the attachment.
+                        The default value is 'INBOX'.
+    :type mail_folder: str
+    :param conn_id: The connection to run the sensor against.
+                    The default value is 'imap_default'.
+    :type conn_id: str
+    """
+    template_fields = ('attachment_name',)
+
+    @apply_defaults
+    def __init__(self,
+                 attachment_name,
+                 mail_folder='INBOX',
+                 check_regex=False,
+                 conn_id='imap_default',
+                 *args,
+                 **kwargs):
+        super(ImapAttachmentSensor, self).__init__(*args, **kwargs)
+
+        self.attachment_name = attachment_name
+        self.mail_folder = mail_folder
+        self.check_regex = check_regex
+        self.conn_id = conn_id
+
+    def poke(self, context):
+        """
+        Pokes for a mail attachment on the mail server.
+
+        :param context: The context that is being provided when poking.
+        :type context: dict
+        :return: True if attachment with the given name is present and False if not.
+        :rtype: bool
+        """
+        self.log.info('Poking for %s', self.attachment_name)
+
+        with ImapHook(imap_conn_id=self.conn_id) as imap_hook:
+            return imap_hook.has_mail_attachment(
+                name=self.attachment_name,
+                mail_folder=self.mail_folder,
+                check_regex=self.check_regex
+            )
diff --git a/docs/code.rst b/docs/code.rst
index cca3b5ff77..1ea5de7626 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -226,6 +226,7 @@ Sensors
 .. autoclass:: airflow.contrib.sensors.gcs_sensor.GoogleCloudStoragePrefixSensor
 .. autoclass:: airflow.contrib.sensors.hdfs_sensor.HdfsSensorFolder
 .. autoclass:: airflow.contrib.sensors.hdfs_sensor.HdfsSensorRegex
+.. autoclass:: airflow.contrib.sensors.imap_attachment_sensor.ImapAttachmentSensor
 .. autoclass:: airflow.contrib.sensors.jira_sensor.JiraSensor
 .. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
 .. autoclass:: airflow.contrib.sensors.qubole_sensor.QuboleSensor
diff --git a/tests/contrib/hooks/test_imap_hook.py b/tests/contrib/hooks/test_imap_hook.py
index 63dc13a874..b4e4ff3ed7 100644
--- a/tests/contrib/hooks/test_imap_hook.py
+++ b/tests/contrib/hooks/test_imap_hook.py
@@ -1,16 +1,21 @@
 # -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import imaplib
 import unittest
diff --git a/tests/contrib/sensors/test_imap_attachment_sensor.py b/tests/contrib/sensors/test_imap_attachment_sensor.py
new file mode 100644
index 0000000000..297de02a99
--- /dev/null
+++ b/tests/contrib/sensors/test_imap_attachment_sensor.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from mock import patch, Mock
+
+from airflow import configuration, models
+from airflow.contrib.sensors.imap_attachment_sensor import ImapAttachmentSensor
+from airflow.utils import db
+
+imap_hook_string = 'airflow.contrib.sensors.imap_attachment_sensor.ImapHook'
+
+
+class TestImapAttachmentSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='imap_test',
+                host='base_url',
+                login='user',
+                password='password'
+            )
+        )
+
+    @patch(imap_hook_string)
+    def test_poke_with_attachment_found(self, mock_imap_hook):
+        mock_imap_hook.return_value.__enter__ = Mock(return_value=mock_imap_hook)
+        mock_imap_hook.has_mail_attachment.return_value = True
+
+        imap_attachment_sensor = ImapAttachmentSensor(
+            conn_id='imap_test',
+            attachment_name='test_attachment',
+            task_id='check_for_attachment_on_mail_server_test',
+            dag=None
+        )
+
+        self.assertTrue(imap_attachment_sensor.poke(context={}))
+        mock_imap_hook.has_mail_attachment.assert_called_once_with(
+            name='test_attachment',
+            mail_folder='INBOX',
+            check_regex=False
+        )
+
+    @patch(imap_hook_string)
+    def test_poke_with_attachment_not_found(self, mock_imap_hook):
+        mock_imap_hook.return_value.__enter__ = Mock(return_value=mock_imap_hook)
+        mock_imap_hook.has_mail_attachment.return_value = False
+
+        imap_attachment_sensor = ImapAttachmentSensor(
+            conn_id='imap_test',
+            attachment_name='test_attachment',
+            task_id='check_for_attachment_on_mail_server_test',
+            dag=None
+        )
+
+        self.assertFalse(imap_attachment_sensor.poke(context={}))
+        mock_imap_hook.has_mail_attachment.assert_called_once_with(
+            name='test_attachment',
+            mail_folder='INBOX',
+            check_regex=False
+        )
+
+    @patch(imap_hook_string)
+    def test_poke_with_check_regex_true(self, mock_imap_hook):
+        mock_imap_hook.return_value.__enter__ = Mock(return_value=mock_imap_hook)
+        mock_imap_hook.has_mail_attachment.return_value = True
+
+        imap_attachment_sensor = ImapAttachmentSensor(
+            conn_id='imap_test',
+            attachment_name='.*_test_attachment',
+            task_id='check_for_attachment_on_mail_server_test',
+            check_regex=True,
+            dag=None
+        )
+
+        self.assertTrue(imap_attachment_sensor.poke(context={}))
+        mock_imap_hook.has_mail_attachment.assert_called_once_with(
+            name='.*_test_attachment',
+            mail_folder='INBOX',
+            check_regex=True
+        )
+
+    @patch(imap_hook_string)
+    def test_poke_with_different_mail_folder(self, mock_imap_hook):
+        mock_imap_hook.return_value.__enter__ = Mock(return_value=mock_imap_hook)
+        mock_imap_hook.has_mail_attachment.return_value = True
+
+        imap_attachment_sensor = ImapAttachmentSensor(
+            conn_id='imap_test',
+            attachment_name='test_attachment',
+            task_id='check_for_attachment_on_mail_server_test',
+            mail_folder='test',
+            dag=None
+        )
+
+        self.assertTrue(imap_attachment_sensor.poke(context={}))
+        mock_imap_hook.has_mail_attachment.assert_called_once_with(
+            name='test_attachment',
+            mail_folder='test',
+            check_regex=False
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add ImapAttachmentSensor to poke for mail attachments
> -----------------------------------------------------
>
>                 Key: AIRFLOW-3315
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3315
>             Project: Apache Airflow
>          Issue Type: New Feature
>            Reporter: Felix Uellendall
>            Assignee: Felix Uellendall
>            Priority: Major
>
> This kind of sensor pokes a mail server for attachments in mails with a given 
> name.
> It will use the existing 
> [ImapHook|https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-2780]
>  to establish a connection to the mail server 
> and search for the attachment in all mails.
> If an attachment has been found it will immediately stop and return that an 
> attachment has been found for the given name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to