Lee-W commented on code in PR #34071:
URL: https://github.com/apache/airflow/pull/34071#discussion_r1316789161


##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -404,6 +479,40 @@ def start_cluster(self, json: dict) -> None:
         """
         self._do_api_call(START_CLUSTER_ENDPOINT, json)
 
+    def activate_cluster(self, json: dict, polling: int, timeout: int | None = None) -> None:
+        """
+        Start the cluster, and wait for it to be ready.
+
+        :param json: json dictionary containing cluster specification.
+        :param polling: polling interval in seconds.
+        :param timeout: timeout in seconds. -1 means no timeout.

Review Comment:
   ```suggestion
           :param timeout: timeout in seconds.
   ```
   
   I believe the "-1 means no timeout" note is no longer needed, since `timeout` is now typed `int | None` and defaults to `None`.



##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -114,6 +116,57 @@ def from_json(cls, data: str) -> RunState:
         return RunState(**json.loads(data))
 
 
+class ClusterState:
+    """
+    Utility class for the cluster state concept of Databricks cluster.
+    """
+
+    CLUSTER_LIFE_CYCLE_STATES = [
+        "PENDING",
+        "RUNNING",
+        "RESTARTING",
+        "RESIZING",
+        "TERMINATING",
+        "TERMINATED",
+        "ERROR",
+        "UNKNOWN",
+    ]
+
+    def __init__(self, state: str = "", state_message: str = "", *args, **kwargs) -> None:
+        self.state = state
+        self.state_message = state_message
+
+    @property
+    def is_terminal(self) -> bool:
+        """True if the current state is a terminal state."""
+        if self.state not in self.CLUSTER_LIFE_CYCLE_STATES:
+            raise AirflowException(
+                f"Unexpected cluster life cycle state: {self.state}"
+            )
+        return self.state in (
+            "TERMINATING", "TERMINATED", "ERROR", "UNKNOWN"
+        )
+
+    @property
+    def is_running(self) -> bool:
+        """True if the current state is running."""
+        return self.state in ("RUNNING", "RESIZING")
+
+    def __eq__(self, other) -> bool:
+        return self.state == other.state and \
+            self.state_message == other.state_message
+
+    def __repr__(self) -> str:
+        return str(self.__dict__)
+
+    def to_json(self) -> str:
+        return json.dumps(self.__dict__)

Review Comment:
   I just don't understand when we would use it, but I'm OK with keeping them.



##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -404,6 +479,40 @@ def start_cluster(self, json: dict) -> None:
         """
         self._do_api_call(START_CLUSTER_ENDPOINT, json)
 
+    def activate_cluster(self, json: dict, polling: int, timeout: int | None = None) -> None:
+        """
+        Start the cluster, and wait for it to be ready.
+
+        :param json: json dictionary containing cluster specification.
+        :param polling: polling interval in seconds.
+        :param timeout: timeout in seconds. -1 means no timeout.
+        """
+        cluster_id = json["cluster_id"]
+
+        api_called = False
+        elapsed_time = 0
+
+        while True:
+            run_state = self.get_cluster_state(cluster_id)
+
+            if run_state.is_running:
+                return
+            elif run_state.is_terminal:
+                if not api_called:
+                    self.start_cluster(json)
+                    api_called = True
+                else:
+                    raise AirflowException(
+                        f"Cluster {cluster_id} start failed with '{run_state.state}' state: {run_state.state_message}"
+                    )

Review Comment:
   ```suggestion
                   if api_called:
                       raise AirflowException(
                           f"Cluster {cluster_id} start failed with '{run_state.state}' state: {run_state.state_message}"
                       )
   
                   self.start_cluster(json)
                   api_called = True
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to