This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d2c9e8cfc0 Export Azure Container Instance log messages to XCOM 
(#41142)
d2c9e8cfc0 is described below

commit d2c9e8cfc00f30f4b3102197c2145c6e44b3d837
Author: perry2of5 <[email protected]>
AuthorDate: Wed Aug 14 03:46:07 2024 -0700

    Export Azure Container Instance log messages to XCOM (#41142)
    
    * Add ability to export log messages into XCOM
    
    The bash and docker operators, among others, can export logs into XCOM
    to provide some feedback about the outcome of the command in the
    results. The AzureContainerInstancesOperator just detects if the
    container exits cleanly. This commit adds the ability to put either
    the last log message or all log messages into XCOM under the key
    'logs' so it can be used in future operators to decide if it failed or
    succeeded.
    
    * Fix issues found by static checks
    
    * Update format to satisfy ruff-format
---
 .../azure/operators/container_instances.py         | 18 +++++
 .../azure/operators/test_container_instances.py    | 93 ++++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py 
b/airflow/providers/microsoft/azure/operators/container_instances.py
index 605c0e1608..b47dec1c70 100644
--- a/airflow/providers/microsoft/azure/operators/container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -86,6 +86,12 @@ class AzureContainerInstancesOperator(BaseOperator):
     :param container_timeout: max time allowed for the execution of
         the container instance.
     :param tags: azure tags as dict of str:str
+    :param xcom_all: Control if logs are pushed to XCOM similarly to how 
DockerOperator does.
+        Possible values include: 'None', 'True', 'False'. Defaults to 'None', 
meaning no logs
+        are pushed to XCOM which is the historical behaviour. 'True' means 
push all logs to XCOM
+        which may run the risk of hitting XCOM size limits. 'False' means push 
only the last line
+        of the logs to XCOM. However, the logs are pushed into XCOM under 
"logs", not return_value
+        to avoid breaking the existing behaviour.
     :param os_type: The operating system type required by the containers
         in the container group. Possible values include: 'Windows', 'Linux'
     :param restart_policy: Restart policy for all containers within the 
container group.
@@ -158,6 +164,7 @@ class AzureContainerInstancesOperator(BaseOperator):
         remove_on_error: bool = True,
         fail_if_exists: bool = True,
         tags: dict[str, str] | None = None,
+        xcom_all: bool | None = None,
         os_type: str = "Linux",
         restart_policy: str = "Never",
         ip_address: IpAddress | None = None,
@@ -187,6 +194,7 @@ class AzureContainerInstancesOperator(BaseOperator):
         self.fail_if_exists = fail_if_exists
         self._ci_hook: Any = None
         self.tags = tags
+        self.xcom_all = xcom_all
         self.os_type = os_type
         if self.os_type not in ["Linux", "Windows"]:
             raise AirflowException(
@@ -296,6 +304,16 @@ class AzureContainerInstancesOperator(BaseOperator):
             self.log.info("Container group started %s/%s", 
self.resource_group, self.name)
 
             exit_code = self._monitor_logging(self.resource_group, self.name)
+            if self.xcom_all is not None:
+                logs = self._ci_hook.get_logs(self.resource_group, self.name)
+                if logs is None:
+                    context["ti"].xcom_push(key="logs", value=[])
+                else:
+                    if self.xcom_all:
+                        context["ti"].xcom_push(key="logs", value=logs)
+                    else:
+                        # slice off the last entry in the list logs and return 
it as a list
+                        context["ti"].xcom_push(key="logs", value=logs[-1:])
 
             self.log.info("Container had exit code: %s", exit_code)
             if exit_code != 0:
diff --git 
a/tests/providers/microsoft/azure/operators/test_container_instances.py 
b/tests/providers/microsoft/azure/operators/test_container_instances.py
index 41d884e863..3c1fdbffd4 100644
--- a/tests/providers/microsoft/azure/operators/test_container_instances.py
+++ b/tests/providers/microsoft/azure/operators/test_container_instances.py
@@ -18,6 +18,8 @@
 from __future__ import annotations
 
 from collections import namedtuple
+from collections.abc import MutableMapping
+from typing import Any
 from unittest import mock
 from unittest.mock import MagicMock
 
@@ -32,6 +34,7 @@ from azure.mgmt.containerinstance.models import (
 
 from airflow.exceptions import AirflowException
 from airflow.providers.microsoft.azure.operators.container_instances import 
AzureContainerInstancesOperator
+from airflow.utils.context import Context
 
 
 def make_mock_cg(container_state, events=None):
@@ -248,6 +251,85 @@ class TestACIOperator:
 
         assert aci_mock.return_value.delete.call_count == 1
 
+    
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
+    def test_execute_with_messages_all_logs_in_xcom_logs(self, aci_mock):
+        event1 = Event()
+        event1.message = "test"
+        event2 = Event()
+        event2.message = "messages"
+        events = [event1, event2]
+        expected_cg1 = make_mock_container(
+            state="Succeeded", exit_code=0, detail_status="test", events=events
+        )
+        expected_cg2 = make_mock_container(state="Running", exit_code=0, 
detail_status="test", events=events)
+        expected_cg3 = make_mock_container(
+            state="Terminated", exit_code=0, detail_status="test", 
events=events
+        )
+
+        aci_mock.return_value.get_state.side_effect = [expected_cg1, 
expected_cg2, expected_cg3]
+        aci_mock.return_value.get_logs.return_value = ["test", "logs"]
+        aci_mock.return_value.exists.return_value = False
+
+        aci = AzureContainerInstancesOperator(
+            ci_conn_id=None,
+            registry_conn_id=None,
+            resource_group="resource-group",
+            name="container-name",
+            image="container-image",
+            region="region",
+            task_id="task",
+            xcom_all=True,
+        )
+        context = Context(ti=XcomMock())
+        aci.execute(context)
+
+        assert aci_mock.return_value.create_or_update.call_count == 1
+        assert aci_mock.return_value.get_state.call_count == 3
+        assert aci_mock.return_value.get_logs.call_count == 4
+
+        assert aci_mock.return_value.delete.call_count == 1
+        assert context["ti"].xcom_pull(key="logs") == 
aci_mock.return_value.get_logs.return_value
+
+    
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
+    def test_execute_with_messages_last_log_in_xcom_logs(self, aci_mock):
+        event1 = Event()
+        event1.message = "test"
+        event2 = Event()
+        event2.message = "messages"
+        events = [event1, event2]
+        expected_cg1 = make_mock_container(
+            state="Succeeded", exit_code=0, detail_status="test", events=events
+        )
+        expected_cg2 = make_mock_container(state="Running", exit_code=0, 
detail_status="test", events=events)
+        expected_cg3 = make_mock_container(
+            state="Terminated", exit_code=0, detail_status="test", 
events=events
+        )
+
+        aci_mock.return_value.get_state.side_effect = [expected_cg1, 
expected_cg2, expected_cg3]
+        aci_mock.return_value.get_logs.return_value = ["test", "logs"]
+        aci_mock.return_value.exists.return_value = False
+
+        aci = AzureContainerInstancesOperator(
+            ci_conn_id=None,
+            registry_conn_id=None,
+            resource_group="resource-group",
+            name="container-name",
+            image="container-image",
+            region="region",
+            task_id="task",
+            xcom_all=False,
+        )
+        context = Context(ti=XcomMock())
+        aci.execute(context)
+
+        assert aci_mock.return_value.create_or_update.call_count == 1
+        assert aci_mock.return_value.get_state.call_count == 3
+        assert aci_mock.return_value.get_logs.call_count == 4
+
+        assert aci_mock.return_value.delete.call_count == 1
+        assert context["ti"].xcom_pull(key="logs") == 
aci_mock.return_value.get_logs.return_value[-1:]
+        assert context["ti"].xcom_pull(key="logs") == ["logs"]
+
     def test_name_checker(self):
         valid_names = ["test-dash", "name-with-length---63" * 3]
 
@@ -497,3 +579,14 @@ class TestACIOperator:
         assert called_cg_container.image == "container-image"
 
         assert aci_mock.return_value.delete.call_count == 1
+
+
+class XcomMock:
+    def __init__(self) -> None:
+        self.values: MutableMapping[str, Any | None] = {}
+
+    def xcom_push(self, key: str, value: Any | None) -> None:
+        self.values[key] = value
+
+    def xcom_pull(self, key: str) -> Any:
+        return self.values[key]

Reply via email to