Taragolis commented on code in PR #29447:
URL: https://github.com/apache/airflow/pull/29447#discussion_r1145096127
##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -471,39 +470,40 @@ def __init__(
self.awslogs_region = self.region
self.arn: str | None = None
+ self.started_by: str | None = None
+
self.retry_args = quota_retry
self.task_log_fetcher: EcsTaskLogFetcher | None = None
self.wait_for_completion = wait_for_completion
- @provide_session
- def execute(self, context, session=None):
+ def execute(self, context):
self.log.info(
"Running ECS Task - Task definition: %s - on cluster %s",
self.task_definition, self.cluster
)
self.log.info("EcsOperator overrides: %s", self.overrides)
if self.reattach:
- self._try_reattach_task(context)
+ # Generate deterministic UUID which refers to unique TaskInstanceKey
+ ti: TaskInstance = context["ti"]
+ self.started_by = generate_uuid(*map(str, ti.key.primary))
+ if self.do_xcom_push:
+ ti.xcom_push("started_by", self.started_by)
Review Comment:
All XCom values associated with a task are cleared before it starts. In the
current implementation (in `main`), EcsRunTaskOperator relies on an undocumented
feature (non-public interface usage) to save an XCom to a non-existent task, and
that XCom is not cleared when you reset the task run. It could also potentially
damage the user's Airflow database (e.g. constraint violations).
This value is only for reference, for example if a user wants to find this task
in ECS (e.g. in the AWS Console / AWS SDK) by providing `startedBy`.
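
For illustration only, here is a minimal sketch of how a deterministic `startedBy` value could be derived from the parts of a task instance key. It assumes a UUIDv5 chain from the standard library. `deterministic_uuid` is a hypothetical helper, not the provider's `generate_uuid`, and the key tuple below is made up rather than taken from a real `ti.key.primary`.

```python
import uuid


def deterministic_uuid(*parts: str, namespace: uuid.UUID = uuid.NAMESPACE_OID) -> str:
    """Fold each part into a UUIDv5 chain: same inputs always yield the same UUID.

    Hypothetical helper for illustration; the real provider code may differ.
    """
    if not parts:
        raise ValueError("at least one part is required")
    current = namespace
    for part in parts:
        # Each step uses the previous UUID as the namespace for the next part,
        # so both the values and their order influence the result.
        current = uuid.uuid5(current, part)
    return str(current)


# Made-up stand-in for a task instance key: (dag_id, task_id, run_id, map_index)
key = ("example_dag", "run_ecs_task", "manual__2023-03-22T00:00:00+00:00", -1)
started_by = deterministic_uuid(*map(str, key))

# A retry of the same task instance would compute the same value, which is
# what makes it usable for finding an already running ECS task.
same_again = deterministic_uuid(*map(str, key))

# A different map_index (or any other key part) gives a different value.
other = deterministic_uuid(*map(str, key[:3] + (0,)))
```

Because the value depends only on the key parts, it does not need to be stored anywhere to be recomputed later; pushing it to XCom is then purely informational.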
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.