pankajkoti commented on code in PR #32029:
URL: https://github.com/apache/airflow/pull/32029#discussion_r1235207877


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -507,6 +509,7 @@ def __init__(
         max_tries: int | None = None,
         tags: dict | None = None,
         max_polling_attempts: int | None = None,
+        deferrable: bool = False,

Review Comment:
   @Lee-W is making a set of changes in PR #31712 where we're adding a `DEFAULT_DEFERRABLE` constant and using it as the default here.
   
   It would be good to check whether we can do something similar here:
https://github.com/apache/airflow/pull/31712/files#diff-635302602dc71ef2ce1e90acad25865065066afdae24fabf3d521a0c0561389eR154
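
   To illustrate the pattern only: a minimal sketch of a config-driven default for `deferrable`. The `ConfigParser` object, the `[operators]` section, the `default_deferrable` key, and `ExampleOperator` are all hypothetical stand-ins; the actual constant and how it is resolved are defined in PR #31712 and may differ.

```python
from configparser import ConfigParser

# Hypothetical stand-in for the real configuration object.
_config = ConfigParser()
_config.read_string("[operators]\ndefault_deferrable = true\n")

# Hypothetical module-level constant, resolved once at import time.
# Falls back to False when the option is not set.
DEFAULT_DEFERRABLE = _config.getboolean("operators", "default_deferrable", fallback=False)


class ExampleOperator:
    """Hypothetical operator, not the EMR operator from this PR."""

    def __init__(self, deferrable: bool = DEFAULT_DEFERRABLE) -> None:
        # An explicit argument still overrides the configured default.
        self.deferrable = deferrable
```

   With this shape, users can switch the default in one place while individual tasks can still pass `deferrable=False` explicitly.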



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -555,6 +559,22 @@ def execute(self, context: Context) -> str | None:
             self.client_request_token,
             self.tags,
         )
+        if self.deferrable:
+            timeout = (
+                timedelta(seconds=self.max_polling_attempts * self.poll_interval + 60)

Review Comment:
   Can you please help me understand why we are adding 60 seconds here? Also, would it make sense to make it a named constant or a config parameter?
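
   Something along these lines is what I have in mind; a rough sketch only. The constant name `DEFERRAL_TIMEOUT_BUFFER_SECONDS` and the helper `deferral_timeout` are hypothetical, and returning `None` when `max_polling_attempts` is unset is an assumption on my part, not something confirmed by the diff.

```python
from __future__ import annotations

from datetime import timedelta

# Hypothetical name; the value 60 is taken from the diff under review.
DEFERRAL_TIMEOUT_BUFFER_SECONDS = 60


def deferral_timeout(
    max_polling_attempts: int | None,
    poll_interval: int,
    buffer_seconds: int = DEFERRAL_TIMEOUT_BUFFER_SECONDS,
) -> timedelta | None:
    """Return the total polling time plus a safety buffer.

    Assumption: when no attempt limit is set, no timeout is applied.
    """
    if max_polling_attempts is None:
        return None
    return timedelta(seconds=max_polling_attempts * poll_interval + buffer_seconds)
```

   For example, `deferral_timeout(10, 30)` gives 10 * 30 + 60 = 360 seconds, and the purpose of the extra 60 seconds is documented by the constant's name rather than left as a magic number.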



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -317,3 +317,75 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                     await asyncio.sleep(int(self.poll_interval))
 
             yield TriggerEvent({"status": "success", "job_id": self.job_id})
+
+
+class EmrStepSensorTrigger(BaseTrigger):
+    """
+    Poll for the status of EMR container until reaches terminal state.
+
+    :param virtual_cluster_id: Reference Emr cluster id
+    :param job_id:  job_id to check the state
+    :param aws_conn_id: Reference to AWS connection id
+    :param poll_interval: polling period in seconds to check for the status
+    """
+
+    def __init__(
+        self,
+        job_flow_id: str,
+        step_id: str,
+        target_states: Iterable[str],
+        aws_conn_id: str = "aws_default",
+        poll_interval: int = 30,

Review Comment:
   Should we call this `poke_interval` instead? I see the operator also passes it as `poke_interval`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.