This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c4887bcb16 Validate DAG owner to be a string (#23359)
c4887bcb16 is described below

commit c4887bcb162aab9f381e49cecc2f212600c493de
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Mon May 2 16:28:53 2022 +0530

    Validate DAG owner to be a string (#23359)
    
    Non-string owner values caused an `AttributeError`, because
    `task.owner.lower()` is called on a value that is not a string. That error
    is not reported as an import error, so the DAG failed silently. Raising an
    explicit error tells the user what is wrong.
    
    closes: #23343
    related: #23343
---
 tests/cluster_policies/__init__.py           |  3 +++
 tests/dags_corrupted/test_nonstring_owner.py | 34 ++++++++++++++++++++++++++++
 tests/models/test_dagbag.py                  | 21 +++++++++++++++++
 3 files changed, 58 insertions(+)

diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
index 989430626f..bab74cc5d7 100644
--- a/tests/cluster_policies/__init__.py
+++ b/tests/cluster_policies/__init__.py
@@ -27,6 +27,9 @@ from airflow.models.baseoperator import BaseOperator
 
 # [START example_cluster_policy_rule]
 def task_must_have_owners(task: BaseOperator):
+    if task.owner and not isinstance(task.owner, str):
+        raise AirflowClusterPolicyViolation(f'''owner should be a string. 
Current value: {task.owner!r}''')
+
     if not task.owner or task.owner.lower() == conf.get('operators', 
'default_owner'):
         raise AirflowClusterPolicyViolation(
             f'''Task must have non-None non-default owner. Current value: 
{task.owner}'''
diff --git a/tests/dags_corrupted/test_nonstring_owner.py b/tests/dags_corrupted/test_nonstring_owner.py
new file mode 100644
index 0000000000..ace1e4ce0a
--- /dev/null
+++ b/tests/dags_corrupted/test_nonstring_owner.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+
+with DAG(
+    dag_id="test_nonstring_owner",
+    schedule_interval="0 0 * * *",
+    start_date=datetime(2022, 1, 1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+    default_args={'owner': ['a']},
+) as dag:
+    run_this_last = EmptyOperator(
+        task_id="test_task",
+    )
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 8017be66f0..8ea7197a62 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -17,6 +17,7 @@
 import inspect
 import logging
 import os
+import pathlib
 import shutil
 import sys
 import textwrap
@@ -979,6 +980,26 @@ class TestDagBag:
         }
         assert expected_import_errors == dagbag.import_errors
 
+    @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
+    def test_task_cluster_policy_nonstring_owner(self):
+        """
+        test that file processing results in import error when task does not
+        obey cluster policy and has owner whose type is not string.
+        """
+        TEST_DAGS_CORRUPTED_FOLDER = 
pathlib.Path(__file__).parent.with_name('dags_corrupted')
+        dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER, 
"test_nonstring_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, 
include_examples=False)
+        assert set() == set(dagbag.dag_ids)
+        expected_import_errors = {
+            dag_file: (
+                f"""DAG policy violation (DAG ID: test_nonstring_owner, Path: 
{dag_file}):\n"""
+                """Notices:\n"""
+                """ * owner should be a string. Current value: ['a']"""
+            )
+        }
+        assert expected_import_errors == dagbag.import_errors
+
     @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
     def test_task_cluster_policy_obeyed(self):
         """

Reply via email to