This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99a6bf7834 Fix ExternalTaskSensor cant check zipped dag (#27056)
99a6bf7834 is described below

commit 99a6bf783412432416813d1c4bb41052054dd5c6
Author: Chenglong Yan <[email protected]>
AuthorDate: Wed Nov 16 20:53:01 2022 +0800

    Fix ExternalTaskSensor can't check zipped DAG (#27056)
    
    closes: #14264
---
 airflow/sensors/external_task.py                   |  3 +-
 .../test_external_task_sensor_check_existense.py   | 34 +++++++++++++++++
 tests/sensors/test_external_task_sensor.py         | 44 +++++++++++++++++++++-
 3 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 1c45a1790e..3b20531a60 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -33,6 +33,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.file import correct_maybe_zipped
 from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
@@ -262,7 +263,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         if not dag_to_wait:
             raise AirflowException(f"The external DAG {self.external_dag_id} 
does not exist.")
 
-        if not os.path.exists(dag_to_wait.fileloc):
+        if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
             raise AirflowException(f"The external DAG {self.external_dag_id} 
was deleted.")
 
         if self.external_task_ids:
diff --git a/tests/dags/test_external_task_sensor_check_existense.py b/tests/dags/test_external_task_sensor_check_existense.py
new file mode 100644
index 0000000000..5f6c17bf52
--- /dev/null
+++ b/tests/dags/test_external_task_sensor_check_existense.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.models import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from tests.models import DEFAULT_DATE
+
+# Both DAGs must agree on these ids: the sensor below looks them up by value.
+_EXTERNAL_DAG_ID = "test_external_task_sensor_check_existence_ext"
+_EXTERNAL_TASK_ID = "empty"
+
+# Upstream DAG whose existence is verified by the sensor in the second DAG.
+with DAG(dag_id=_EXTERNAL_DAG_ID, start_date=DEFAULT_DATE) as dag1:
+    EmptyOperator(task_id=_EXTERNAL_TASK_ID)
+
+# Downstream DAG: its sensor runs the existence check (check_existence=True) against dag1.
+with DAG(dag_id="test_external_task_sensor_check_existence", start_date=DEFAULT_DATE) as dag2:
+    ExternalTaskSensor(
+        task_id="external_task_sensor",
+        external_dag_id=_EXTERNAL_DAG_ID,
+        external_task_id=_EXTERNAL_TASK_ID,
+        check_existence=True,
+    )
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index ad21726496..81ef494836 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -17,8 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import hashlib
 import logging
+import os
+import tempfile
 import unittest
+import zipfile
 from datetime import time, timedelta
 
 import pytest
@@ -33,11 +37,12 @@ from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskMarker, 
ExternalTaskSensor, ExternalTaskSensorLink
 from airflow.sensors.time_sensor import TimeSensor
 from airflow.serialization.serialized_objects import SerializedBaseOperator
-from airflow.utils.session import provide_session
+from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
+from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.db import clear_db_runs
 
 DEFAULT_DATE = datetime(2015, 1, 1)
@@ -53,6 +58,35 @@ def clean_db():
     clear_db_runs()
 
 
+@pytest.fixture
+def dag_zip_maker():
+    """Yield a factory that zips test DAG files into a temp folder and loads them.
+
+    Usage: ``with dag_zip_maker("a.py", "b.py") as dagbag: ...``. The named files
+    (relative to ``TEST_DAGS_FOLDER``) are written into one zip archive, a ``DagBag``
+    is parsed from the archive's folder and synced to the DB, and the archive and
+    its folder are removed again when the ``with`` block exits.
+    """
+
+    class DagZipMaker:
+        def __call__(self, *dag_files):
+            self.__dag_files = [os.path.join(TEST_DAGS_FOLDER, dag_file) for dag_file in dag_files]
+            dag_files_hash = hashlib.md5("".join(self.__dag_files).encode()).hexdigest()
+            # tempfile.tempdir stays None until something calls gettempdir(); call the
+            # function so the fixture does not depend on that hidden global state.
+            self.__tmp_dir = os.path.join(tempfile.gettempdir(), dag_files_hash)
+            self.__zip_file_name = os.path.join(self.__tmp_dir, f"{dag_files_hash}.zip")
+            os.makedirs(self.__tmp_dir, exist_ok=True)
+            return self
+
+        def __enter__(self):
+            try:
+                # "w" rather than "x": a stale archive left behind by an interrupted
+                # run must not make every later run fail with FileExistsError.
+                with zipfile.ZipFile(self.__zip_file_name, "w") as zf:
+                    for dag_file in self.__dag_files:
+                        zf.write(dag_file, os.path.basename(dag_file))
+                dagbag = DagBag(dag_folder=self.__tmp_dir, include_examples=False)
+                dagbag.sync_to_db()
+            except BaseException:
+                # __exit__ is not invoked when __enter__ raises, so clean up here.
+                self._cleanup()
+                raise
+            return dagbag
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            self._cleanup()
+
+        def _cleanup(self):
+            """Remove the zip archive (if it was created) and its temp folder."""
+            if os.path.exists(self.__zip_file_name):
+                os.unlink(self.__zip_file_name)
+            if os.path.isdir(self.__tmp_dir):
+                os.rmdir(self.__tmp_dir)
+
+    yield DagZipMaker()
+
+
 class TestExternalTaskSensor(unittest.TestCase):
     def setUp(self):
         self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
@@ -600,6 +634,14 @@ exit 0
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
 
+def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
+    """The sensor's existence check must accept an external DAG loaded from a zip archive."""
+    dag_file = "test_external_task_sensor_check_existense.py"
+    with dag_zip_maker(dag_file) as dagbag, create_session() as session:
+        sensor = dagbag.dags["test_external_task_sensor_check_existence"].tasks[0]
+        # The test passes as long as no AirflowException ("... was deleted.") is raised.
+        sensor._check_for_existence(session)
+
+
 def test_external_task_sensor_templated(dag_maker, app):
     with dag_maker():
         ExternalTaskSensor(

Reply via email to