syedahsn commented on code in PR #32534:
URL: https://github.com/apache/airflow/pull/32534#discussion_r1268336471
##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
return self.job_id
+ def start_job_run_after_defer(self, context: Context, event: dict[str,
Any] | None = None) -> None:
+ if event is None:
+ self.log.error("Trigger error: event is None")
+ raise AirflowException("Trigger error: event is None")
+ elif event["status"] != "success":
+ self.log.info("Application %s failed to start",
self.application_id)
+ response = self.hook.conn.start_job_run(
+ clientToken=self.client_request_token,
+ applicationId=self.application_id,
+ executionRoleArn=self.execution_role_arn,
+ jobDriver=self.job_driver,
+ configurationOverrides=self.configuration_overrides,
+ name=self.name,
+ **self.config,
+ )
+
+ if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+ raise AirflowException(f"EMR serverless job failed to start:
{response}")
+
+ self.defer(
+ trigger=EmrServerlessStartJobTrigger(
+ application_id=self.application_id,
+ job_id=response["jobRunId"],
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
+ if event is None:
+ self.log.error("Trigger error: event is None")
+ raise AirflowException("Trigger error: event is None")
+ elif event["status"] == "success":
+ self.log.info("Serverless job completed")
+ return event["job_id"]
Review Comment:
I addressed this in another [comment
](https://github.com/apache/airflow/pull/32513#discussion_r1265494594).
Basically, my point was that the only thing setting `event["status"]` is the
Trigger, and it does not set any state beside `success` so it should never be
set to anything else. I'm testing for the `success` state just so the code is
easier to understand without having to look at the Trigger code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]