dabla opened a new issue, #43156:
URL: https://github.com/apache/airflow/issues/43156

   ### Apache Airflow version
   
   2.10.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I have two tasks that can possibly raise an `AirflowSkipException`. The first runs directly within the DAG; the second is located within a TaskGroup. If the first task raises `AirflowSkipException`, everything works as expected, but if the second task (inside the TaskGroup) raises it, the task fails with the following exception:
   
   ```
   [2024-10-18, 12:32:35 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 7543209 for task ad_user_group_extraction.updated_group_ids (object of type 'AirflowSkipException' has no len(); 4538)
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 3149, in _execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 3173, in _execute_task
       return _execute_task(self, context, task_orig)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 288, in execute
       output = hook.run(
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 459, in run
       result = self._make_common_data_structure(handler(cur))
     File "/usr/local/airflow/includes/projects/ad_user_extraction/functions.py", line 85, in check_query_result
       raise AirflowSkipException(
   airflow.exceptions.AirflowSkipException: No rows returned from the query, skipping the task.

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
       return ti._run_raw_task(
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2995, in _run_raw_task
       return _run_raw_task(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 301, in _run_raw_task
       ti.log.info(e)
     File "/usr/local/lib/python3.9/logging/__init__.py", line 1446, in info
       self._log(INFO, msg, args, **kwargs)
     File "/usr/local/lib/python3.9/logging/__init__.py", line 1589, in _log
       self.handle(record)
     File "/usr/local/lib/python3.9/logging/__init__.py", line 1598, in handle
       if (not self.disabled) and self.filter(record):
     File "/usr/local/lib/python3.9/logging/__init__.py", line 806, in filter
       result = f.filter(record)
     File "/usr/local/airflow/config/log_config.py", line 30, in filter
       if len(record.msg) > self.max_length:
   TypeError: object of type 'AirflowSkipException' has no len()
   ```
   
   ### What you think should happen instead?
   
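   The task should simply be marked as skipped, as happens when the first task raises `AirflowSkipException`. Instead, the custom log filter in `/usr/local/airflow/config/log_config.py` (line 30) crashes: it calls `len(record.msg)`, but when Airflow handles the skip it logs the exception object itself (`ti.log.info(e)`), so `record.msg` is an `AirflowSkipException` rather than a string, and exception objects do not support `len()`.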
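
The failure can be reproduced with the standard library alone. The filter below is a hypothetical reconstruction of the one at `log_config.py` line 30: the class name `MaxLengthFilter`, the default limit and the truncation behaviour are assumptions; only `self.max_length` and the `len(record.msg)` check appear in the traceback. Measuring `record.getMessage()`, which calls `str()` on the message and applies any `%`-style arguments, avoids the `TypeError` for exception objects and any other non-string message.

```python
import logging


class MaxLengthFilter(logging.Filter):
    """Hypothetical stand-in for the filter in log_config.py (line 30).

    The original compares len(record.msg) against self.max_length, which
    raises TypeError whenever a non-string object, such as an exception
    instance, is passed as the log message.
    """

    def __init__(self, max_length: int = 1000) -> None:
        super().__init__()
        self.max_length = max_length

    def filter(self, record: logging.LogRecord) -> bool:
        # getMessage() applies str() to record.msg and merges record.args,
        # so it works for exceptions and any other non-string message.
        message = record.getMessage()
        if len(message) > self.max_length:
            # Assumption: long messages are truncated rather than dropped.
            # Store the already-formatted text and clear args so handlers
            # do not try to %-format it a second time.
            record.msg = message[: self.max_length] + "..."
            record.args = ()
        return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("airflow.task")
    log.addFilter(MaxLengthFilter(max_length=200))
    # Airflow logs the exception object itself (ti.log.info(e)); any
    # exception instance reproduces that, since exceptions define no __len__.
    log.info(Exception("No rows returned from the query, skipping the task."))
```

The `Logger.handle` frame in the traceback indicates the filter is attached at logger level, which is why the sketch uses `addFilter` on a logger rather than on a handler.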
   
   
   ### How to reproduce
   
   ```
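   from datetime import timedelta

   from airflow import DAG
   from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
   from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
   from airflow.utils.task_group import TaskGroup
   from airflow.utils.trigger_rule import TriggerRule

   # DEFAULT_ARGS, TAGS, MSSQL_CONN_ID, MSGRAPH_CONN_ID, check_query_result,
   # persist_response and event_handler are project-specific and not included here.

   with DAG(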
       "user_relations_extraction",
       default_args=DEFAULT_ARGS,
       schedule_interval=timedelta(minutes=5),
       dagrun_timeout=timedelta(hours=2),
       max_active_runs=1,
       max_active_tasks=3,
       catchup=False,
       tags=TAGS,
   ) as dag:
   
       updated_user_ids_task = SQLExecuteQueryOperator(
           task_id="updated_user_ids",
           conn_id=MSSQL_CONN_ID,
           sql="""
               SELECT TOP {{ var.value.get('users_extraction.chunk_size', 100) }} ID
               FROM (
                   SELECT DISTINCT ID, UPDATED_ON 
                   FROM USERS 
                   WHERE UPDATED_ON IS NOT NULL
               ) AS updated_users 
               ORDER BY UPDATED_ON ASC
               """,
           handler=check_query_result,  # This method raises an AirflowSkipException when no records are returned
           show_return_value_in_logs=True,
           dag=dag,
       )
   
       user_registered_devices_task = MSGraphAsyncOperator.partial(
           task_id="user_registered_devices",
           conn_id=MSGRAPH_CONN_ID,
           url="users/{userId}/registeredDevices",
           result_processor=persist_response,
           event_handler=event_handler,
           do_xcom_push=False,
           retry_delay=60,
           trigger_rule=TriggerRule.ALL_SKIPPED,
           dag=dag,
       ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))
   
       user_license_details_task = MSGraphAsyncOperator.partial(
           task_id="user_license_details",
           conn_id=MSGRAPH_CONN_ID,
           url="users/{userId}/licenseDetails",
           result_processor=persist_response,
           event_handler=event_handler,
           do_xcom_push=False,
           retry_delay=60,
           trigger_rule=TriggerRule.ALL_SKIPPED,
           dag=dag,
       ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))
   
       user_group_members_task = MSGraphAsyncOperator.partial(
           task_id="user_group_members",
           conn_id=MSGRAPH_CONN_ID,
           url="users/{userId}/getMemberGroups",
           method="POST",
           data={"securityEnabledOnly": True},
           result_processor=persist_response,
           event_handler=event_handler,
           do_xcom_push=False,
           retry_delay=60,
           trigger_rule=TriggerRule.ALL_SKIPPED,
           dag=dag,
       ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))
   
       update_dirty_users_task = SQLExecuteQueryOperator(
           task_id="update_dirty_users",
           conn_id=MSSQL_CONN_ID,
           sql="""
               UPDATE USERS
               SET UPDATED_ON = NULL
               WHERE ID IN (
                   {% set ids = ti.xcom_pull(task_ids='updated_user_ids') or ['0'] %}
                   {% for id in ids %}
                       '{{ id[0] }}'{% if not loop.last %}, {% endif %}
                   {% endfor %}
               )
               """,
           show_return_value_in_logs=True,
           trigger_rule=TriggerRule.ALL_DONE,
           dag=dag,
       )
   
       with TaskGroup("ad_user_group_extraction") as groups_members_task:
           updated_group_ids_task = SQLExecuteQueryOperator(
               task_id="updated_group_ids",
               conn_id="odbc_ms1744_sa_o365_dev",
               sql="""
                   SELECT TOP {{ var.value.get('users_extraction.chunk_size', 100) }} ID
                   FROM (
                       SELECT DISTINCT ID, UPDATED_ON 
                       FROM GROUPS 
                       WHERE UPDATED_ON IS NOT NULL
                   ) AS updated_groups 
                   ORDER BY UPDATED_ON ASC
                   """,
               handler=check_query_result,
               show_return_value_in_logs=True,
               trigger_rule=TriggerRule.ALL_DONE,
               dag=dag,
           )
   ```
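
The reproduction relies on a `check_query_result` handler that the report does not show. From the traceback, the common SQL hook's `run()` passes the DB-API cursor to it (`handler(cur)`), and it raises `AirflowSkipException` with the message "No rows returned from the query, skipping the task." A minimal hypothetical version might look like the following; the body is an assumption, and only the name and message come from the report. The `try`/`except` fallback is a stand-in so the sketch runs without Airflow installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in so the sketch also runs where Airflow is not installed.
    class AirflowSkipException(Exception):
        pass


def check_query_result(cursor):
    """Hypothetical handler: return every row, or skip the task if none came back.

    The hook's run() calls the handler with the DB-API cursor after executing
    the statement (handler(cur) in the traceback).
    """
    rows = cursor.fetchall()
    if not rows:
        raise AirflowSkipException("No rows returned from the query, skipping the task.")
    return rows
```

Because the skip is raised inside the task, Airflow catches it in `_run_raw_task` and logs it with `ti.log.info(e)` (line 301 in the traceback), which is where the custom log filter then fails.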
   
   ### Operating System
   
   Red Hat
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.