dabla opened a new issue, #43156:
URL: https://github.com/apache/airflow/issues/43156
### Apache Airflow version
2.10.2
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I have two tasks that can raise an AirflowSkipException. The first one is defined directly in the DAG; the second one is located within a TaskGroup. If the first task raises the AirflowSkipException, everything works as expected, but if the second task (inside the TaskGroup) raises it, the task fails with the following exception:
```
[2024-10-18, 12:32:35 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 7543209 for task ad_user_group_extraction.updated_group_ids (object of type 'AirflowSkipException' has no len(); 4538)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 3149, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 3173, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 288, in execute
    output = hook.run(
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 459, in run
    result = self._make_common_data_structure(handler(cur))
  File "/usr/local/airflow/includes/projects/ad_user_extraction/functions.py", line 85, in check_query_result
    raise AirflowSkipException(
airflow.exceptions.AirflowSkipException: No rows returned from the query, skipping the task.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2995, in _run_raw_task
    return _run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 301, in _run_raw_task
    ti.log.info(e)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1446, in info
    self._log(INFO, msg, args, **kwargs)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1598, in handle
    if (not self.disabled) and self.filter(record):
  File "/usr/local/lib/python3.9/logging/__init__.py", line 806, in filter
    result = f.filter(record)
  File "/usr/local/airflow/config/log_config.py", line 30, in filter
    if len(record.msg) > self.max_length:
TypeError: object of type 'AirflowSkipException' has no len()
```
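The failure can be reproduced with the standard library alone. This is a minimal sketch, assuming the custom filter truncates messages longer than `max_length`; only its `len(record.msg)` check is visible in the traceback, so the rest of `MaxLengthFilter`, its default length, and the `SkipError` stand-in for `AirflowSkipException` are hypothetical.

```python
import logging


class MaxLengthFilter(logging.Filter):
    """Hypothetical stand-in for the filter in log_config.py (line 30).

    It assumes record.msg is always a str, which is not true when a caller
    logs a non-string object such as an exception instance.
    """

    def __init__(self, max_length: int = 1000) -> None:
        super().__init__()
        self.max_length = max_length

    def filter(self, record: logging.LogRecord) -> bool:
        # Same check as in the traceback: len() on the raw msg object.
        if len(record.msg) > self.max_length:
            record.msg = record.msg[: self.max_length] + "..."
        return True


class SkipError(Exception):
    """Stand-in for AirflowSkipException; like it, it defines no __len__."""


def reproduce() -> str:
    logger = logging.getLogger("skip_repro")
    logger.setLevel(logging.INFO)  # INFO must be enabled, as it is for task logs
    logger.propagate = False
    if not any(isinstance(f, MaxLengthFilter) for f in logger.filters):
        logger.addFilter(MaxLengthFilter())
    try:
        raise SkipError("No rows returned from the query, skipping the task.")
    except SkipError as exc:
        try:
            # Mirrors ti.log.info(e): the exception object itself becomes record.msg.
            logger.info(exc)
        except TypeError as err:
            return str(err)
    return "no error"
```

`reproduce()` returns `"object of type 'SkipError' has no len()"`, the same kind of `TypeError` that ends the log above.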
### What you think should happen instead?
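The task should be marked as skipped, just as it is when the first task raises AirflowSkipException. The traceback shows that the skip is caught, but while logging it Airflow passes the exception object itself to `ti.log.info(e)` (`taskinstance.py`, line 301), so `record.msg` is an `AirflowSkipException` instance rather than a string. The custom logging filter in `/usr/local/airflow/config/log_config.py` (line 30) then calls `len(record.msg)`, which fails because `AirflowSkipException` does not implement `__len__`.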
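One way to make such a filter robust, sketched here under the assumption that its purpose is to truncate long messages: measure the formatted text from `record.getMessage()` instead of `record.msg`. `getMessage()` calls `str()` on the message and applies any `%`-style arguments, so it also works for exception objects. `SafeMaxLengthFilter` and its default length are hypothetical names, not Airflow APIs.

```python
import logging


class SafeMaxLengthFilter(logging.Filter):
    """Sketch of a length-limiting filter that tolerates non-string messages."""

    def __init__(self, max_length: int = 1000) -> None:
        super().__init__()
        self.max_length = max_length

    def filter(self, record: logging.LogRecord) -> bool:
        # getMessage() returns str(record.msg) with record.args applied,
        # so exceptions and other objects are handled like plain strings.
        message = record.getMessage()
        if len(message) > self.max_length:
            # Store the already-formatted text and clear args so handlers
            # do not apply %-formatting a second time.
            record.msg = message[: self.max_length] + "..."
            record.args = ()
        return True
```

With this filter, `logger.info(exc)` logs the exception's text instead of raising `TypeError`.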
### How to reproduce
```python
from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

# DEFAULT_ARGS, TAGS, MSSQL_CONN_ID, MSGRAPH_CONN_ID, check_query_result,
# persist_response and event_handler are defined elsewhere in the project
# (not shown in this issue).

with DAG(
    "user_relations_extraction",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(minutes=5),
    dagrun_timeout=timedelta(hours=2),
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    tags=TAGS,
) as dag:
    updated_user_ids_task = SQLExecuteQueryOperator(
        task_id="updated_user_ids",
        conn_id=MSSQL_CONN_ID,
        sql="""
            SELECT TOP {{ var.value.get('users_extraction.chunk_size', 100) }} ID
            FROM (
                SELECT DISTINCT ID, UPDATED_ON
                FROM USERS
                WHERE UPDATED_ON IS NOT NULL
            ) AS updated_users
            ORDER BY UPDATED_ON ASC
        """,
        handler=check_query_result,  # Raises AirflowSkipException when no records are returned
        show_return_value_in_logs=True,
        dag=dag,
    )

    user_registered_devices_task = MSGraphAsyncOperator.partial(
        task_id="user_registered_devices",
        conn_id=MSGRAPH_CONN_ID,
        url="users/{userId}/registeredDevices",
        result_processor=persist_response,
        event_handler=event_handler,
        do_xcom_push=False,
        retry_delay=60,
        trigger_rule=TriggerRule.ALL_SKIPPED,
        dag=dag,
    ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))

    user_license_details_task = MSGraphAsyncOperator.partial(
        task_id="user_license_details",
        conn_id=MSGRAPH_CONN_ID,
        url="users/{userId}/licenseDetails",
        result_processor=persist_response,
        event_handler=event_handler,
        do_xcom_push=False,
        retry_delay=60,
        trigger_rule=TriggerRule.ALL_SKIPPED,
        dag=dag,
    ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))

    user_group_members_task = MSGraphAsyncOperator.partial(
        task_id="user_group_members",
        conn_id=MSGRAPH_CONN_ID,
        url="users/{userId}/getMemberGroups",
        method="POST",
        data={"securityEnabledOnly": True},
        result_processor=persist_response,
        event_handler=event_handler,
        do_xcom_push=False,
        retry_delay=60,
        trigger_rule=TriggerRule.ALL_SKIPPED,
        dag=dag,
    ).expand(path_parameters=updated_user_ids_task.output.map(lambda x: {"userId": x[0]}))

    update_dirty_users_task = SQLExecuteQueryOperator(
        task_id="update_dirty_users",
        conn_id=MSSQL_CONN_ID,
        sql="""
            UPDATE USERS
            SET UPDATED_ON = NULL
            WHERE ID IN (
                {% set ids = ti.xcom_pull(task_ids='updated_user_ids') or ['0'] %}
                {% for id in ids %}
                    '{{ id[0] }}'{% if not loop.last %}, {% endif %}
                {% endfor %}
            )
        """,
        show_return_value_in_logs=True,
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag,
    )

    with TaskGroup("ad_user_group_extraction") as groups_members_task:
        updated_group_ids_task = SQLExecuteQueryOperator(
            task_id="updated_group_ids",
            conn_id="odbc_ms1744_sa_o365_dev",
            sql="""
                SELECT TOP {{ var.value.get('users_extraction.chunk_size', 100) }} ID
                FROM (
                    SELECT DISTINCT ID, UPDATED_ON
                    FROM GROUPS
                    WHERE UPDATED_ON IS NOT NULL
                ) AS updated_groups
                ORDER BY UPDATED_ON ASC
            """,
            handler=check_query_result,
            show_return_value_in_logs=True,
            trigger_rule=TriggerRule.ALL_DONE,
            dag=dag,
        )
```
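The issue does not include `check_query_result`. Judging from the traceback (the hook calls `handler(cur)` with the DB-API cursor in `hooks/sql.py`, and the handler raises `AirflowSkipException` at line 85 of `functions.py`), it presumably looks something like the hypothetical reconstruction below. The `description` check follows the DB-API convention that `description` is `None` for statements that return no result set; the import fallback exists only so the sketch runs without Airflow installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:  # fallback only so this sketch runs without Airflow
    class AirflowSkipException(Exception):
        """Local stand-in used when Airflow is not importable."""


def check_query_result(cursor):
    """Hypothetical reconstruction of the handler from functions.py.

    Receives the DB-API cursor after the query has run and returns its rows,
    or raises AirflowSkipException when the query produced no rows.
    """
    # description is None for statements that produce no result set
    rows = cursor.fetchall() if cursor.description is not None else []
    if not rows:
        raise AirflowSkipException("No rows returned from the query, skipping the task.")
    return rows
```

An empty result set therefore raises the skip inside `hook.run`, which is exactly where the first traceback above ends.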
### Operating System
RedHat
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)