TAKEDA-Takashi opened a new pull request, #63400:
URL: https://github.com/apache/airflow/pull/63400

   ## Description
   
   `LambdaInvokeFunctionOperator.execute()` currently always returns the Lambda 
response payload as a string via `payload_stream.read().decode()`. When a 
Lambda function returns a JSON object (e.g., `{"bucket": "my-bucket", "prefix": 
"data/"}`), the XCom value is stored as the string `'{"bucket": "my-bucket", 
"prefix": "data/"}'`, not as a Python dict.
   
   This makes it impossible to access individual keys from the response in 
downstream task templates:
   
   ```python
   # Fails at render time: the XCom value is a string, not a dict
   bucket = "{{ ti.xcom_pull(task_ids='get_config')['bucket'] }}"
   ```
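The failure follows from ordinary subscripting rules. The plain-Python sketch below (outside Jinja; the sample string is the example payload from the description) shows that a `str` cannot be indexed by a string key, while the `json.loads()` result can. Inside an Airflow template the error may surface differently, because Jinja handles failed lookups itself.

```python
import json

# The XCom value as currently returned: the raw JSON text from Lambda.
raw_payload = '{"bucket": "my-bucket", "prefix": "data/"}'

try:
    raw_payload["bucket"]  # str indices must be integers or slices
except TypeError as exc:
    print(f"string payload fails with {type(exc).__name__}")

# The same value after json.loads(): key access works.
parsed_payload = json.loads(raw_payload)
print(parsed_payload["bucket"])  # prints: my-bucket
```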
   
   This PR adds a `deserialize_payload` boolean parameter (default `False` for 
backward compatibility) that, when set to `True`, applies `json.loads()` to the 
payload before returning it from `execute()`.
   
   ```python
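   # In execute() (json is imported at module level):
   payload = payload_stream.read().decode()
   if self.deserialize_payload:
       # Parse the JSON text into native Python objects (dict, list, ...)
       payload = json.loads(payload)
   return payload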
   ```
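The decode step can be tried outside Airflow. This is a sketch under stated assumptions, not the operator's code: `decode_payload` is a hypothetical helper, and `io.BytesIO` stands in for the streaming `Payload` body that `payload_stream` refers to. It also shows that `json.loads()` returns whatever top-level JSON type the function produced, so the XCom value is a dict only when the Lambda returns a JSON object.

```python
import io
import json
from typing import Any


def decode_payload(payload_stream: io.BufferedIOBase, deserialize_payload: bool = False) -> Any:
    """Hypothetical helper mirroring the execute() excerpt in the description."""
    payload = payload_stream.read().decode()
    if deserialize_payload:
        # Parse the JSON text into native Python objects.
        payload = json.loads(payload)
    return payload


# Default (backward compatible): the raw JSON text is returned as a str.
print(repr(decode_payload(io.BytesIO(b'{"bucket": "my-bucket"}'))))
# With the flag: a dict for a JSON object...
print(decode_payload(io.BytesIO(b'{"bucket": "my-bucket"}'), deserialize_payload=True))
# ...but a list, number, or None for other top-level JSON values.
print(decode_payload(io.BytesIO(b"[1, 2, 3]"), deserialize_payload=True))
```

With the flag set, a payload that is not valid JSON makes `json.loads()` raise `json.JSONDecodeError` (a `ValueError` subclass).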
   
   ## Use case / motivation
   
   I'm using [Amazon MWAA Serverless](https://docs.aws.amazon.com/mwaa/latest/mwaa-serverless-userguide/what-is-mwaa-serverless.html), which defines workflows in YAML rather than Python DAGs. In this environment:
   
   - There is no `PythonOperator` available to post-process XCom values with 
`json.loads()`
   - `render_template_as_native_obj` cannot be set (no DAG-level Python 
configuration)
   - Subclassing the operator is not possible (YAML references operators by 
class path)
   - Jinja2 `| fromjson` filter is not registered
   
   The only workaround is to split the Lambda invocation into separate tasks, one per output value. This increases both complexity and cost, since MWAA Serverless charges per task instance with a 1-minute minimum.
   
   A simple `deserialize_payload: true` in the YAML definition solves this 
entirely:
   
   ```yaml
   get_config:
     operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
     task_id: get_config
     function_name: get-config
     payload: '{}'
     deserialize_payload: true
   use_config:
     operator: ...
     bucket: "{{ ti.xcom_pull(task_ids='get_config')['bucket'] }}"
     dependencies:
       - get_config
   ```
   
   A callable-based approach like `SimpleHttpOperator`'s `response_filter` would not work here, because YAML workflow definitions cannot specify callables. A boolean flag fits this use case better, and it also helps standard Airflow users, who currently need an extra task or a custom operator subclass just to parse the Lambda response.
   
   ## Tests
   
   - Added test for `deserialize_payload=True`: verifies that `execute()` 
returns a dict
   - Added test for default behavior (`deserialize_payload=False`): verifies 
backward compatibility (returns string)
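The two tests described above can be sketched in pytest style without AWS access. This is not the PR's test code: `StubInvokeOperator` and `fake_invoke` are hypothetical stand-ins that reduce the operator to the payload-handling logic from the description, and the response dict only imitates the shape of a boto3 Lambda `invoke` response, whose `Payload` is a readable stream.

```python
import io
import json
from typing import Any, Callable


class StubInvokeOperator:
    """Hypothetical stand-in: only the payload handling from the PR description."""

    def __init__(self, invoke: Callable[[], dict], deserialize_payload: bool = False):
        self.invoke = invoke
        self.deserialize_payload = deserialize_payload

    def execute(self, context: Any = None) -> Any:
        response = self.invoke()
        payload = response["Payload"].read().decode()
        if self.deserialize_payload:
            payload = json.loads(payload)
        return payload


PAYLOAD = b'{"bucket": "my-bucket", "prefix": "data/"}'


def fake_invoke() -> dict:
    # Imitates a boto3 Lambda invoke response; returns a fresh stream on every call.
    return {"StatusCode": 200, "Payload": io.BytesIO(PAYLOAD)}


def test_deserialize_payload_true_returns_dict():
    result = StubInvokeOperator(fake_invoke, deserialize_payload=True).execute()
    assert result == {"bucket": "my-bucket", "prefix": "data/"}


def test_default_returns_string():
    result = StubInvokeOperator(fake_invoke).execute()
    assert isinstance(result, str)
    assert result == PAYLOAD.decode()
```

Saved as a test module, both functions are collected by `pytest`; the stub keeps them independent of the real hook and of AWS credentials.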


-- 
This is an automated message from the Apache Git Service.