enewnham commented on issue #45651:
URL: https://github.com/apache/airflow/issues/45651#issuecomment-2590860058
We just realized we were relying on undefined behavior in
`EmrCreateJobFlowOperator`: it previously waited for the cluster to
terminate when `KeepJobFlowAliveWhenNoSteps=False`. With the new behavior, that
is no longer the case.
For others who may have stubbed their toe on this, we wrapped the operator in
our own operator and added the following code.
```
from datetime import timedelta

from airflow.providers.amazon.aws.triggers.emr import EmrTerminateJobFlowTrigger
from airflow.utils.helpers import prune_dict


# Method on our wrapper operator (a subclass of EmrCreateJobFlowOperator).
def wait_for_terminate(self, job_flow_id: str):
    # We need to wait for the job to complete before we can get the
    # status of the group by step.
    # Since KeepJobFlowAliveWhenNoSteps is set to False, the cluster
    # will terminate after the job is done.
    if self.deferrable:
        self.defer(
            trigger=EmrTerminateJobFlowTrigger(
                job_flow_id=job_flow_id,
                waiter_delay=self.waiter_delay,
                waiter_max_attempts=self.waiter_max_attempts,
                aws_conn_id=self.aws_conn_id,
            ),
            method_name="execute_complete",
            # timeout is set to ensure that if a trigger dies, the
            # timeout does not restart.
            # 60 seconds is added to allow the trigger to exit
            # gracefully (i.e. yield TriggerEvent).
            timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay + 60),
        )
    else:
        # Block in the worker until the cluster reaches a terminated state.
        self._emr_hook.get_waiter("job_flow_terminated").wait(
            ClusterId=job_flow_id,
            WaiterConfig=prune_dict(
                {
                    "Delay": self.waiter_delay,
                    "MaxAttempts": self.waiter_max_attempts,
                }
            ),
        )
```
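To give a sense of scale for the deferral timeout above, here is a small worked example of the same arithmetic. The helper name `deferral_timeout`, its `grace_seconds` parameter, and the delay and attempt values are assumptions made for illustration; they are not values from our DAG or from the provider.

```python
from datetime import timedelta


def deferral_timeout(
    waiter_delay: int, waiter_max_attempts: int, grace_seconds: int = 60
) -> timedelta:
    """Longest time the waiter may poll, plus a grace period for the trigger.

    Mirrors the expression passed as `timeout` in wait_for_terminate.
    The helper itself is hypothetical and exists only for this example.
    """
    return timedelta(seconds=waiter_max_attempts * waiter_delay + grace_seconds)


# Assumed values: poll every 60 seconds, at most 60 times.
timeout = deferral_timeout(waiter_delay=60, waiter_max_attempts=60)
print(timeout)  # 1:01:00
```

With these numbers the task would be timed out one minute after the waiter could have exhausted its attempts, so a cluster that takes longer than an hour to terminate would need a larger `waiter_max_attempts` or `waiter_delay`.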