vandonr-amz commented on code in PR #31646:
URL: https://github.com/apache/airflow/pull/31646#discussion_r1221994792
##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -830,11 +849,32 @@ def execute(self, context: Context) -> None:
self.log.info("Terminating JobFlow %s", self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
+ if self.deferrable:
+ self.defer(
+ trigger=EmrTerminateJobFlowTrigger(
+ job_flow_id=self.job_flow_id,
+ poll_interval=self.waiter_delay,
+ max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ # timeout is set to ensure that if a trigger dies, the timeout does not restart
+ # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
+ timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60),
+ )
+
if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
raise AirflowException(f"JobFlow termination failed: {response}")
Review Comment:
This status-code check should be done before the `defer`, right? As written, it is skipped whenever the operator defers.
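To illustrate the ordering concern: `self.defer()` raises `TaskDeferred`, so nothing placed after it in `execute` runs in deferrable mode. The following is a minimal sketch only, using stand-in classes rather than the real Airflow ones. `TerminateJobFlowSketch`, its canned `response` argument, and the `outcome` helper are hypothetical names made up for this example; the default values for `waiter_delay` and `waiter_max_attempts` are likewise assumptions.

```python
from datetime import timedelta


class TaskDeferred(Exception):
    """Stand-in for Airflow's TaskDeferred, which BaseOperator.defer() raises."""


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class TerminateJobFlowSketch:
    """Hypothetical, simplified operator; not the real EmrTerminateJobFlowOperator."""

    def __init__(self, response, deferrable=True, waiter_delay=60, waiter_max_attempts=20):
        self.response = response  # canned terminate_job_flows response (assumption)
        self.deferrable = deferrable
        self.waiter_delay = waiter_delay
        self.waiter_max_attempts = waiter_max_attempts

    def defer(self, timeout):
        # Like BaseOperator.defer(), this never returns normally: it raises.
        raise TaskDeferred(f"deferred with timeout={timeout}")

    def execute(self):
        response = self.response
        # Validate the API response first, so a failed call is not hidden by deferral.
        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"JobFlow termination failed: {response}")
        if self.deferrable:
            self.defer(
                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60)
            )


def outcome(status_code, deferrable):
    """Run the sketch once and report how execute() ended."""
    op = TerminateJobFlowSketch(
        {"ResponseMetadata": {"HTTPStatusCode": status_code}}, deferrable=deferrable
    )
    try:
        op.execute()
    except (TaskDeferred, AirflowException) as exc:
        return type(exc).__name__
    return "completed"
```

With the check placed first, a non-200 response fails the task even when `deferrable` is set, and deferral happens only after a successful call.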
##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -830,11 +849,32 @@ def execute(self, context: Context) -> None:
self.log.info("Terminating JobFlow %s", self.job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
+ if self.deferrable:
Review Comment:
Oh, so we're adding deferrable capability to an operator that didn't have a
`wait_for_completion` option?