Copilot commented on code in PR #64956:
URL: https://github.com/apache/airflow/pull/64956#discussion_r3066483323
##########
providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py:
##########
@@ -77,9 +79,18 @@
class TestCloudComposerCreateEnvironmentOperator:
+ @staticmethod
+ def _mock_environment(state: int) -> mock.MagicMock:
+ environment = mock.MagicMock()
+ environment.state = state
+ return environment
Review Comment:
New helper `_mock_environment` creates a `MagicMock()` without
`spec`/`autospec`, which can hide attribute/typing mistakes in tests. Consider
using a real `Environment` instance or `mock.MagicMock(spec=Environment)` (or
`spec_set`) so the test double matches the production object more closely.
##########
providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py:
##########
@@ -137,6 +137,23 @@ def extra_links_params(self) -> dict[str, Any]:
"environment_id": self.environment_id,
}
+ def _raise_if_environment_not_running(self, environment: Environment) ->
None:
+ env_state = Environment.State(environment.state)
+ self.log.info("Environment state: %s", env_state.name)
+
+ self.log.info(
+ "Composer environment final state: raw=%s, name=%s, value=%s",
+ environment.state,
+ env_state.name,
+ env_state.value,
+ )
+
+ if env_state != Environment.State.RUNNING:
+ raise AirflowException(
+ "Create environment operation completed, but the Composer
environment is not in RUNNING state. "
Review Comment:
`_raise_if_environment_not_running` is also called from the `AlreadyExists`
path, but the raised message says "Create environment operation completed".
This makes the error misleading when the environment pre-exists; consider
rephrasing the exception text to be context-agnostic (e.g., "Composer
environment is not in RUNNING state").
##########
providers/google/tests/system/google/cloud/composer/example_cloud_composer.py:
##########
@@ -104,6 +110,61 @@ def get_project_number():
raise exc
+# [START howto_operator_composer_retry_cleanup]
+def cleanup_failed_environment_before_retry(context: dict[str, Any]) -> None:
+ task = context["task"]
+
+ hook = CloudComposerHook(
+ gcp_conn_id=task.gcp_conn_id,
+ impersonation_chain=task.impersonation_chain,
+ )
+
+ environment_id = task.environment_id
+
+ log.info(
+ "Retry cleanup started for Composer env. project_id=%s region=%s
environment_id=%s",
+ PROJECT_ID,
+ REGION,
+ environment_id,
+ )
+
+ try:
+ environment = hook.get_environment(
+ project_id=PROJECT_ID,
+ region=REGION,
+ environment_id=environment_id,
+ )
Review Comment:
`cleanup_failed_environment_before_retry` uses module-level
`PROJECT_ID`/`REGION` instead of the retried task's `project_id`/`region`. If
the operator is configured differently (templating, different region), this
callback could query/delete the wrong environment. Consider reading
`project_id`/`region` from `context['task']` for correctness and reusability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]