kaxil closed pull request #3749: [AIRFLOW-2900] Show code for packaged DAGs
URL: https://github.com/apache/incubator-airflow/pull/3749
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/models.py b/airflow/models.py
index 94e18794d6..ddf3094567 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -337,7 +337,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             return found_dags
 
         mods = []
-        if not zipfile.is_zipfile(filepath):
+        is_zipfile = zipfile.is_zipfile(filepath)
+        if not is_zipfile:
             if safe_mode and os.path.isfile(filepath):
                 with open(filepath, 'rb') as f:
                     content = f.read()
@@ -409,7 +410,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
                 if isinstance(dag, DAG):
                     if not dag.full_filepath:
                         dag.full_filepath = filepath
-                        if dag.fileloc != filepath:
+                        if dag.fileloc != filepath and not is_zipfile:
                             dag.fileloc = filepath
                     try:
                         dag.is_subdag = False
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 9ce114d5ed..e85bc5909a 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,17 +20,21 @@
 # flake8: noqa: E402
 import inspect
 from future import standard_library
-standard_library.install_aliases()
+standard_library.install_aliases()  # noqa: E402
 from builtins import str, object
 
 from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
+import io
 import json
+import os
+import re
 import time
 import wtforms
 from wtforms.compat import text_type
+import zipfile
 
 from flask import after_this_request, request, Response
 from flask_admin.model import filters
@@ -372,6 +376,22 @@ def zipper(response):
     return view_func
 
 
+def open_maybe_zipped(f, mode='r'):
+    """
+    Opens the given file. If the path contains a folder with a .zip suffix, then
+    the folder is treated as a zip archive, opening the file inside the archive.
+
+    :return: a file object, as in `open`, or as in `ZipFile.open`.
+    """
+
+    _, archive, filename = re.search(
+        r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+    if archive and zipfile.is_zipfile(archive):
+        return zipfile.ZipFile(archive, mode=mode).open(filename)
+    else:
+        return io.open(f, mode=mode)
+
+
 def make_cache_key(*args, **kwargs):
     """
     Used by cache to get a unique key per URL
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e1a7caa8bb..aa2530e458 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -661,7 +661,7 @@ def code(self):
         dag = dagbag.get_dag(dag_id)
         title = dag_id
         try:
-            with open(dag.fileloc, 'r') as f:
+            with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
             html_code = highlight(
                 code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/airflow/www_rbac/utils.py b/airflow/www_rbac/utils.py
index a0e9258eae..0176a5312c 100644
--- a/airflow/www_rbac/utils.py
+++ b/airflow/www_rbac/utils.py
@@ -26,6 +26,10 @@
 import wtforms
 import bleach
 import markdown
+import re
+import zipfile
+import os
+import io
 
 from builtins import str
 from past.builtins import basestring
@@ -202,6 +206,22 @@ def json_response(obj):
         mimetype="application/json")
 
 
+def open_maybe_zipped(f, mode='r'):
+    """
+    Opens the given file. If the path contains a folder with a .zip suffix, then
+    the folder is treated as a zip archive, opening the file inside the archive.
+
+    :return: a file object, as in `open`, or as in `ZipFile.open`.
+    """
+
+    _, archive, filename = re.search(
+        r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+    if archive and zipfile.is_zipfile(archive):
+        return zipfile.ZipFile(archive, mode=mode).open(filename)
+    else:
+        return io.open(f, mode=mode)
+
+
 def make_cache_key(*args, **kwargs):
     """
     Used by cache to get a unique key per URL
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index d011724cc6..3dc3400968 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -400,7 +400,7 @@ def code(self):
         dag = dagbag.get_dag(dag_id)
         title = dag_id
         try:
-            with open(dag.fileloc, 'r') as f:
+            with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
             html_code = highlight(
                 code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index 9034b8b5fd..a06d6b066a 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -195,6 +195,40 @@ def some_func():
                 self.assertEqual(anonymous_username, kwargs['owner'])
                 mocked_session_instance.add.assert_called_once()
 
+    def test_open_maybe_zipped_normal_file(self):
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped('/path/to/some/file.txt')
+            mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+    def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+        path = '/path/to/fakearchive.zip.other/file.txt'
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped(path)
+            mock_file.assert_called_with(path, mode='r')
+
+    @mock.patch("zipfile.is_zipfile")
+    @mock.patch("zipfile.ZipFile")
+    def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+        mocked_is_zipfile.return_value = True
+        instance = mocked_ZipFile.return_value
+        instance.open.return_value = mock.mock_open(read_data="data")
+
+        utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+        mocked_is_zipfile.assert_called_once()
+        (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        mocked_ZipFile.assert_called_once()
+        (args, kwargs) = mocked_ZipFile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        instance.open.assert_called_once()
+        (args, kwargs) = instance.open.call_args_list[0]
+        self.assertEqual('deep/path/to/file.txt', args[0])
+
     def test_get_python_source_from_method(self):
         class AMockClass(object):
             def a_method(self):
diff --git a/tests/www_rbac/test_utils.py b/tests/www_rbac/test_utils.py
index 1879ba0826..68d1744ab8 100644
--- a/tests/www_rbac/test_utils.py
+++ b/tests/www_rbac/test_utils.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import unittest
+import mock
 from xml.dom import minidom
 
 from airflow.www_rbac import utils
@@ -113,6 +114,40 @@ def test_params_all(self):
         self.assertEqual('page=3&search=bash_&showPaused=False',
                          utils.get_params(showPaused=False, page=3, search='bash_'))
 
+    def test_open_maybe_zipped_normal_file(self):
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped('/path/to/some/file.txt')
+            mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+    def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+        path = '/path/to/fakearchive.zip.other/file.txt'
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped(path)
+            mock_file.assert_called_with(path, mode='r')
+
+    @mock.patch("zipfile.is_zipfile")
+    @mock.patch("zipfile.ZipFile")
+    def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+        mocked_is_zipfile.return_value = True
+        instance = mocked_ZipFile.return_value
+        instance.open.return_value = mock.mock_open(read_data="data")
+
+        utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+        mocked_is_zipfile.assert_called_once()
+        (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        mocked_ZipFile.assert_called_once()
+        (args, kwargs) = mocked_ZipFile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        instance.open.assert_called_once()
+        (args, kwargs) = instance.open.call_args_list[0]
+        self.assertEqual('deep/path/to/file.txt', args[0])
+
 
 if __name__ == '__main__':
     unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to