kaxil closed pull request #3749: [AIRFLOW-2900] Show code for packaged DAGs
URL: https://github.com/apache/incubator-airflow/pull/3749
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides its original diff
once it is merged, so the diff is reproduced below for the sake of provenance:
diff --git a/airflow/models.py b/airflow/models.py
index 94e18794d6..ddf3094567 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -337,7 +337,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags
mods = []
- if not zipfile.is_zipfile(filepath):
+ is_zipfile = zipfile.is_zipfile(filepath)
+ if not is_zipfile:
if safe_mode and os.path.isfile(filepath):
with open(filepath, 'rb') as f:
content = f.read()
@@ -409,7 +410,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
- if dag.fileloc != filepath:
+ if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 9ce114d5ed..e85bc5909a 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,17 +20,21 @@
# flake8: noqa: E402
import inspect
from future import standard_library
-standard_library.install_aliases()
+standard_library.install_aliases() # noqa: E402
from builtins import str, object
from cgi import escape
from io import BytesIO as IO
import functools
import gzip
+import io
import json
+import os
+import re
import time
import wtforms
from wtforms.compat import text_type
+import zipfile
from flask import after_this_request, request, Response
from flask_admin.model import filters
@@ -372,6 +376,22 @@ def zipper(response):
return view_func
+def open_maybe_zipped(f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+
+ _, archive, filename = re.search(
+ r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+
def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e1a7caa8bb..aa2530e458 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -661,7 +661,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
- with open(dag.fileloc, 'r') as f:
+ with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/airflow/www_rbac/utils.py b/airflow/www_rbac/utils.py
index a0e9258eae..0176a5312c 100644
--- a/airflow/www_rbac/utils.py
+++ b/airflow/www_rbac/utils.py
@@ -26,6 +26,10 @@
import wtforms
import bleach
import markdown
+import re
+import zipfile
+import os
+import io
from builtins import str
from past.builtins import basestring
@@ -202,6 +206,22 @@ def json_response(obj):
mimetype="application/json")
+def open_maybe_zipped(f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+
+ _, archive, filename = re.search(
+ r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+
def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index d011724cc6..3dc3400968 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -400,7 +400,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
- with open(dag.fileloc, 'r') as f:
+ with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index 9034b8b5fd..a06d6b066a 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -195,6 +195,40 @@ def some_func():
self.assertEqual(anonymous_username, kwargs['owner'])
mocked_session_instance.add.assert_called_once()
+ def test_open_maybe_zipped_normal_file(self):
+ with mock.patch(
+ 'io.open', mock.mock_open(read_data="data")) as mock_file:
+ utils.open_maybe_zipped('/path/to/some/file.txt')
+ mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+ def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+ path = '/path/to/fakearchive.zip.other/file.txt'
+ with mock.patch(
+ 'io.open', mock.mock_open(read_data="data")) as mock_file:
+ utils.open_maybe_zipped(path)
+ mock_file.assert_called_with(path, mode='r')
+
+ @mock.patch("zipfile.is_zipfile")
+ @mock.patch("zipfile.ZipFile")
+ def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+ mocked_is_zipfile.return_value = True
+ instance = mocked_ZipFile.return_value
+ instance.open.return_value = mock.mock_open(read_data="data")
+
+ utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+ mocked_is_zipfile.assert_called_once()
+ (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+ self.assertEqual('/path/to/archive.zip', args[0])
+
+ mocked_ZipFile.assert_called_once()
+ (args, kwargs) = mocked_ZipFile.call_args_list[0]
+ self.assertEqual('/path/to/archive.zip', args[0])
+
+ instance.open.assert_called_once()
+ (args, kwargs) = instance.open.call_args_list[0]
+ self.assertEqual('deep/path/to/file.txt', args[0])
+
def test_get_python_source_from_method(self):
class AMockClass(object):
def a_method(self):
diff --git a/tests/www_rbac/test_utils.py b/tests/www_rbac/test_utils.py
index 1879ba0826..68d1744ab8 100644
--- a/tests/www_rbac/test_utils.py
+++ b/tests/www_rbac/test_utils.py
@@ -18,6 +18,7 @@
# under the License.
import unittest
+import mock
from xml.dom import minidom
from airflow.www_rbac import utils
@@ -113,6 +114,40 @@ def test_params_all(self):
self.assertEqual('page=3&search=bash_&showPaused=False',
utils.get_params(showPaused=False, page=3,
search='bash_'))
+ def test_open_maybe_zipped_normal_file(self):
+ with mock.patch(
+ 'io.open', mock.mock_open(read_data="data")) as mock_file:
+ utils.open_maybe_zipped('/path/to/some/file.txt')
+ mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+ def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+ path = '/path/to/fakearchive.zip.other/file.txt'
+ with mock.patch(
+ 'io.open', mock.mock_open(read_data="data")) as mock_file:
+ utils.open_maybe_zipped(path)
+ mock_file.assert_called_with(path, mode='r')
+
+ @mock.patch("zipfile.is_zipfile")
+ @mock.patch("zipfile.ZipFile")
+ def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+ mocked_is_zipfile.return_value = True
+ instance = mocked_ZipFile.return_value
+ instance.open.return_value = mock.mock_open(read_data="data")
+
+ utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+ mocked_is_zipfile.assert_called_once()
+ (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+ self.assertEqual('/path/to/archive.zip', args[0])
+
+ mocked_ZipFile.assert_called_once()
+ (args, kwargs) = mocked_ZipFile.call_args_list[0]
+ self.assertEqual('/path/to/archive.zip', args[0])
+
+ instance.open.assert_called_once()
+ (args, kwargs) = instance.open.call_args_list[0]
+ self.assertEqual('deep/path/to/file.txt', args[0])
+
if __name__ == '__main__':
unittest.main()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services