Lee-W commented on code in PR #34071:
URL: https://github.com/apache/airflow/pull/34071#discussion_r1314776171
##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -114,6 +116,57 @@ def from_json(cls, data: str) -> RunState:
return RunState(**json.loads(data))
+class ClusterState:
+ """
+ Utility class for the cluster state concept of Databricks cluster.
+ """
+
+ CLUSTER_LIFE_CYCLE_STATES = [
+ "PENDING",
+ "RUNNING",
+ "RESTARTING",
+ "RESIZING",
+ "TERMINATING",
+ "TERMINATED",
+ "ERROR",
+ "UNKNOWN",
+ ]
+
+ def __init__(self, state: str = "", state_message: str = "", *args,
**kwargs) -> None:
+ self.state = state
+ self.state_message = state_message
+
+ @property
+ def is_terminal(self) -> bool:
+ """True if the current state is a terminal state."""
+ if self.state not in self.CLUSTER_LIFE_CYCLE_STATES:
+ raise AirflowException(
+ f"Unexpected cluster life cycle state: {self.state}"
+ )
+ return self.state in (
+ "TERMINATING", "TERMINATED", "ERROR", "UNKNOWN"
+ )
+
+ @property
+ def is_running(self) -> bool:
+ """True if the current state is running."""
+ return self.state in ("RUNNING", "RESIZING")
+
+ def __eq__(self, other) -> bool:
+ return self.state == other.state and \
+ self.state_message == other.state_message
+
+ def __repr__(self) -> str:
+ return str(self.__dict__)
+
+ def to_json(self) -> str:
+ return json.dumps(self.__dict__)
Review Comment:
May I know why these two methods (`__repr__` and `to_json`) are needed?
##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -404,6 +483,35 @@ def start_cluster(self, json: dict) -> None:
"""
self._do_api_call(START_CLUSTER_ENDPOINT, json)
+ def activate_cluster(self, json: dict, polling: int, timeout: int = -1) ->
None:
+ """
+ Start the cluster, and wait for it to be ready.
+
+ :param json: json dictionary containing cluster specification.
+ :param polling: polling interval in seconds.
+ :param timeout: timeout in seconds. -1 means no timeout.
+ """
+ api_called = False
+ elapsed_time = 0
+
+ while (timeout == -1) or (elapsed_time <= timeout):
+ run_state = self.get_cluster_state(cluster_id)
+
+ if run_state.is_running:
+ return
+ elif run_state.is_terminal:
+ if not api_called:
+ self.start_cluster({'cluster_id': cluster_id})
+ api_called = True
+ else:
+ raise AirflowException(
+ f"Cluster {cluster_id} start failed with
'{run_state.state}' state: {run_state.state_message}"
+ )
+
+ # wait for cluster to start
+ time.sleep(polling)
+ elapsed_time += polling
Review Comment:
```suggestion
while True:
run_state = self.get_cluster_state(cluster_id)
if run_state.is_running:
return
elif run_state.is_terminal:
if not api_called:
self.start_cluster({'cluster_id': cluster_id})
api_called = True
else:
raise AirflowException(
f"Cluster {cluster_id} start failed with
'{run_state.state}' state: {run_state.state_message}"
)
# wait for cluster to start
time.sleep(polling)
elapsed_time += polling
if timeout and elapsed_time <= timeout:
break
```
##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -114,6 +116,57 @@ def from_json(cls, data: str) -> RunState:
return RunState(**json.loads(data))
+class ClusterState:
+ """
+ Utility class for the cluster state concept of Databricks cluster.
+ """
+
+ CLUSTER_LIFE_CYCLE_STATES = [
+ "PENDING",
+ "RUNNING",
+ "RESTARTING",
+ "RESIZING",
+ "TERMINATING",
+ "TERMINATED",
+ "ERROR",
+ "UNKNOWN",
+ ]
+
+ def __init__(self, state: str = "", state_message: str = "", *args,
**kwargs) -> None:
+ self.state = state
+ self.state_message = state_message
+
+ @property
+ def is_terminal(self) -> bool:
+ """True if the current state is a terminal state."""
+ if self.state not in self.CLUSTER_LIFE_CYCLE_STATES:
+ raise AirflowException(
+ f"Unexpected cluster life cycle state: {self.state}"
+ )
+ return self.state in (
+ "TERMINATING", "TERMINATED", "ERROR", "UNKNOWN"
+ )
+
+ @property
+ def is_running(self) -> bool:
+ """True if the current state is running."""
+ return self.state in ("RUNNING", "RESIZING")
+
+ def __eq__(self, other) -> bool:
+ return self.state == other.state and \
+ self.state_message == other.state_message
Review Comment:
```suggestion
return (
self.state == other.state and
self.state_message == other.state_message
)
```
##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -404,6 +483,35 @@ def start_cluster(self, json: dict) -> None:
"""
self._do_api_call(START_CLUSTER_ENDPOINT, json)
+ def activate_cluster(self, json: dict, polling: int, timeout: int = -1) ->
None:
+ """
+ Start the cluster, and wait for it to be ready.
+
+ :param json: json dictionary containing cluster specification.
+ :param polling: polling interval in seconds.
+ :param timeout: timeout in seconds. -1 means no timeout.
+ """
+ api_called = False
+ elapsed_time = 0
+
+ while (timeout == -1) or (elapsed_time <= timeout):
+ run_state = self.get_cluster_state(cluster_id)
+
+ if run_state.is_running:
+ return
+ elif run_state.is_terminal:
+ if not api_called:
+ self.start_cluster({'cluster_id': cluster_id})
+ api_called = True
+ else:
+ raise AirflowException(
+ f"Cluster {cluster_id} start failed with
'{run_state.state}' state: {run_state.state_message}"
+ )
+
+ # wait for cluster to start
+ time.sleep(polling)
+ elapsed_time += polling
Review Comment:
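Should we do anything (e.g. raise an `AirflowException`) if the timeout is exceeded before the cluster is running? As written, the `while` condition simply stops polling once `elapsed_time` exceeds `timeout`.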
##########
airflow/providers/databricks/hooks/databricks.py:
##########
@@ -404,6 +483,35 @@ def start_cluster(self, json: dict) -> None:
"""
self._do_api_call(START_CLUSTER_ENDPOINT, json)
+ def activate_cluster(self, json: dict, polling: int, timeout: int = -1) ->
None:
Review Comment:
```suggestion
def activate_cluster(self, json: dict, polling: int, timeout: int | None
= None) -> None:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]