ellisms commented on code in PR #38287:
URL: https://github.com/apache/airflow/pull/38287#discussion_r1550288490


##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -174,17 +253,91 @@ def __init__(
         self.delay = waiter_delay
         self.max_attempts = waiter_max_attempts
 
-    def execute(self, context: Context) -> dict[str, str]:
+    def execute(self, context: Context, event: dict[str, Any] | None = None, 
**kwargs) -> dict[str, str]:
         self.log.info("Stopping Neptune cluster: %s", self.cluster_id)
 
-        # Check to make sure the cluster is not already stopped.
+        if event:
+            # returning from a previous defer, need to restore properties
+            self.cluster_id = kwargs.get("cluster_id", self.cluster_id)
+            self.deferrable = kwargs.get("defer", self.deferrable)
+            self.delay = kwargs.get("waiter_delay", self.delay)
+            self.max_attempts = kwargs.get("waiter_max_attempts", 
self.max_attempts)
+            self.wait_for_completion = kwargs.get("wait_for_completion", 
self.wait_for_completion)
+            self.aws_conn_id = kwargs.get("aws_conn_id", self.aws_conn_id)
+            self.log.info("Restored properties from deferral")
+
+        # Check to make sure the cluster is not already stopped or that its 
not in a bad state
         status = self.hook.get_cluster_status(self.cluster_id)
+        self.log.info("Current status: %s", status)
+
         if status.lower() in NeptuneHook.STOPPED_STATES:
             self.log.info("Neptune cluster %s is already stopped.", 
self.cluster_id)
             return {"db_cluster_id": self.cluster_id}
-
-        resp = 
self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.cluster_id)
-        status = resp.get("DBClusters", {}).get("Status", "Unknown")
+        elif status.lower() in NeptuneHook.ERROR_STATES:
+            # some states will not allow you to stop the cluster
+            self.log.error(
+                "Neptune cluster %s is in error state %s and cannot be 
stopped", self.cluster_id, status
+            )
+            raise AirflowException(f"Neptune cluster {self.cluster_id} is in 
error state {status}")
+
+        """
+        A cluster and its instances must be in a valid state to send the stop 
request.
+        This loop covers the case where the cluster is not available and also 
the case where
+        the cluster is available, but one or more of the instances are in an 
invalid state.
+        If either are in an invalid state, wait for the availability and retry.
+        Let the waiters handle retries and detecting the error states.
+        """
+
+        try:
+            self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.cluster_id)
+
+        # cluster must be in available state to stop it
+        except ClientError as ex:
+            code = ex.response["Error"]["Code"]
+            self.log.warning("Received client error when attempting to stop 
the cluster: %s", code)
+
+            if code in ["InvalidDBInstanceStateFault", 
"InvalidClusterStateFault"]:

Review Comment:
   Yep, let me try it and see how it goes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to