jnstockley opened a new issue, #55464:
URL: https://github.com/apache/airflow/issues/55464

   ### Apache Airflow version
   
   3.0.6
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Pulling a value from XCom inside a callback fails with the error message 
below, which appears in the airflow-dag-processor logs. We created a custom 
notifier that extends `BaseNotifier`. In it we get the `TaskInstance` with 
`ti = context['ti']`, collect all the task ids with `task_ids: list[str] = 
list(context['dag'].task_dict.keys())`, and pull from XCom with 
`report_df = ti.xcom_pull(key='report_df', task_ids=task_ids) if 
ti.xcom_pull(key='report_df', task_ids=task_ids) is not None else ''`
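
For readers who want the pull logic in one place, here is a minimal sketch of what the description above amounts to. It is a reconstruction from the prose, not the reporter's actual code: the helper name `pull_report_df` is hypothetical, and it assumes it is called with the callback `context` from inside the notifier's `notify()` method. It calls `xcom_pull` once and reuses the result instead of calling it twice.

```python
from typing import Any, Mapping


def pull_report_df(context: Mapping[str, Any]) -> Any:
    """Pull 'report_df' for every task in the DAG, falling back to ''.

    Hypothetical helper: intended to be called from notify(context).
    """
    ti = context["ti"]
    # Every task id defined on the DAG, as described in the report.
    task_ids: list[str] = list(context["dag"].task_dict.keys())
    # Single call; the one-liner in the report calls xcom_pull twice.
    value = ti.xcom_pull(key="report_df", task_ids=task_ids)
    return value if value is not None else ""
```

Nothing in this helper depends on the DAG processor. The failure reported here happens when the `xcom_pull` request reaches the supervisor, not in the calling code.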
   
   ### What you think should happen instead?
   
   The XCom value should be returned, or `None` if it doesn't exist
   
   ### How to reproduce
   
   Create a custom notifier that extends `BaseNotifier`. Optionally push a 
value to XCom somewhere in the DAG. Set the custom notifier as the 
`on_success_callback`. In its `notify()` function, pull from XCom using the 
code above, then look at the `airflow-dag-processor` logs to see the error 
messages.
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.13.0
   apache-airflow-providers-common-compat==1.7.3
   apache-airflow-providers-common-io==1.6.2
   apache-airflow-providers-common-sql==1.27.5
   apache-airflow-providers-fab==2.4.2
   apache-airflow-providers-google==17.2.0
   apache-airflow-providers-http==5.3.4
   apache-airflow-providers-microsoft-mssql==4.3.2
   apache-airflow-providers-openai==1.6.2
   apache-airflow-providers-oracle==4.2.0
   apache-airflow-providers-postgres==6.3.0
   apache-airflow-providers-sftp==5.4.0
   apache-airflow-providers-smtp==2.2.0
   apache-airflow-providers-ssh==4.1.3
   apache-airflow-providers-standard==1.6.0
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Custom Docker image based on the official Docker images
   
   ### Anything else?
   
   Airflow DAG Processor log message
   ```
   2025-09-10 10:31:54 [error    ] Unable to decode message       [supervisor] body={'key': 'report_df', 'dag_id': 'Oracle-DB-Check', 'run_id': 'manual__2025-09-10T15:31:49.020767+00:00', 'task_id': 'check_dw_prod', 'start': None, 'stop': None, 'step': None, 'include_prior_dates': False, 'type': 'GetXComSequenceSlice'}
   2025-09-10T15:31:54.097275837Z ╭─────────────────────────────── Traceback 
(most recent call last) ────────────────────────────────╮
   2025-09-10T15:31:54.097279587Z │ 
/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:601
   │
   2025-09-10T15:31:54.097281128Z │ in handle_requests                          
                                                     │
   2025-09-10T15:31:54.097282170Z │                                             
                                                     │
   2025-09-10T15:31:54.097283337Z │    598 │   │   │   request = yield          
                                                     │
   2025-09-10T15:31:54.097284587Z │    599 │   │   │                            
                                                     │
   2025-09-10T15:31:54.097285587Z │    600 │   │   │   try:                     
                                                     │
   2025-09-10T15:31:54.097286670Z │ ❱  601 │   │   │   │   msg = 
self.decoder.validate_python(request.body)                          │
   2025-09-10T15:31:54.097287837Z │    602 │   │   │   except Exception:        
                                                     │
   2025-09-10T15:31:54.097289003Z │    603 │   │   │   │   
log.exception("Unable to decode message", body=request.body)              │
   2025-09-10T15:31:54.097290795Z │    604 │   │   │   │   continue             
                                                     │
   2025-09-10T15:31:54.097291837Z │                                             
                                                     │
   2025-09-10T15:31:54.097292795Z │ 
╭─────────────────────────────────────────── locals 
───────────────────────────────────────────╮ │
   2025-09-10T15:31:54.097299003Z │ │     log = 
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None,         
   │ │
   2025-09-10T15:31:54.097301128Z │ │           context_class=None, 
initial_values={'logger_name': 'supervisor'},                  │ │
   2025-09-10T15:31:54.097302795Z │ │           logger_factory_args=())>        
                                                   │ │
   2025-09-10T15:31:54.097304087Z │ │     msg = MaskSecret(                     
                                                   │ │
   2025-09-10T15:31:54.097305378Z │ │           │   
value='<secret_value_hidden>',                                  │ │
   2025-09-10T15:31:54.097306670Z │ │           │   name='dispatch_api_key',    
                                                   │ │
   2025-09-10T15:31:54.097309837Z │ │           │   type='MaskSecret'           
                                                   │ │
   2025-09-10T15:31:54.097311295Z │ │           )                               
                                                   │ │
   2025-09-10T15:31:54.097312337Z │ │ request = _RequestFrame(                  
                                                   │ │
   2025-09-10T15:31:54.097313503Z │ │           │   id=1,                       
                                                   │ │
   2025-09-10T15:31:54.097314628Z │ │           │   body={                      
                                                   │ │
   2025-09-10T15:31:54.097315795Z │ │           │   │   'key': 'report_df',     
                                                   │ │
   2025-09-10T15:31:54.097321962Z │ │           │   │   'dag_id': 
'Oracle-DB-Check',                                               │ │
   2025-09-10T15:31:54.097323253Z │ │           │   │   'run_id': 
'manual__2025-09-10T15:31:49.020767+00:00',                      │ │
   2025-09-10T15:31:54.097324503Z │ │           │   │   'task_id': 
'check_dw_prod',                                                │ │
   2025-09-10T15:31:54.097325795Z │ │           │   │   'start': None,          
                                                   │ │
   2025-09-10T15:31:54.097327087Z │ │           │   │   'stop': None,           
                                                   │ │
   2025-09-10T15:31:54.097328337Z │ │           │   │   'step': None,           
                                                   │ │
   2025-09-10T15:31:54.097329462Z │ │           │   │   'include_prior_dates': 
False,                                              │ │
   2025-09-10T15:31:54.097331628Z │ │           │   │   'type': 
'GetXComSequenceSlice'                                             │ │
   2025-09-10T15:31:54.097332753Z │ │           │   }                           
                                                   │ │
   2025-09-10T15:31:54.097333837Z │ │           )                               
                                                   │ │
   2025-09-10T15:31:54.097334878Z │ │    self = <DagFileProcessorProcess 
id=UUID('01993441-38d2-7bcc-8bbd-53306533f95a') pid=177>  │ │
   2025-09-10T15:31:54.097336670Z │ 
╰──────────────────────────────────────────────────────────────────────────────────────────────╯
 │
   2025-09-10T15:31:54.097339170Z │                                             
                                                     │
   2025-09-10T15:31:54.097340253Z │ 
/home/airflow/.local/lib/python3.12/site-packages/pydantic/type_adapter.py:421 
in                │
   2025-09-10T15:31:54.097341462Z │ validate_python                             
                                                     │
   2025-09-10T15:31:54.097342545Z │                                             
                                                     │
   2025-09-10T15:31:54.097343545Z │   418 │   │   │   │   
code='validate-by-alias-and-name-false',                                   │
   2025-09-10T15:31:54.097344837Z │   419 │   │   │   )                         
                                                     │
   2025-09-10T15:31:54.097346045Z │   420 │   │                                 
                                                     │
   2025-09-10T15:31:54.097347128Z │ ❱ 421 │   │   return 
self.validator.validate_python(                                             │
   2025-09-10T15:31:54.097348378Z │   422 │   │   │   object,                   
                                                     │
   2025-09-10T15:31:54.097349420Z │   423 │   │   │   strict=strict,            
                                                     │
   2025-09-10T15:31:54.097350378Z │   424 │   │   │   
from_attributes=from_attributes,                                               │
   2025-09-10T15:31:54.097351420Z │                                             
                                                     │
   2025-09-10T15:31:54.097352462Z │ 
╭─────────────────────────────────────────── locals 
───────────────────────────────────────────╮ │
   2025-09-10T15:31:54.097353920Z │ │                   by_alias = None         
                                                   │ │
   2025-09-10T15:31:54.097355087Z │ │                    by_name = None         
                                                   │ │
   2025-09-10T15:31:54.097357045Z │ │                    context = None         
                                                   │ │
   2025-09-10T15:31:54.097358212Z │ │ experimental_allow_partial = False        
                                                   │ │
   2025-09-10T15:31:54.097359295Z │ │            from_attributes = None         
                                                   │ │
   2025-09-10T15:31:54.097360420Z │ │                     object = {            
                                                   │ │
   2025-09-10T15:31:54.097361670Z │ │                              │   'key': 
'report_df',                                         │ │
   2025-09-10T15:31:54.097363545Z │ │                              │   
'dag_id': 'Oracle-DB-Check',                                │ │
   2025-09-10T15:31:54.097364837Z │ │                              │   
'run_id': 'manual__2025-09-10T15:31:49.020767+00:00',       │ │
   2025-09-10T15:31:54.097365962Z │ │                              │   
'task_id': 'check_dw_prod',                                 │ │
   2025-09-10T15:31:54.097367212Z │ │                              │   'start': 
None,                                              │ │
   2025-09-10T15:31:54.097368295Z │ │                              │   'stop': 
None,                                               │ │
   2025-09-10T15:31:54.097369545Z │ │                              │   'step': 
None,                                               │ │
   2025-09-10T15:31:54.097370587Z │ │                              │   
'include_prior_dates': False,                               │ │
   2025-09-10T15:31:54.097371753Z │ │                              │   'type': 
'GetXComSequenceSlice'                              │ │
   2025-09-10T15:31:54.097373795Z │ │                              }            
                                                   │ │
   2025-09-10T15:31:54.097374837Z │ │                       self = 
TypeAdapter(Annotated[Union[DagFileParsingResult,               │ │
   2025-09-10T15:31:54.097376003Z │ │                              
GetConnection, GetVariable, PutVariable, DeleteVariable,        │ │
   2025-09-10T15:31:54.097377253Z │ │                              
GetPrevSuccessfulDagRun, GetPreviousDagRun, MaskSecret],        │ │
   2025-09-10T15:31:54.097378295Z │ │                              
FieldInfo(annotation=NoneType, required=True,                   │ │
   2025-09-10T15:31:54.097379712Z │ │                              
discriminator='type')])                                         │ │
   2025-09-10T15:31:54.097380920Z │ │                     strict = None         
                                                   │ │
   2025-09-10T15:31:54.097382128Z │ 
╰──────────────────────────────────────────────────────────────────────────────────────────────╯
 │
   2025-09-10T15:31:54.097383295Z 
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
   2025-09-10T15:31:54.097384420Z ValidationError: 1 validation error for 
   2025-09-10T15:31:54.097385337Z tagged-union[DagFileParsingResult,GetConnection,GetVariable,PutVariable,DeleteVariable,GetPrevSucces
   2025-09-10T15:31:54.097386462Z sfulDagRun,GetPreviousDagRun,MaskSecret]
   2025-09-10T15:31:54.097387295Z   Input tag 'GetXComSequenceSlice' found using 'type' does not match any of the expected tags: 
   2025-09-10T15:31:54.097388378Z 'DagFileParsingResult', 'GetConnection', 'GetVariable', 'PutVariable', 'DeleteVariable', 
   2025-09-10T15:31:54.097389545Z 'GetPrevSuccessfulDagRun', 'GetPreviousDagRun', 'MaskSecret' [type=union_tag_invalid, 
   2025-09-10T15:31:54.097391545Z input_value={'key': 'report_df', 'dag... 'GetXComSequenceSlice'}, input_type=dict]
   2025-09-10T15:31:54.097392878Z     For further information visit https://errors.pydantic.dev/2.11/v/union_tag_invalid
   ```
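
The traceback shows the DAG processor's decoder rejecting the request because its `type` tag is not one of the eight message types that decoder accepts. The sketch below is a plain-Python stand-in for that check, assuming only the tag list and request body printed in the log. It is not Airflow's or pydantic's actual code, and the names `ACCEPTED_TAGS` and `check_tag` are hypothetical.

```python
# Tags the DAG processor's decoder accepts, copied from the ValidationError.
ACCEPTED_TAGS = frozenset({
    "DagFileParsingResult",
    "GetConnection",
    "GetVariable",
    "PutVariable",
    "DeleteVariable",
    "GetPrevSuccessfulDagRun",
    "GetPreviousDagRun",
    "MaskSecret",
})

# Request body copied from the "Unable to decode message" log line.
request_body = {
    "key": "report_df",
    "dag_id": "Oracle-DB-Check",
    "run_id": "manual__2025-09-10T15:31:49.020767+00:00",
    "task_id": "check_dw_prod",
    "start": None,
    "stop": None,
    "step": None,
    "include_prior_dates": False,
    "type": "GetXComSequenceSlice",
}


def check_tag(body: dict) -> str:
    """Return the message tag, or raise if the decoder would not accept it."""
    tag = body.get("type")
    if tag not in ACCEPTED_TAGS:
        raise ValueError(
            f"Input tag {tag!r} does not match any of the expected tags"
        )
    return tag


if __name__ == "__main__":
    try:
        check_tag(request_body)
    except ValueError as exc:
        # Mirrors the union_tag_invalid failure: the tag is simply not listed.
        print(exc)
```

Read this way, the log suggests that no XCom request type is accepted by the decoder for callbacks run in the DAG processor, so the request fails before any XCom lookup happens.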
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

