jnstockley opened a new issue, #55464:
URL: https://github.com/apache/airflow/issues/55464
### Apache Airflow version
3.0.6
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
When pulling a value from XCom, the error message below appears in the
`airflow-dag-processor` logs. We created a custom notifier that extends
`BaseNotifier`. We get the `TaskInstance` with `ti = context['ti']`, collect
all the task ids with `task_ids: list[str] =
list(context['dag'].task_dict.keys())`, and pull from XCom with
`report_df = ti.xcom_pull(key='report_df', task_ids=task_ids) if
ti.xcom_pull(key='report_df', task_ids=task_ids) is not None else ''`
### What you think should happen instead?
The XCom value should be returned, or `None` if it doesn't exist.
### How to reproduce
Create a custom notifier that extends `BaseNotifier`. Somewhere in the DAG,
optionally push a value to XCom. Set `on_success_callback` to your custom
notifier. In the `notify()` function, pull from XCom using the code above,
then check the `airflow-dag-processor` logs for the error messages.
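The steps above can be sketched roughly as follows. This is a minimal illustration, not our exact code: `ReportNotifier` and `pull_report` are hypothetical names, the `airflow.sdk` import path is an assumption for Airflow 3, and a stand-in base class is used when Airflow is not installed so the snippet still runs on its own.

```python
from typing import Any, Mapping

try:
    # Assumed import path for Airflow 3; adjust to match your installation.
    from airflow.sdk import BaseNotifier
except ImportError:
    class BaseNotifier:  # stand-in so the sketch can run without Airflow
        pass


def pull_report(context: Mapping[str, Any]) -> Any:
    """Pull `report_df` from every task in the DAG, defaulting to ''."""
    ti = context["ti"]
    task_ids: list[str] = list(context["dag"].task_dict.keys())
    # One pull instead of the two identical calls in the description.
    report_df = ti.xcom_pull(key="report_df", task_ids=task_ids)
    return report_df if report_df is not None else ""


class ReportNotifier(BaseNotifier):  # hypothetical name
    def notify(self, context: Mapping[str, Any]) -> None:
        # Per this report, the XCom pull is the call that shows up as
        # "Unable to decode message" in the DAG processor logs.
        report_df = pull_report(context)
        print(f"report_df: {report_df!r}")
```

The notifier is then attached with `on_success_callback=ReportNotifier()`, and the error appears once the callback runs.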
### Operating System
Debian
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.13.0
apache-airflow-providers-common-compat==1.7.3
apache-airflow-providers-common-io==1.6.2
apache-airflow-providers-common-sql==1.27.5
apache-airflow-providers-fab==2.4.2
apache-airflow-providers-google==17.2.0
apache-airflow-providers-http==5.3.4
apache-airflow-providers-microsoft-mssql==4.3.2
apache-airflow-providers-openai==1.6.2
apache-airflow-providers-oracle==4.2.0
apache-airflow-providers-postgres==6.3.0
apache-airflow-providers-sftp==5.4.0
apache-airflow-providers-smtp==2.2.0
apache-airflow-providers-ssh==4.1.3
apache-airflow-providers-standard==1.6.0
### Deployment
Other Docker-based deployment
### Deployment details
Custom Docker image based on the official Docker images
### Anything else?
Airflow DAG processor log message:
```
2025-09-10 10:31:54 [error ] Unable to decode message [supervisor] body={'key': 'report_df', 'dag_id': 'Oracle-DB-Check', 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', 'task_id': 'check_dw_prod', 'start': None, 'stop': None, 'step': None, 'include_prior_dates': False, 'type': 'GetXComSequenceSlice'}
2025-09-10T15:31:54.097275837Z ╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
2025-09-10T15:31:54.097279587Z │ /home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:601 │
2025-09-10T15:31:54.097281128Z │ in handle_requests │
2025-09-10T15:31:54.097282170Z │ │
2025-09-10T15:31:54.097283337Z │ 598 │ │ │ request = yield │
2025-09-10T15:31:54.097284587Z │ 599 │ │ │ │
2025-09-10T15:31:54.097285587Z │ 600 │ │ │ try: │
2025-09-10T15:31:54.097286670Z │ ❱ 601 │ │ │ │ msg = self.decoder.validate_python(request.body) │
2025-09-10T15:31:54.097287837Z │ 602 │ │ │ except Exception: │
2025-09-10T15:31:54.097289003Z │ 603 │ │ │ │ log.exception("Unable to decode message", body=request.body) │
2025-09-10T15:31:54.097290795Z │ 604 │ │ │ │ continue │
2025-09-10T15:31:54.097291837Z │ │
2025-09-10T15:31:54.097292795Z │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
2025-09-10T15:31:54.097299003Z │ │ log = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, │ │
2025-09-10T15:31:54.097301128Z │ │ context_class=None, initial_values={'logger_name': 'supervisor'}, │ │
2025-09-10T15:31:54.097302795Z │ │ logger_factory_args=())> │ │
2025-09-10T15:31:54.097304087Z │ │ msg = MaskSecret( │ │
2025-09-10T15:31:54.097305378Z │ │ │ value='<secret_value_hidden>', │ │
2025-09-10T15:31:54.097306670Z │ │ │ name='dispatch_api_key', │ │
2025-09-10T15:31:54.097309837Z │ │ │ type='MaskSecret' │ │
2025-09-10T15:31:54.097311295Z │ │ ) │ │
2025-09-10T15:31:54.097312337Z │ │ request = _RequestFrame( │ │
2025-09-10T15:31:54.097313503Z │ │ │ id=1, │ │
2025-09-10T15:31:54.097314628Z │ │ │ body={ │ │
2025-09-10T15:31:54.097315795Z │ │ │ │ 'key': 'report_df', │ │
2025-09-10T15:31:54.097321962Z │ │ │ │ 'dag_id': 'Oracle-DB-Check', │ │
2025-09-10T15:31:54.097323253Z │ │ │ │ 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', │ │
2025-09-10T15:31:54.097324503Z │ │ │ │ 'task_id': 'check_dw_prod', │ │
2025-09-10T15:31:54.097325795Z │ │ │ │ 'start': None, │ │
2025-09-10T15:31:54.097327087Z │ │ │ │ 'stop': None, │ │
2025-09-10T15:31:54.097328337Z │ │ │ │ 'step': None, │ │
2025-09-10T15:31:54.097329462Z │ │ │ │ 'include_prior_dates': False, │ │
2025-09-10T15:31:54.097331628Z │ │ │ │ 'type': 'GetXComSequenceSlice' │ │
2025-09-10T15:31:54.097332753Z │ │ │ } │ │
2025-09-10T15:31:54.097333837Z │ │ ) │ │
2025-09-10T15:31:54.097334878Z │ │ self = <DagFileProcessorProcess id=UUID('01993441-38d2-7bcc-8bbd-53306533f95a') pid=177> │ │
2025-09-10T15:31:54.097336670Z │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
2025-09-10T15:31:54.097339170Z │ │
2025-09-10T15:31:54.097340253Z │ /home/airflow/.local/lib/python3.12/site-packages/pydantic/type_adapter.py:421 in │
2025-09-10T15:31:54.097341462Z │ validate_python │
2025-09-10T15:31:54.097342545Z │ │
2025-09-10T15:31:54.097343545Z │ 418 │ │ │ │ code='validate-by-alias-and-name-false', │
2025-09-10T15:31:54.097344837Z │ 419 │ │ │ ) │
2025-09-10T15:31:54.097346045Z │ 420 │ │ │
2025-09-10T15:31:54.097347128Z │ ❱ 421 │ │ return self.validator.validate_python( │
2025-09-10T15:31:54.097348378Z │ 422 │ │ │ object, │
2025-09-10T15:31:54.097349420Z │ 423 │ │ │ strict=strict, │
2025-09-10T15:31:54.097350378Z │ 424 │ │ │ from_attributes=from_attributes, │
2025-09-10T15:31:54.097351420Z │ │
2025-09-10T15:31:54.097352462Z │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
2025-09-10T15:31:54.097353920Z │ │ by_alias = None │ │
2025-09-10T15:31:54.097355087Z │ │ by_name = None │ │
2025-09-10T15:31:54.097357045Z │ │ context = None │ │
2025-09-10T15:31:54.097358212Z │ │ experimental_allow_partial = False │ │
2025-09-10T15:31:54.097359295Z │ │ from_attributes = None │ │
2025-09-10T15:31:54.097360420Z │ │ object = { │ │
2025-09-10T15:31:54.097361670Z │ │ │ 'key': 'report_df', │ │
2025-09-10T15:31:54.097363545Z │ │ │ 'dag_id': 'Oracle-DB-Check', │ │
2025-09-10T15:31:54.097364837Z │ │ │ 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', │ │
2025-09-10T15:31:54.097365962Z │ │ │ 'task_id': 'check_dw_prod', │ │
2025-09-10T15:31:54.097367212Z │ │ │ 'start': None, │ │
2025-09-10T15:31:54.097368295Z │ │ │ 'stop': None, │ │
2025-09-10T15:31:54.097369545Z │ │ │ 'step': None, │ │
2025-09-10T15:31:54.097370587Z │ │ │ 'include_prior_dates': False, │ │
2025-09-10T15:31:54.097371753Z │ │ │ 'type': 'GetXComSequenceSlice' │ │
2025-09-10T15:31:54.097373795Z │ │ } │ │
2025-09-10T15:31:54.097374837Z │ │ self = TypeAdapter(Annotated[Union[DagFileParsingResult, │ │
2025-09-10T15:31:54.097376003Z │ │ GetConnection, GetVariable, PutVariable, DeleteVariable, │ │
2025-09-10T15:31:54.097377253Z │ │ GetPrevSuccessfulDagRun, GetPreviousDagRun, MaskSecret], │ │
2025-09-10T15:31:54.097378295Z │ │ FieldInfo(annotation=NoneType, required=True, │ │
2025-09-10T15:31:54.097379712Z │ │ discriminator='type')]) │ │
2025-09-10T15:31:54.097380920Z │ │ strict = None │ │
2025-09-10T15:31:54.097382128Z │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
2025-09-10T15:31:54.097383295Z ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
2025-09-10T15:31:54.097384420Z ValidationError: 1 validation error for
2025-09-10T15:31:54.097385337Z tagged-union[DagFileParsingResult,GetConnection,GetVariable,PutVariable,DeleteVariable,GetPrevSucces
2025-09-10T15:31:54.097386462Z sfulDagRun,GetPreviousDagRun,MaskSecret]
2025-09-10T15:31:54.097387295Z Input tag 'GetXComSequenceSlice' found using 'type' does not match any of the expected tags:
2025-09-10T15:31:54.097388378Z 'DagFileParsingResult', 'GetConnection', 'GetVariable', 'PutVariable', 'DeleteVariable',
2025-09-10T15:31:54.097389545Z 'GetPrevSuccessfulDagRun', 'GetPreviousDagRun', 'MaskSecret' [type=union_tag_invalid,
2025-09-10T15:31:54.097391545Z input_value={'key': 'report_df', 'dag... 'GetXComSequenceSlice'}, input_type=dict]
2025-09-10T15:31:54.097392878Z For further information visit https://errors.pydantic.dev/2.11/v/union_tag_invalid
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)