This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4887bcb16 Validate DAG owner to be a string (#23359)
c4887bcb16 is described below
commit c4887bcb162aab9f381e49cecc2f212600c493de
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Mon May 2 16:28:53 2022 +0530
Validate DAG owner to be a string (#23359)
Non-string owner values raise an `AttributeError`, because `task.owner.lower` is
called when `task.owner` is not a string. That error is not reported as an import
error, so the failure is silent. Raising an explicit error is more helpful to the user.
closes: #23343
related: #23343
---
tests/cluster_policies/__init__.py | 3 +++
tests/dags_corrupted/test_nonstring_owner.py | 34 ++++++++++++++++++++++++++++
tests/models/test_dagbag.py | 21 +++++++++++++++++
3 files changed, 58 insertions(+)
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
index 989430626f..bab74cc5d7 100644
--- a/tests/cluster_policies/__init__.py
+++ b/tests/cluster_policies/__init__.py
@@ -27,6 +27,9 @@ from airflow.models.baseoperator import BaseOperator
# [START example_cluster_policy_rule]
def task_must_have_owners(task: BaseOperator):
+ if task.owner and not isinstance(task.owner, str):
+ raise AirflowClusterPolicyViolation(f'''owner should be a string.
Current value: {task.owner!r}''')
+
if not task.owner or task.owner.lower() == conf.get('operators',
'default_owner'):
raise AirflowClusterPolicyViolation(
f'''Task must have non-None non-default owner. Current value:
{task.owner}'''
diff --git a/tests/dags_corrupted/test_nonstring_owner.py b/tests/dags_corrupted/test_nonstring_owner.py
new file mode 100644
index 0000000000..ace1e4ce0a
--- /dev/null
+++ b/tests/dags_corrupted/test_nonstring_owner.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+
+with DAG(  # fixture DAG with a non-string owner; loaded by test_task_cluster_policy_nonstring_owner
+ dag_id="test_nonstring_owner",
+ schedule_interval="0 0 * * *",
+ start_date=datetime(2022, 1, 1),
+ dagrun_timeout=timedelta(minutes=60),
+ tags=["example"],
+ default_args={'owner': ['a']},  # deliberately a list, not a str: the test expects owner ['a'] to violate the policy
+) as dag:
+ run_this_last = EmptyOperator(
+ task_id="test_task",
+ )
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 8017be66f0..8ea7197a62 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -17,6 +17,7 @@
import inspect
import logging
import os
+import pathlib
import shutil
import sys
import textwrap
@@ -979,6 +980,26 @@ class TestDagBag:
}
assert expected_import_errors == dagbag.import_errors
+ @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
+ def test_task_cluster_policy_nonstring_owner(self):
+ """
+ Test that processing the DAG file yields an import error when a task violates
+ the patched-in cluster policy by having an owner that is not a string.
+ """
+ TEST_DAGS_CORRUPTED_FOLDER =
pathlib.Path(__file__).parent.with_name('dags_corrupted')  # tests/models -> sibling tests/dags_corrupted
+ dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER,
"test_nonstring_owner.py")
+
+ dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False,
include_examples=False)
+ assert set() == set(dagbag.dag_ids)  # the violating DAG must not be loaded
+ expected_import_errors = {
+ dag_file: (
+ f"""DAG policy violation (DAG ID: test_nonstring_owner, Path:
{dag_file}):\n"""
+ """Notices:\n"""
+ """ * owner should be a string. Current value: ['a']"""
+ )
+ }
+ assert expected_import_errors == dagbag.import_errors  # the violation is surfaced as an import error, not swallowed
+
@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
def test_task_cluster_policy_obeyed(self):
"""