syedahsn commented on code in PR #32534:
URL: https://github.com/apache/airflow/pull/32534#discussion_r1268426707
##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
return self.job_id
+ def start_job_run_after_defer(self, context: Context, event: dict[str,
Any] | None = None) -> None:
+ if event is None:
+ self.log.error("Trigger error: event is None")
+ raise AirflowException("Trigger error: event is None")
+ elif event["status"] != "success":
+ self.log.info("Application %s failed to start",
self.application_id)
+ response = self.hook.conn.start_job_run(
+ clientToken=self.client_request_token,
+ applicationId=self.application_id,
+ executionRoleArn=self.execution_role_arn,
+ jobDriver=self.job_driver,
+ configurationOverrides=self.configuration_overrides,
+ name=self.name,
+ **self.config,
+ )
+
+ if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+ raise AirflowException(f"EMR serverless job failed to start:
{response}")
+
+ self.defer(
+ trigger=EmrServerlessStartJobTrigger(
+ application_id=self.application_id,
+ job_id=response["jobRunId"],
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
+ if event is None:
+ self.log.error("Trigger error: event is None")
+ raise AirflowException("Trigger error: event is None")
+ elif event["status"] == "success":
+ self.log.info("Serverless job completed")
+ return event["job_id"]
+
def on_kill(self) -> None:
Review Comment:
Why would the behaviour be different in the deferred case? Even in the
deferrable case, everything happens from the executor, so if the executor
is killed, the `on_kill` method should still be run.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]