amoghrajesh commented on code in PR #47061:
URL: https://github.com/apache/airflow/pull/47061#discussion_r1969964615
##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -122,15 +121,22 @@ class TIDeferredStatePayload(StrictBaseModel):
),
]
classpath: str
- trigger_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
- next_method: str
+ trigger_kwargs: Annotated[dict[str, Any] | str,
Field(default_factory=dict)]
+ """
+ Kwargs to pass to the trigger constructor, either a plain dict or an
ecnrypted string.
Review Comment:
```suggestion
Kwargs to pass to the trigger constructor, either a plain dict or an
encrypted string.
```
##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -122,15 +121,22 @@ class TIDeferredStatePayload(StrictBaseModel):
),
]
classpath: str
- trigger_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
- next_method: str
+ trigger_kwargs: Annotated[dict[str, Any] | str,
Field(default_factory=dict)]
+ """
+ Kwargs to pass to the trigger constructor, either a plain dict or an
ecnrypted string.
+
+ Both forms will be passed along to the trigger, the server will not handle
either.
+ """
+
trigger_timeout: timedelta | None = None
+ method_name: str
+ """The method on the operator to call when the trigger fires."""
+ kwargs: Annotated[dict[str, Any] | str, Field(default_factory=dict)]
Review Comment:
And this one as "next_method_kwargs" -- not too sure, but it seemed
clearer in my mind.
##########
task_sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -229,6 +229,19 @@ class DeferTask(TIDeferredStatePayload):
type: Literal["DeferTask"] = "DeferTask"
+ @field_serializer("trigger_kwargs", "kwargs", check_fields=True)
+ def _serde_kwarg_fields(self, val, _info):
Review Comment:
For val and _info too
##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -122,15 +121,22 @@ class TIDeferredStatePayload(StrictBaseModel):
),
]
classpath: str
- trigger_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
- next_method: str
+ trigger_kwargs: Annotated[dict[str, Any] | str,
Field(default_factory=dict)]
+ """
+ Kwargs to pass to the trigger constructor, either a plain dict or an
ecnrypted string.
+
+ Both forms will be passed along to the trigger, the server will not handle
either.
+ """
+
trigger_timeout: timedelta | None = None
+ method_name: str
Review Comment:
Don't we want to keep calling this "next_method"?
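To make the naming suggestions concrete, here is a rough sketch of how the payload might read with "next_method" and "next_method_kwargs". It uses a plain stdlib dataclass rather than the StrictBaseModel from the PR, and the class name, classpath value, and encrypted string are made up for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any


@dataclass
class DeferredPayloadSketch:
    """Illustrative stand-in for TIDeferredStatePayload, not the real model."""

    classpath: str
    # Suggested name for the field the diff calls `method_name`.
    next_method: str
    # Either a plain dict or an already-encrypted string, passed through as-is.
    trigger_kwargs: dict[str, Any] | str = field(default_factory=dict)
    # Suggested name for the field the diff calls `kwargs`.
    next_method_kwargs: dict[str, Any] | str = field(default_factory=dict)
    trigger_timeout: timedelta | None = None


# Hypothetical values, only to show both accepted shapes of the kwargs fields.
payload = DeferredPayloadSketch(
    classpath="my_pkg.triggers.MyTrigger",
    next_method="execute_complete",
    trigger_kwargs="opaque-encrypted-blob",
)
```

With these names, the two kwargs fields say which callable they belong to: one set goes to the trigger constructor, the other to the operator method named by next_method.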
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -962,16 +962,18 @@ def supervise(
# TODO: Use logging providers to handle the chunked upload for us etc.
logger: FilteringBoundLogger | None = None
if log_path:
- # If we are told to write logs to a file, redirect the task logger to
it.
+ # If we are told to write logs to a file, redirect the task logger to
it. Make sure we append to the
+ # file though, otherwise when we resume we would loose the logs from
the start->deferral segment if it
Review Comment:
```suggestion
# file though, otherwise when we resume we would lose the logs from
the start->deferral segment if it
```
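For anyone skimming, a small stdlib-only sketch of the behaviour the new comment describes. This is not the supervisor code; the helper and file names are invented, and each call just stands in for one run of the task process writing to the same log path.

```python
import tempfile
from pathlib import Path


def write_segment(log_path: Path, line: str, mode: str) -> None:
    # Hypothetical helper: one call represents one task process run.
    with log_path.open(mode) as f:
        f.write(line + "\n")


with tempfile.TemporaryDirectory() as tmp:
    # Opening with "w" on resume truncates the file, dropping the first segment.
    truncated = Path(tmp) / "truncate.log"
    write_segment(truncated, "start -> deferral", "w")
    write_segment(truncated, "resume -> finish", "w")
    truncated_lines = truncated.read_text().splitlines()

    # Opening with "a" keeps whatever the earlier run already wrote.
    appended = Path(tmp) / "append.log"
    write_segment(appended, "start -> deferral", "a")
    write_segment(appended, "resume -> finish", "a")
    appended_lines = appended.read_text().splitlines()
```

In the truncating case only the resume segment survives, which is the log loss the comment warns about.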
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.