Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4d153ad4e -> dc38b2f46


[AIRFLOW-2613] Fix Airflow searching .zip bug

When Airflow was populating a DagBag from a .zip file, if a single
file in the root directory did not contain the strings 'airflow' and
'DAG', it would ignore the entire .zip file.

Also added a log message for skipped files, emitted only once per
DagBag so as not to bombard the user with info about every skipped
.py file.

Closes #3505 from Noremac201/dag_name


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc38b2f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc38b2f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc38b2f4

Branch: refs/heads/master
Commit: dc38b2f46dcb5fb9887385a1269ae56198225316
Parents: 4d153ad
Author: Cameron Moberg <[email protected]>
Authored: Sun Jun 17 19:16:12 2018 +0100
Committer: Kaxil Naik <[email protected]>
Committed: Sun Jun 17 19:16:12 2018 +0100

----------------------------------------------------------------------
 airflow/models.py       |  18 +++++++++++++++++-
 docs/concepts.rst       |   3 +++
 docs/faq.rst            |   5 +++++
 tests/dags/test_zip.zip | Bin 1676 -> 2204 bytes
 tests/models.py         |  15 +++++++++++++++
 5 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc38b2f4/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index cef6efc..eda1511 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -195,6 +195,10 @@ class DagBag(BaseDagBag, LoggingMixin):
     :param include_examples: whether to include the examples that ship
         with airflow or not
     :type include_examples: bool
+    :param has_logged: an instance boolean that gets flipped from False to 
True after a
+        file has been skipped. This is to prevent overloading the user with 
logging
+        messages about skipped files. Therefore only once per DagBag is a file 
logged
+        being skipped.
     """
 
     # static class variables to detetct dag cycle
@@ -219,6 +223,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.file_last_changed = {}
         self.executor = executor
         self.import_errors = {}
+        self.has_logged = False
 
         if include_examples:
             example_dag_folder = os.path.join(
@@ -297,6 +302,12 @@ class DagBag(BaseDagBag, LoggingMixin):
                     content = f.read()
                     if not all([s in content for s in (b'DAG', b'airflow')]):
                         self.file_last_changed[filepath] = 
file_last_changed_on_disk
+                        # Don't want to spam user with skip messages
+                        if not self.has_logged:
+                            self.has_logged = True
+                            self.log.info(
+                                "File %s assumed to contain no DAGs. 
Skipping.",
+                                filepath)
                         return found_dags
 
             self.log.debug("Importing %s", filepath)
@@ -333,7 +344,12 @@ class DagBag(BaseDagBag, LoggingMixin):
                                 self.file_last_changed[filepath] = (
                                     file_last_changed_on_disk)
                                 # todo: create ignore list
-                                return found_dags
+                                # Don't want to spam user with skip messages
+                                if not self.has_logged:
+                                    self.has_logged = True
+                                    self.log.info(
+                                        "File %s assumed to contain no DAGs. 
Skipping.",
+                                        filepath)
 
                     if mod_name in sys.modules:
                         del sys.modules[mod_name]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc38b2f4/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 866f916..9b10224 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -35,6 +35,9 @@ the ``DAG`` objects. You can have as many DAGs as you want, 
each describing an
 arbitrary number of tasks. In general, each one should correspond to a single
 logical workflow.
 
+.. note:: When searching for DAGs, Airflow will only consider files where the 
string
+   "airflow" and "DAG" both appear in the contents of the ``.py`` file.
+
 Scope
 -----
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc38b2f4/docs/faq.rst
----------------------------------------------------------------------
diff --git a/docs/faq.rst b/docs/faq.rst
index 275c26b..4621208 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -15,6 +15,11 @@ Here are some of the common causes:
   may want to confirm that this works both where the scheduler runs as well
   as where the worker runs.
 
+- Does the file containing your DAG contain the string "airflow" and "DAG" 
somewhere
+  in the contents? When searching the DAG directory, Airflow ignores files not 
containing
+  "airflow" and "DAG" in order to prevent the DagBag parsing from importing 
all python
+  files collocated with user's DAGs.
+
 - Is your ``start_date`` set properly? The Airflow scheduler triggers the
   task soon after the ``start_date + scheduler_interval`` is passed.
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc38b2f4/tests/dags/test_zip.zip
----------------------------------------------------------------------
diff --git a/tests/dags/test_zip.zip b/tests/dags/test_zip.zip
index f6ab319..20c75a2 100644
Binary files a/tests/dags/test_zip.zip and b/tests/dags/test_zip.zip differ

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc38b2f4/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 63c41c7..d386817 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -1015,6 +1015,21 @@ class DagBagTest(unittest.TestCase):
         dagbag = models.DagBag(include_examples=True)
         self.assertEqual([], dagbag.process_file(f.name))
 
+    def test_zip_skip_log(self):
+        """
+        test the loading of a DAG from within a zip file that skips another 
file because
+        it doesn't have "airflow" and "DAG"
+        """
+        from mock import Mock
+        with patch('airflow.models.DagBag.log') as log_mock:
+            log_mock.info = Mock()
+            test_zip_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
+            dagbag = models.DagBag(dag_folder=test_zip_path, 
include_examples=False)
+
+            self.assertTrue(dagbag.has_logged)
+            log_mock.info.assert_any_call("File %s assumed to contain no DAGs. 
Skipping.",
+                                          test_zip_path)
+
     def test_zip(self):
         """
         test the loading of a DAG within a zip file that includes dependencies

Reply via email to