tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113241669

   Ok, I am able to reproduce this with the test below, where no import
errors are reported. My local Airflow did not have a cluster_policy enabled;
the policy in question validates that each task's owner is a non-default
string. When the owner is a non-string value (here, a list), the policy
raises the error shown in the traceback below.
   
   ```python
   @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
   def test_task_cluster_policy_unhashable_owner(self):
       """
       test that file processing results in import error when task does not
       obey cluster policy and has non-string owner
       """
       dag_file = os.path.join(TEST_DAGS_FOLDER, "test_unhashable_owner.py")

       dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False)
       assert set() == set(dagbag.dag_ids)
       assert dagbag.import_errors == {}
   ```
   
   `test_unhashable_owner.py`:

   ```python
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.utils.dates import days_ago

   with DAG(
       dag_id="test_missing_owner",
       schedule_interval="0 0 * * *",
       start_date=days_ago(2),
       dagrun_timeout=timedelta(minutes=60),
       tags=["example"],
   ) as dag:
       run_this_last = EmptyOperator(
           task_id="test_task",
           owner=["owner1", "owner2"],  # non-string (list) owner; per the traceback, owner.lower() fails
       )
   ```
   
   ```
   [2022-04-29 12:09:05,046] {dagbag.py:507} INFO - Filling up the DagBag from 
/opt/airflow/tests/dags/test_unhashable_owner.py
   [2022-04-29 12:09:05,048] {dagbag.py:535} ERROR - 'list' object has no 
attribute 'lower'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagbag.py", line 522, in collect_dags
       found_dags = self.process_file(filepath, 
only_if_updated=only_if_updated, safe_mode=safe_mode)
     File "/opt/airflow/airflow/models/dagbag.py", line 290, in process_file
       found_dags = self._process_modules(filepath, mods, 
file_last_changed_on_disk)
     File "/opt/airflow/airflow/models/dagbag.py", line 408, in _process_modules
       self.bag_dag(dag=dag, root_dag=dag)
     File "/opt/airflow/airflow/models/dagbag.py", line 433, in bag_dag
       self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
     File "/opt/airflow/airflow/models/dagbag.py", line 450, in _bag_dag
       settings.task_policy(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 64, in 
cluster_policy
       _check_task_rules(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 50, in 
_check_task_rules
       rule(current_task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 30, in 
task_must_have_owners
       if not task.owner or task.owner.lower() == conf.get('operators', 
'default_owner'):
   AttributeError: 'list' object has no attribute 'lower'
   
----------------------------------------------------------------------------- 
Captured log call 
-----------------------------------------------------------------------------
   INFO     airflow.models.dagbag.DagBag:dagbag.py:507 Filling up the DagBag 
from /opt/airflow/tests/dags/test_unhashable_owner.py
   ERROR    airflow.models.dagbag.DagBag:dagbag.py:535 'list' object has no 
attribute 'lower'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagbag.py", line 522, in collect_dags
       found_dags = self.process_file(filepath, 
only_if_updated=only_if_updated, safe_mode=safe_mode)
     File "/opt/airflow/airflow/models/dagbag.py", line 290, in process_file
       found_dags = self._process_modules(filepath, mods, 
file_last_changed_on_disk)
     File "/opt/airflow/airflow/models/dagbag.py", line 408, in _process_modules
       self.bag_dag(dag=dag, root_dag=dag)
     File "/opt/airflow/airflow/models/dagbag.py", line 433, in bag_dag
       self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
     File "/opt/airflow/airflow/models/dagbag.py", line 450, in _bag_dag
       settings.task_policy(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 64, in 
cluster_policy
       _check_task_rules(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 50, in 
_check_task_rules
       rule(current_task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 30, in 
task_must_have_owners
       if not task.owner or task.owner.lower() == conf.get('operators', 
'default_owner'):
   AttributeError: 'list' object has no attribute 'lower'
   
============================================================================= 
warnings summary 
==============================================================================
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to