MaiHoangViet1809 opened a new issue, #26980:
URL: https://github.com/apache/airflow/issues/26980

   ### Apache Airflow version
   
   2.4.1
   
   ### What happened
   
   When building a flow that mixes traditional operators with the TaskFlow API, I saw this error log:
   ```
   [2022-10-11, 11:23:58 +07] {taskinstance.py:1851} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2378, in xcom_push
       XCom.set(
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 206, in set
       value = cls.serialize_value(
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 597, in serialize_value
       return json.dumps(value).encode('UTF-8')
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type _LazyXComAccess is not JSON serializable
   ```
   
   ### What you think should happen instead
   
   When an expanded (mapped) S3KeySensor is followed by another task, I currently have to wrap its output in an extra reduce task (one that returns a list or does an aggregation). Shouldn't the pushed result be reduced by default so that the next task can use it, as long as that next task is not itself a map/expand task?
   
   ### How to reproduce
   
   This sample code includes both the working flow and the buggy flow:
   
       import sys, os
       from airflow import DAG
       from datetime import datetime, date, timedelta
       from dateutil.relativedelta import relativedelta
       from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
       from airflow.sensors.base import PokeReturnValue
       
       from airflow.models.baseoperator import chain
       from airflow.utils.state import State
       from airflow.utils.trigger_rule import TriggerRule
       from airflow.decorators import dag, task
       from airflow.operators.python import get_current_context
       from airflow.exceptions import AirflowFailException
       from typing import List, Callable, Dict, Union
       from functools import partial
       
       PARAMS = {"INPUT_FOLDER": "/input",
                 "DATA_FOLDER" : "/data",
                }
       
       DEBUG = True
       
       class S3KeySensor_test(S3KeySensor):
           def poke(self, context) -> Union[bool, PokeReturnValue]:
               check_result = True # all(self._check_key(key) for key in self.bucket_key)
               if check_result:
                   return PokeReturnValue(is_done=True, xcom_value=",".join(self.bucket_key))
               else:
                   return False
       
       with DAG("AA.REPORT_BUG", start_date=datetime(2022,1,1), catchup=False, schedule=None, params=PARAMS) as dag:
       
           @task
           def generate_dependency():
               ctx = get_current_context()
               params = ctx["params"]   
               logical_date = ctx["logical_date"]
               return [{"name": "test1" , "bucket_key" : f"s3:/{ params['INPUT_FOLDER'] }/{ logical_date.strftime('%Y/%m/%d') }"},
                       {"name": "test2" , "bucket_key" : f"s3:/{ params['DATA_FOLDER'] }/{ logical_date.strftime('%Y/%m/%d') }"},
                      ]
           
           @task(multiple_outputs=True)
           def map_dependency(data) -> Dict[str, str]:
               return {"bucket_key": data["bucket_key"] }
           
           @task
           def reduce_task(data):
               return list(data or [])
             
           @task
           def non_reduce_task(arg_1):
               return arg_1
           
           # bug flow
           list_dependency = generate_dependency.override(task_id="list_dependency")()
           dependencies = map_dependency.override(task_id="map_dependency").expand(data=list_dependency)

           list_task_sensor = S3KeySensor_test.partial(
               task_id="check_s3",
               wildcard_match=True,
               aws_conn_id='aws_default',
               timeout=300,
               poke_interval=120,
               mode="reschedule",
           ).expand_kwargs(dependencies)

           # Fails: the lazy XCom proxy is returned as-is and cannot be JSON-serialized.
           bug_task = non_reduce_task.override(task_id="bug_task", retries=3, retry_delay=timedelta(minutes=15))(arg_1=list_task_sensor.output)
       
           
           # work flow
           list_dependency_2 = generate_dependency.override(task_id="list_dependency_2")()
           dependencies_2 = map_dependency.override(task_id="map_dependency_2").expand(data=list_dependency_2)

           list_task_sensor_2 = S3KeySensor_test.partial(
               task_id="check_s3_2",
               wildcard_match=True,
               aws_conn_id='aws_default',
               timeout=300,
               poke_interval=120,
               mode="reschedule",
           ).expand_kwargs(dependencies_2)

           # Works: reduce_task copies the mapped results into a plain list first.
           normal_task = non_reduce_task.override(task_id="normal_task", retries=3, retry_delay=timedelta(minutes=15))(arg_1=reduce_task(list_task_sensor_2.output))
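
The difference between the two flows can be reproduced without Airflow. The sketch below is only an illustration: `LazySequence` is a hypothetical stand-in for the `_LazyXComAccess` object named in the traceback, not Airflow's real class, and it assumes the proxy behaves like a read-only sequence that the standard `json` encoder does not recognise. Under that assumption, returning the proxy unchanged fails on serialization, while copying it into a plain list first succeeds.

```python
import json
from collections.abc import Sequence


class LazySequence(Sequence):
    """Hypothetical stand-in for the lazy XCom proxy (not Airflow's real class)."""

    def __init__(self, items):
        self._items = list(items)

    def __getitem__(self, index):
        return self._items[index]

    def __len__(self):
        return len(self._items)


def non_reduce_task(arg_1):
    # Returns the proxy unchanged, as bug_task does in the report.
    return arg_1


def reduce_task(data):
    # Copies the elements into a plain list, as reduce_task does in the report.
    return list(data or [])


lazy = LazySequence(["s3://input/2022/01/01", "s3://data/2022/01/01"])

# The json encoder only knows list and tuple, not arbitrary sequences.
try:
    json.dumps(non_reduce_task(lazy))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# A plain list serializes without any custom encoder.
serialized = json.dumps(reduce_task(lazy))

print(error_message)
print(serialized)
```

This matches the observed behaviour: `normal_task` works because `reduce_task` materializes the mapped results before they are pushed to XCom.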
   
   
   ### Operating System
   
   RHEL 7
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==4.1.0
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
