Copilot commented on code in PR #56158:
URL: https://github.com/apache/airflow/pull/56158#discussion_r2384326798


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -748,8 +751,11 @@ def execute(self, context: Context) -> str | None:
                 job_flow_id=self._job_flow_id,
                 log_uri=get_log_uri(emr_client=self.hook.conn, 
job_flow_id=self._job_flow_id),
             )
-        if self.wait_policy:
-            waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy]
+        if self.wait_for_completion:
+            if self.wait_policy:
+                waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy]
+            else:
+                waiter_name = 
WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION]

Review Comment:
   The logic is incorrect. Since `wait_policy` is deprecated and is mapped to 
`wait_for_completion`, the code should no longer check `self.wait_policy` 
inside the `wait_for_completion` block. This could cause issues when both 
parameters are provided, or when `wait_policy` is `None` but 
`wait_for_completion` is `True`.
   ```suggestion
               waiter_name = 
WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION]
   ```



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -698,19 +697,23 @@ def __init__(
         super().__init__(**kwargs)
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
-        self.wait_policy = wait_policy
+        self.wait_for_completion = wait_for_completion
         self.waiter_max_attempts = waiter_max_attempts or 60
         self.waiter_delay = waiter_delay or 60
         self.deferrable = deferrable
+        self.wait_policy = wait_policy
 
-        if wait_for_completion is not None:
+        if wait_policy is not None:
             warnings.warn(
-                "`wait_for_completion` parameter is deprecated, please use 
`wait_policy` instead.",
+                "`wait_policy` parameter is deprecated, please use 
`wait_for_completion` instead.",
                 AirflowProviderDeprecationWarning,
                 stacklevel=2,
             )
             # preserve previous behaviour
-            self.wait_policy = WaitPolicy.WAIT_FOR_COMPLETION if 
wait_for_completion else None
+            self.wait_for_completion = wait_policy in (
+                WaitPolicy.WAIT_FOR_COMPLETION,
+                WaitPolicy.WAIT_FOR_STEPS_COMPLETION,
+            )

Review Comment:
   This mapping loses important information. Both `WAIT_FOR_COMPLETION` and 
`WAIT_FOR_STEPS_COMPLETION` are mapped to `True`, but they have different 
behaviors in the original `wait_policy` logic. The `WAIT_FOR_STEPS_COMPLETION` 
behavior is now lost when using the deprecated parameter.
   ```suggestion
               self.wait_policy = wait_policy
               if wait_policy == WaitPolicy.WAIT_FOR_COMPLETION:
                   self.wait_for_completion = True
               elif wait_policy == WaitPolicy.WAIT_FOR_STEPS_COMPLETION:
                   self.wait_for_completion = "steps"
               else:
                   self.wait_for_completion = False
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to