Copilot commented on code in PR #56158:
URL: https://github.com/apache/airflow/pull/56158#discussion_r2384326798
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -748,8 +751,11 @@ def execute(self, context: Context) -> str | None:
job_flow_id=self._job_flow_id,
log_uri=get_log_uri(emr_client=self.hook.conn,
job_flow_id=self._job_flow_id),
)
- if self.wait_policy:
- waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy]
+ if self.wait_for_completion:
+ if self.wait_policy:
+ waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy]
+ else:
+ waiter_name =
WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION]
Review Comment:
The logic is incorrect. When `wait_policy` is deprecated and mapped to
`wait_for_completion`, the code should not still check `self.wait_policy`
inside the `wait_for_completion` block. This could cause issues when both
parameters are provided or when `wait_policy` is None but `wait_for_completion`
is True.
```suggestion
waiter_name =
WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION]
```
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -698,19 +697,23 @@ def __init__(
super().__init__(**kwargs)
self.emr_conn_id = emr_conn_id
self.job_flow_overrides = job_flow_overrides or {}
- self.wait_policy = wait_policy
+ self.wait_for_completion = wait_for_completion
self.waiter_max_attempts = waiter_max_attempts or 60
self.waiter_delay = waiter_delay or 60
self.deferrable = deferrable
+ self.wait_policy = wait_policy
- if wait_for_completion is not None:
+ if wait_policy is not None:
warnings.warn(
- "`wait_for_completion` parameter is deprecated, please use
`wait_policy` instead.",
+ "`wait_policy` parameter is deprecated, please use
`wait_for_completion` instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
# preserve previous behaviour
- self.wait_policy = WaitPolicy.WAIT_FOR_COMPLETION if
wait_for_completion else None
+ self.wait_for_completion = wait_policy in (
+ WaitPolicy.WAIT_FOR_COMPLETION,
+ WaitPolicy.WAIT_FOR_STEPS_COMPLETION,
+ )
Review Comment:
This mapping loses important information. Both `WAIT_FOR_COMPLETION` and
`WAIT_FOR_STEPS_COMPLETION` are mapped to `True`, but they have different
behaviors in the original wait_policy logic. The `WAIT_FOR_STEPS_COMPLETION`
behavior is now lost when using the deprecated parameter.
```suggestion
self.wait_policy = wait_policy
if wait_policy == WaitPolicy.WAIT_FOR_COMPLETION:
self.wait_for_completion = True
elif wait_policy == WaitPolicy.WAIT_FOR_STEPS_COMPLETION:
self.wait_for_completion = "steps"
else:
self.wait_for_completion = False
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]