Copilot commented on code in PR #66815:
URL: https://github.com/apache/airflow/pull/66815#discussion_r3322963230


##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py:
##########
@@ -296,6 +307,29 @@ def execute(self, context: Context) -> None:
         )
         # Add task to job
         self.hook.add_single_task_to_job(job_id=self.batch_job_id, task=task)
+
+        if self.deferrable:
+            # Verify pool and nodes are in terminal state before deferral
+            pool = self.hook.connection.pool.get(self.batch_pool_id)
+            if pool.resize_errors:
+                raise RuntimeError(f"Pool resize errors: {pool.resize_errors}")
+
+            nodes = 
list(self.hook.connection.compute_node.list(self.batch_pool_id))
+            self.log.debug("Deferral pre-check: %d nodes present in pool %s", 
len(nodes), self.batch_pool_id)
+            end_time = time.time() + self.timeout
+

Review Comment:
   In deferrable mode, the trigger's timeout deadline is computed as
   `time.time() + self.timeout`, but `timeout` is documented and used elsewhere
   in minutes (see `AzureBatchHook.wait_for_job_tasks_to_complete`). As a
   result, deferrable runs time out ~60x too early (e.g. the default of 25
   minutes becomes 25 seconds); the value needs to be converted to seconds
   (`self.timeout * 60`) before it is added. Also prefer a monotonic deadline
   for duration measurement to avoid wall-clock adjustments; the trigger should
   then compare against `time.monotonic()` as well.



##########
providers/microsoft/azure/docs/operators/batch.rst:
##########
@@ -32,6 +32,15 @@ Below is an example of using this operator to trigger a task 
on Azure Batch
     :start-after: [START howto_azure_batch_operator]
     :end-before: [END howto_azure_batch_operator]
 
+Below is an example of using this operator to trigger a task  on Azure Batch 
with a deferrable flag
+so that polling for the status of the pipeline run occurs on the Airflow 
Triggerer.

Review Comment:
   This deferrable example text looks copied from another operator: it
   mentions a “pipeline run” (Azure Batch runs jobs and tasks, not pipelines),
   and it has a double space in “task  on”. Because this is user-facing
   documentation, it should describe Azure Batch job/task polling accurately,
   e.g. “so that polling for the status of the Batch job's tasks occurs on the
   Airflow Triggerer.”



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/batch.py:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections.abc import AsyncIterator
+from typing import Any
+
+from azure.batch import models as batch_models
+
+from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class AzureBatchTrigger(BaseTrigger):
+    """
+     Trigger when Azure Batch job tasks reach a terminal state.
+
+    :param job_id: Azure Batch job identifier.
+    :param azure_batch_conn_id: Azure Batch connection id.
+    :param end_time: Epoch timestamp when the trigger should timeout.
+    :param poll_interval: Poll interval in seconds.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        azure_batch_conn_id: str,
+        end_time: float,
+        poll_interval: int = 30,
+    ):
+        super().__init__()
+
+        self.job_id = job_id
+        self.azure_batch_conn_id = azure_batch_conn_id
+        self.end_time = end_time
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize trigger arguments and classpath."""
+        return (
+            f"{self.__class__.__module__}.{self.__class__.__name__}",
+            {
+                "job_id": self.job_id,
+                "azure_batch_conn_id": self.azure_batch_conn_id,
+                "end_time": self.end_time,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    def _get_incomplete_tasks(
+        self,
+        tasks: list[batch_models.CloudTask],
+    ) -> list[batch_models.CloudTask]:
+        """Return tasks that have not yet completed."""
+        return [task for task in tasks if task.state != 
batch_models.TaskState.completed]
+
+    def _build_trigger_event(
+        self,
+        tasks: list[batch_models.CloudTask],
+    ) -> TriggerEvent | None:
+        """
+        Convert Batch task states to TriggerEvent.
+
+        Returns None if tasks are still running.
+        """
+        if not tasks:
+            return TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"Azure Batch job {self.job_id} contains no 
tasks.",
+                    "job_id": self.job_id,
+                }
+            )
+
+        if self._get_incomplete_tasks(tasks):
+            return None
+
+        failed_tasks = [
+            task.id
+            for task in tasks
+            if task.execution_info and task.execution_info.result == 
batch_models.TaskExecutionResult.failure
+        ]
+
+        if failed_tasks:
+            return TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"Azure Batch job {self.job_id} failed.",
+                    "job_id": self.job_id,
+                    "failed_tasks": failed_tasks,
+                }
+            )
+
+        return TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Azure Batch job {self.job_id} completed 
successfully.",
+                "job_id": self.job_id,
+            }
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:
+        """Poll Azure Batch job tasks until completion or timeout."""
+        hook = AzureBatchHook(
+            azure_batch_conn_id=self.azure_batch_conn_id,
+        )
+
+        try:
+            while time.time() <= self.end_time:
+                tasks = await asyncio.to_thread(lambda: 
list(hook.connection.task.list(self.job_id)))
+

Review Comment:
   This trigger uses `time.time()` to enforce the timeout (`end_time`). Airflow 
trigger implementations generally use `time.monotonic()` for deadline 
comparisons so they are not affected by system clock changes. If the operator 
passes a monotonic deadline (recommended), this loop must also use 
`time.monotonic()` for correct behavior.



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py:
##########
@@ -296,6 +307,29 @@ def execute(self, context: Context) -> None:
         )
         # Add task to job
         self.hook.add_single_task_to_job(job_id=self.batch_job_id, task=task)
+
+        if self.deferrable:
+            # Verify pool and nodes are in terminal state before deferral
+            pool = self.hook.connection.pool.get(self.batch_pool_id)
+            if pool.resize_errors:
+                raise RuntimeError(f"Pool resize errors: {pool.resize_errors}")
+
+            nodes = 
list(self.hook.connection.compute_node.list(self.batch_pool_id))
+            self.log.debug("Deferral pre-check: %d nodes present in pool %s", 
len(nodes), self.batch_pool_id)

Review Comment:
   The comment says “Verify pool and nodes are in terminal state before 
deferral”, but this block only checks for `pool.resize_errors` and logs the 
number of nodes; node readiness/terminal state is already handled by 
`wait_for_all_node_state()` above. This is misleading for future maintainers 
(and the extra `pool.get()` call duplicates the resize error check already done 
in the hook).



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/batch.py:
##########
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections.abc import AsyncIterator
+from typing import Any
+
+from azure.batch import models as batch_models
+
+from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class AzureBatchTrigger(BaseTrigger):
+    """
+     Trigger when Azure Batch job tasks reach a terminal state.
+
+    :param job_id: Azure Batch job identifier.
+    :param azure_batch_conn_id: Azure Batch connection id.
+    :param end_time: Epoch timestamp when the trigger should timeout.
+    :param poll_interval: Poll interval in seconds.

Review Comment:
   The trigger docstring has an extra leading space, and `end_time` is 
described as an epoch timestamp even though the timeout logic should be based 
on a monotonic clock (to avoid wall-clock adjustments). Updating the docstring 
clarifies what should be passed from the operator and avoids confusion when 
debugging timeouts.



##########
providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_batch.py:
##########
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import time
+from unittest import mock
+
+import pytest
+from azure.batch import models as batch_models
+
+from airflow.providers.microsoft.azure.triggers.batch import AzureBatchTrigger
+from airflow.triggers.base import TriggerEvent
+
+AZURE_BATCH_CONN_ID = "azure_batch_default"
+JOB_ID = "test-job"
+POKE_INTERVAL = 5
+BATCH_END_TIME = time.time() + 60 * 60 * 24 * 7
+MODULE = "airflow.providers.microsoft.azure"

Review Comment:
   If the trigger uses a monotonic deadline (recommended), the test should also 
use `time.monotonic()` when constructing `BATCH_END_TIME`; otherwise the 
timeout tests end up mixing clock sources.



##########
providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_batch.py:
##########
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import time
+from unittest import mock
+
+import pytest
+from azure.batch import models as batch_models
+
+from airflow.providers.microsoft.azure.triggers.batch import AzureBatchTrigger
+from airflow.triggers.base import TriggerEvent
+
+AZURE_BATCH_CONN_ID = "azure_batch_default"
+JOB_ID = "test-job"
+POKE_INTERVAL = 5
+BATCH_END_TIME = time.time() + 60 * 60 * 24 * 7
+MODULE = "airflow.providers.microsoft.azure"
+
+
+class TestAzureBatchTrigger:
+    TRIGGER = AzureBatchTrigger(
+        job_id=JOB_ID,
+        azure_batch_conn_id=AZURE_BATCH_CONN_ID,
+        poll_interval=POKE_INTERVAL,
+        end_time=BATCH_END_TIME,
+    )
+
+    def test_batch_trigger_serialization(self):
+        classpath, kwargs = self.TRIGGER.serialize()
+
+        assert classpath == f"{MODULE}.triggers.batch.AzureBatchTrigger"
+
+        assert kwargs == {
+            "job_id": JOB_ID,
+            "azure_batch_conn_id": AZURE_BATCH_CONN_ID,
+            "poll_interval": POKE_INTERVAL,
+            "end_time": BATCH_END_TIME,
+        }
+
+    def test_build_trigger_event_success(self):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        event = self.TRIGGER._build_trigger_event([completed_task])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "success",
+            "message": f"Azure Batch job {JOB_ID} completed successfully.",
+            "job_id": JOB_ID,
+        }
+
+    def test_build_trigger_event_failure(self):
+        failed_task = mock.MagicMock()
+        failed_task.id = "task1"
+        failed_task.state = batch_models.TaskState.completed
+        failed_task.execution_info.result = 
batch_models.TaskExecutionResult.failure
+
+        event = self.TRIGGER._build_trigger_event([failed_task])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "error",
+            "message": f"Azure Batch job {JOB_ID} failed.",
+            "job_id": JOB_ID,
+            "failed_tasks": ["task1"],
+        }
+
+    def test_build_trigger_event_mixed_states(self):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        running_task = mock.MagicMock()
+        running_task.id = "task2"
+        running_task.state = batch_models.TaskState.running
+
+        event = self.TRIGGER._build_trigger_event([completed_task, 
running_task])
+
+        assert event is None
+
+    def test_build_trigger_event_empty_tasks(self):
+        event = self.TRIGGER._build_trigger_event([])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "error",
+            "message": f"Azure Batch job {JOB_ID} contains no tasks.",
+            "job_id": JOB_ID,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.sleep")
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_non_terminal_sleeps(
+        self,
+        mock_to_thread,
+        mock_sleep,
+    ):
+        running_task = mock.MagicMock()
+        running_task.id = "task1"
+        running_task.state = batch_models.TaskState.running
+
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.side_effect = [
+            [running_task],
+            [completed_task],
+        ]
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "success",
+                    "message": f"Azure Batch job {JOB_ID} completed 
successfully.",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+        mock_sleep.assert_awaited_once_with(POKE_INTERVAL)
+
+    def test_build_trigger_event_non_terminal(self):
+        running_task = mock.MagicMock()
+        running_task.id = "task1"
+        running_task.state = batch_models.TaskState.running
+
+        event = self.TRIGGER._build_trigger_event([running_task])
+
+        assert event is None
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_success(self, mock_to_thread):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.return_value = [completed_task]
+
+        generator = self.TRIGGER.run()
+        actual = await generator.asend(None)
+
+        assert actual == TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Azure Batch job {JOB_ID} completed successfully.",
+                "job_id": JOB_ID,
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_failure(self, mock_to_thread):
+        failed_task = mock.MagicMock()
+        failed_task.id = "task1"
+        failed_task.state = batch_models.TaskState.completed
+        failed_task.execution_info.result = 
batch_models.TaskExecutionResult.failure
+
+        mock_to_thread.return_value = [failed_task]
+
+        generator = self.TRIGGER.run()
+        actual = await generator.asend(None)
+
+        assert actual == TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Azure Batch job {JOB_ID} failed.",
+                "job_id": JOB_ID,
+                "failed_tasks": ["task1"],
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_exception(self, mock_to_thread):
+        mock_to_thread.side_effect = Exception("API failure")
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "error",
+                    "message": "API failure",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_empty_tasks(self, mock_to_thread):
+        mock_to_thread.return_value = []
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"Azure Batch job {JOB_ID} contains no tasks.",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.time")
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_timeout_job_already_succeeded(
+        self,
+        mock_to_thread,
+        mock_time,
+    ):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.return_value = [completed_task]
+
+        mock_time.time.return_value = BATCH_END_TIME + 60
+

Review Comment:
   These timeout tests replace the `time` module in `triggers.batch` with a
   mock and stub only `time.time()`, which will stop working once the trigger’s
   timeout loop uses `time.monotonic()` (recommended for duration measurement).
   Stub `time.monotonic` instead (e.g. `mock_time.monotonic.return_value`) so
   the tests remain stable and aligned with production behavior.



##########
providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_batch.py:
##########
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import time
+from unittest import mock
+
+import pytest
+from azure.batch import models as batch_models
+
+from airflow.providers.microsoft.azure.triggers.batch import AzureBatchTrigger
+from airflow.triggers.base import TriggerEvent
+
+AZURE_BATCH_CONN_ID = "azure_batch_default"
+JOB_ID = "test-job"
+POKE_INTERVAL = 5
+BATCH_END_TIME = time.time() + 60 * 60 * 24 * 7
+MODULE = "airflow.providers.microsoft.azure"
+
+
+class TestAzureBatchTrigger:
+    TRIGGER = AzureBatchTrigger(
+        job_id=JOB_ID,
+        azure_batch_conn_id=AZURE_BATCH_CONN_ID,
+        poll_interval=POKE_INTERVAL,
+        end_time=BATCH_END_TIME,
+    )
+
+    def test_batch_trigger_serialization(self):
+        classpath, kwargs = self.TRIGGER.serialize()
+
+        assert classpath == f"{MODULE}.triggers.batch.AzureBatchTrigger"
+
+        assert kwargs == {
+            "job_id": JOB_ID,
+            "azure_batch_conn_id": AZURE_BATCH_CONN_ID,
+            "poll_interval": POKE_INTERVAL,
+            "end_time": BATCH_END_TIME,
+        }
+
+    def test_build_trigger_event_success(self):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        event = self.TRIGGER._build_trigger_event([completed_task])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "success",
+            "message": f"Azure Batch job {JOB_ID} completed successfully.",
+            "job_id": JOB_ID,
+        }
+
+    def test_build_trigger_event_failure(self):
+        failed_task = mock.MagicMock()
+        failed_task.id = "task1"
+        failed_task.state = batch_models.TaskState.completed
+        failed_task.execution_info.result = 
batch_models.TaskExecutionResult.failure
+
+        event = self.TRIGGER._build_trigger_event([failed_task])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "error",
+            "message": f"Azure Batch job {JOB_ID} failed.",
+            "job_id": JOB_ID,
+            "failed_tasks": ["task1"],
+        }
+
+    def test_build_trigger_event_mixed_states(self):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        running_task = mock.MagicMock()
+        running_task.id = "task2"
+        running_task.state = batch_models.TaskState.running
+
+        event = self.TRIGGER._build_trigger_event([completed_task, 
running_task])
+
+        assert event is None
+
+    def test_build_trigger_event_empty_tasks(self):
+        event = self.TRIGGER._build_trigger_event([])
+
+        assert event is not None
+
+        assert event.payload == {
+            "status": "error",
+            "message": f"Azure Batch job {JOB_ID} contains no tasks.",
+            "job_id": JOB_ID,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.sleep")
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_non_terminal_sleeps(
+        self,
+        mock_to_thread,
+        mock_sleep,
+    ):
+        running_task = mock.MagicMock()
+        running_task.id = "task1"
+        running_task.state = batch_models.TaskState.running
+
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.side_effect = [
+            [running_task],
+            [completed_task],
+        ]
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "success",
+                    "message": f"Azure Batch job {JOB_ID} completed 
successfully.",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+        mock_sleep.assert_awaited_once_with(POKE_INTERVAL)
+
+    def test_build_trigger_event_non_terminal(self):
+        running_task = mock.MagicMock()
+        running_task.id = "task1"
+        running_task.state = batch_models.TaskState.running
+
+        event = self.TRIGGER._build_trigger_event([running_task])
+
+        assert event is None
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_success(self, mock_to_thread):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.return_value = [completed_task]
+
+        generator = self.TRIGGER.run()
+        actual = await generator.asend(None)
+
+        assert actual == TriggerEvent(
+            {
+                "status": "success",
+                "message": f"Azure Batch job {JOB_ID} completed successfully.",
+                "job_id": JOB_ID,
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_failure(self, mock_to_thread):
+        failed_task = mock.MagicMock()
+        failed_task.id = "task1"
+        failed_task.state = batch_models.TaskState.completed
+        failed_task.execution_info.result = 
batch_models.TaskExecutionResult.failure
+
+        mock_to_thread.return_value = [failed_task]
+
+        generator = self.TRIGGER.run()
+        actual = await generator.asend(None)
+
+        assert actual == TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Azure Batch job {JOB_ID} failed.",
+                "job_id": JOB_ID,
+                "failed_tasks": ["task1"],
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_exception(self, mock_to_thread):
+        mock_to_thread.side_effect = Exception("API failure")
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "error",
+                    "message": "API failure",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_run_empty_tasks(self, mock_to_thread):
+        mock_to_thread.return_value = []
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"Azure Batch job {JOB_ID} contains no tasks.",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.time")
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_timeout_job_already_succeeded(
+        self,
+        mock_to_thread,
+        mock_time,
+    ):
+        completed_task = mock.MagicMock()
+        completed_task.id = "task1"
+        completed_task.state = batch_models.TaskState.completed
+        completed_task.execution_info.result = 
batch_models.TaskExecutionResult.success
+
+        mock_to_thread.return_value = [completed_task]
+
+        mock_time.time.return_value = BATCH_END_TIME + 60
+
+        events = [event async for event in self.TRIGGER.run()]
+
+        assert events == [
+            TriggerEvent(
+                {
+                    "status": "success",
+                    "message": f"Azure Batch job {JOB_ID} completed 
successfully.",
+                    "job_id": JOB_ID,
+                }
+            )
+        ]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.triggers.batch.time")
+    @mock.patch(f"{MODULE}.triggers.batch.asyncio.to_thread")
+    async def test_trigger_timeout(self, mock_to_thread, mock_time):
+        running_task = mock.MagicMock()
+        running_task.id = "task1"
+        running_task.state = batch_models.TaskState.running
+
+        mock_to_thread.return_value = [running_task]
+
+        mock_time.time.return_value = BATCH_END_TIME + 60
+

Review Comment:
   Same as above: this timeout test patches `time.time`, but with a monotonic 
deadline the trigger will call `time.monotonic()`. Updating the patched 
attribute keeps the test aligned with the trigger implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to