vandonr-amz commented on code in PR #32534:
URL: https://github.com/apache/airflow/pull/32534#discussion_r1267353688


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
 
         return self.job_id
 
+    def start_job_run_after_defer(self, context: Context, event: dict[str, 
Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            self.log.info("Application %s failed to start", 
self.application_id)
+        response = self.hook.conn.start_job_run(
+            clientToken=self.client_request_token,
+            applicationId=self.application_id,
+            executionRoleArn=self.execution_role_arn,
+            jobDriver=self.job_driver,
+            configurationOverrides=self.configuration_overrides,
+            name=self.name,
+            **self.config,
+        )
+
+        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            raise AirflowException(f"EMR serverless job failed to start: 
{response}")
+
+        self.defer(
+            trigger=EmrServerlessStartJobTrigger(
+                application_id=self.application_id,
+                job_id=response["jobRunId"],
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                aws_conn_id=self.aws_conn_id,
+            ),
+            method_name="execute_complete",
+            timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "success":
+            self.log.info("Serverless job completed")
+            return event["job_id"]
+
     def on_kill(self) -> None:

Review Comment:
   little remark on this, it could be worth it to mention in the doc that the 
job is stopped if the operator is killed, EXCEPT if deferred.



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
 
         return self.job_id
 
+    def start_job_run_after_defer(self, context: Context, event: dict[str, 
Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            self.log.info("Application %s failed to start", 
self.application_id)
+        response = self.hook.conn.start_job_run(
+            clientToken=self.client_request_token,
+            applicationId=self.application_id,
+            executionRoleArn=self.execution_role_arn,
+            jobDriver=self.job_driver,
+            configurationOverrides=self.configuration_overrides,
+            name=self.name,
+            **self.config,
+        )
+
+        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            raise AirflowException(f"EMR serverless job failed to start: 
{response}")
+
+        self.defer(
+            trigger=EmrServerlessStartJobTrigger(
+                application_id=self.application_id,
+                job_id=response["jobRunId"],
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                aws_conn_id=self.aws_conn_id,
+            ),
+            method_name="execute_complete",
+            timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "success":
+            self.log.info("Serverless job completed")
+            return event["job_id"]

Review Comment:
   I think that discussion has already taken place in a different PR maybe ? 
But we ignore events where status != success, maybe we'd want to raise or at 
least log something if that happens ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to