TAKEDA-Takashi opened a new pull request, #63400:
URL: https://github.com/apache/airflow/pull/63400
## Description
`LambdaInvokeFunctionOperator.execute()` currently always returns the Lambda
response payload as a string via `payload_stream.read().decode()`. When a
Lambda function returns a JSON object (e.g., `{"bucket": "my-bucket", "prefix":
"data/"}`), the XCom value is stored as the string `'{"bucket": "my-bucket",
"prefix": "data/"}'`, not as a Python dict.
This makes it impossible to access individual keys from the response in
downstream task templates:
```python
# Fails when the template is rendered: the XCom value is a str, not a dict,
# so ['bucket'] cannot look up a key
bucket = "{{ ti.xcom_pull(task_ids='get_config')['bucket'] }}"
```
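To reproduce the underlying type problem outside Airflow, the sketch below uses `io.BytesIO` in place of the response stream. This is an assumption for illustration: the real `Payload` is a botocore `StreamingBody`, which is likewise file-like and exposes `read()`. Decoding the stream yields a `str`, so looking up a key fails in plain Python.

```python
import io

# Stand-in for the "Payload" stream of a Lambda Invoke response. Assumption:
# botocore's StreamingBody is file-like, so io.BytesIO is close enough here.
payload_stream = io.BytesIO(b'{"bucket": "my-bucket", "prefix": "data/"}')

# Current behaviour: the bytes are decoded and returned as text.
payload = payload_stream.read().decode()
print(type(payload).__name__)  # str

lookup_failed = False
try:
    payload["bucket"]  # a str cannot be indexed by a str key
except TypeError as exc:
    lookup_failed = True
    print(f"lookup failed: {exc}")
```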
This PR adds a `deserialize_payload` boolean parameter (default `False` for
backward compatibility) that, when set to `True`, applies `json.loads()` to the
payload before returning it from `execute()`.
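```python
import json  # at the top of the operator module

# In execute():
payload = payload_stream.read().decode()
if self.deserialize_payload:
    # Parse the JSON text so the XCom value is a dict (or list, etc.), not a str
    payload = json.loads(payload)
return payload
```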
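The proposed logic can be exercised without Airflow as a small stand-alone function. `read_lambda_payload` is a hypothetical name used only for this sketch and is not part of the provider. It follows the `execute()` snippet, with `io.BytesIO` standing in for the response stream.

```python
import io
import json
from typing import IO, Any


def read_lambda_payload(payload_stream: IO[bytes], deserialize_payload: bool = False) -> Any:
    """Hypothetical helper mirroring the proposed execute() payload handling."""
    payload = payload_stream.read().decode()
    if deserialize_payload:
        # Parse the JSON document returned by the Lambda function.
        payload = json.loads(payload)
    return payload


raw = b'{"bucket": "my-bucket", "prefix": "data/"}'

# Default (False): backward-compatible string result.
as_text = read_lambda_payload(io.BytesIO(raw))

# Opt-in (True): a dict whose keys a downstream template can index directly.
as_dict = read_lambda_payload(io.BytesIO(raw), deserialize_payload=True)
print(as_dict["bucket"])  # my-bucket
```

Note that `json.loads()` returns whatever JSON type the function produced (a list, a number, or `None` for `null`), so only object-shaped payloads support `['key']` access. With the flag on, a payload that is not valid JSON raises `json.JSONDecodeError`.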
## Use case / motivation
I'm using [Amazon MWAA
Serverless](https://docs.aws.amazon.com/mwaa/latest/mwaa-serverless-userguide/what-is-mwaa-serverless.html),
which defines workflows in YAML rather than Python DAGs. In this environment:
- There is no `PythonOperator` available to post-process XCom values with
`json.loads()`
- `render_template_as_native_obj` cannot be set (no DAG-level Python
configuration)
- Subclassing the operator is not possible (YAML references operators by
class path)
- Jinja2 `| fromjson` filter is not registered
The only workaround is to split the Lambda invocation into separate tasks —
one per output value — which increases both complexity and cost (MWAA
Serverless charges per task instance with a 1-minute minimum).
A simple `deserialize_payload: true` in the YAML definition solves this
entirely:
```yaml
get_config:
  operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
  task_id: get_config
  function_name: get-config
  payload: '{}'
  deserialize_payload: true
use_config:
  operator: ...
  bucket: "{{ ti.xcom_pull(task_ids='get_config')['bucket'] }}"
  dependencies:
    - get_config
```
A callable-based approach like `SimpleHttpOperator`'s `response_filter`
would not work in YAML-only environments since callables cannot be specified in
YAML workflow definitions. A simple boolean flag is more appropriate for this
use case, and also benefits standard Airflow users who currently need an extra
task or custom operator subclass just to parse the Lambda response.
## Tests
- Added test for `deserialize_payload=True`: verifies that `execute()`
returns a dict
- Added test for default behavior (`deserialize_payload=False`): verifies
backward compatibility (returns string)
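The two tests could look roughly like the following. This is a hedged sketch that does not import Airflow: `FakeLambdaOperator` is a hypothetical stand-in whose `execute()` reproduces only the payload handling, and the hook is a `unittest.mock.MagicMock` whose `invoke_lambda` method (assumed here to match the provider hook's method name) returns a response dict with a `Payload` stream. The real tests would patch the provider's hook instead.

```python
import io
import json
import unittest
from unittest import mock


class FakeLambdaOperator:
    """Hypothetical stand-in: only the payload handling of execute() is reproduced."""

    def __init__(self, hook, function_name, deserialize_payload=False):
        self.hook = hook
        self.function_name = function_name
        self.deserialize_payload = deserialize_payload

    def execute(self, context):
        response = self.hook.invoke_lambda(function_name=self.function_name)
        payload = response["Payload"].read().decode()
        if self.deserialize_payload:
            payload = json.loads(payload)
        return payload


def make_hook(body: bytes) -> mock.MagicMock:
    # The mocked hook returns a response dict whose Payload is a byte stream.
    hook = mock.MagicMock()
    hook.invoke_lambda.return_value = {"StatusCode": 200, "Payload": io.BytesIO(body)}
    return hook


class TestDeserializePayload(unittest.TestCase):
    BODY = b'{"bucket": "my-bucket", "prefix": "data/"}'

    def test_deserialize_payload_true_returns_dict(self):
        op = FakeLambdaOperator(make_hook(self.BODY), "get-config", deserialize_payload=True)
        self.assertEqual(op.execute({}), {"bucket": "my-bucket", "prefix": "data/"})

    def test_default_returns_string(self):
        op = FakeLambdaOperator(make_hook(self.BODY), "get-config")
        self.assertEqual(op.execute({}), self.BODY.decode())
```

Both `pytest` and `python -m unittest` will collect the `TestDeserializePayload` class.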