This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99a6bf7834 Fix ExternalTaskSensor can't check zipped dag (#27056)
99a6bf7834 is described below
commit 99a6bf783412432416813d1c4bb41052054dd5c6
Author: Chenglong Yan <[email protected]>
AuthorDate: Wed Nov 16 20:53:01 2022 +0800
Fix ExternalTaskSensor can't check zipped dag (#27056)
closes: #14264
---
airflow/sensors/external_task.py | 3 +-
.../test_external_task_sensor_check_existense.py | 34 +++++++++++++++++
tests/sensors/test_external_task_sensor.py | 44 +++++++++++++++++++++-
3 files changed, 79 insertions(+), 2 deletions(-)
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 1c45a1790e..3b20531a60 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -33,6 +33,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.file import correct_maybe_zipped
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
from airflow.utils.state import State
@@ -262,7 +263,7 @@ class ExternalTaskSensor(BaseSensorOperator):
if not dag_to_wait:
raise AirflowException(f"The external DAG {self.external_dag_id}
does not exist.")
- if not os.path.exists(dag_to_wait.fileloc):
+ if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
raise AirflowException(f"The external DAG {self.external_dag_id}
was deleted.")
if self.external_task_ids:
diff --git a/tests/dags/test_external_task_sensor_check_existense.py
b/tests/dags/test_external_task_sensor_check_existense.py
new file mode 100644
index 0000000000..5f6c17bf52
--- /dev/null
+++ b/tests/dags/test_external_task_sensor_check_existense.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.models import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from tests.models import DEFAULT_DATE
+
+# Upstream DAG: the sensor in the DAG below checks that it and its "empty" task exist.
+with DAG(
+    dag_id="test_external_task_sensor_check_existence_ext",
+    start_date=DEFAULT_DATE,
+) as dag1:
+    EmptyOperator(task_id="empty")
+
+# Downstream DAG: with check_existence=True the sensor verifies the external DAG/task first.
+with DAG(
+    dag_id="test_external_task_sensor_check_existence",
+    start_date=DEFAULT_DATE,
+) as dag2:
+    ExternalTaskSensor(
+        task_id="external_task_sensor",
+        external_dag_id="test_external_task_sensor_check_existence_ext",
+        external_task_id="empty",
+        check_existence=True,
+    )
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index ad21726496..81ef494836 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -17,8 +17,12 @@
# under the License.
from __future__ import annotations
+import hashlib
import logging
+import os
+import tempfile
import unittest
+import zipfile
from datetime import time, timedelta
import pytest
@@ -33,11 +37,12 @@ from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker,
ExternalTaskSensor, ExternalTaskSensorLink
from airflow.sensors.time_sensor import TimeSensor
from airflow.serialization.serialized_objects import SerializedBaseOperator
-from airflow.utils.session import provide_session
+from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.task_group import TaskGroup
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
+from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.db import clear_db_runs
DEFAULT_DATE = datetime(2015, 1, 1)
@@ -53,6 +58,35 @@ def clean_db():
clear_db_runs()
+@pytest.fixture
+def dag_zip_maker():
+    """Yield a factory that zips test DAG files and loads the archive through a DagBag.
+
+    Usage: ``with dag_zip_maker("dag_file.py", ...) as dagbag: ...``. Entering the context
+    writes the named files (relative to TEST_DAGS_FOLDER) into a zip inside a fresh temp
+    directory, parses that directory with a DagBag and syncs the DAGs to the DB; exiting
+    removes the zip and the directory again.
+    """
+
+    class DagZipMaker:
+        def __call__(self, *dag_files):
+            self._dag_files = [os.path.join(str(TEST_DAGS_FOLDER), f) for f in dag_files]
+            return self
+
+        def __enter__(self):
+            dag_files_hash = hashlib.md5("".join(self._dag_files).encode()).hexdigest()
+            # mkdtemp() resolves the system temp dir itself (``tempfile.tempdir`` may still be
+            # None here, so it cannot be joined directly) and always creates a new directory,
+            # so a zip left behind by a crashed run cannot break the exclusive ("x") create below.
+            self._tmp_dir = tempfile.mkdtemp(prefix=f"{dag_files_hash}_")
+            self._zip_file_name = os.path.join(self._tmp_dir, f"{dag_files_hash}.zip")
+            try:
+                with zipfile.ZipFile(self._zip_file_name, "x") as zf:
+                    for dag_file in self._dag_files:
+                        zf.write(dag_file, os.path.basename(dag_file))
+                dagbag = DagBag(dag_folder=self._tmp_dir, include_examples=False)
+                dagbag.sync_to_db()
+            except Exception:
+                # __exit__ does not run when __enter__ raises, so remove the temp files here.
+                self._cleanup()
+                raise
+            return dagbag
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            self._cleanup()
+
+        def _cleanup(self):
+            # The zip may be missing if creating it was what failed.
+            if os.path.exists(self._zip_file_name):
+                os.unlink(self._zip_file_name)
+            os.rmdir(self._tmp_dir)
+
+    yield DagZipMaker()
+
+
class TestExternalTaskSensor(unittest.TestCase):
def setUp(self):
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
@@ -600,6 +634,14 @@ exit 0
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
+    """A DAG loaded from a zip archive must pass the sensor's existence check.
+
+    Before this fix the check ran ``os.path.exists`` directly on the DAG's fileloc, and a
+    zipped DAG was then reported as deleted.
+    """
+    with dag_zip_maker("test_external_task_sensor_check_existense.py") as dagbag:
+        sensor_dag = dagbag.dags["test_external_task_sensor_check_existence"]
+        sensor = sensor_dag.tasks[0]
+        with create_session() as session:
+            # The test passes as long as this call raises no exception.
+            sensor._check_for_existence(session)
+
+
def test_external_task_sensor_templated(dag_maker, app):
with dag_maker():
ExternalTaskSensor(