This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d2c9e8cfc0 Export Azure Container Instance log messages to XCOM
(#41142)
d2c9e8cfc0 is described below
commit d2c9e8cfc00f30f4b3102197c2145c6e44b3d837
Author: perry2of5 <[email protected]>
AuthorDate: Wed Aug 14 03:46:07 2024 -0700
Export Azure Container Instance log messages to XCOM (#41142)
* Add ability to export log messages into XCOM
The bash and docker operators, among others, can export logs into XCOM
to provide some feedback about the outcome of the command in the
results. The AzureContainerInstancesOperator just detects if the
container exits cleanly. This commit adds the ability to put either
the last log message or all log messages into XCOM under the key
'logs' so it can be used in future operators to decide if it failed or
succeeded.
* Fix issues found by static checks
* Update format to satisfy ruff-format
---
.../azure/operators/container_instances.py | 18 +++++
.../azure/operators/test_container_instances.py | 93 ++++++++++++++++++++++
2 files changed, 111 insertions(+)
diff --git a/airflow/providers/microsoft/azure/operators/container_instances.py
b/airflow/providers/microsoft/azure/operators/container_instances.py
index 605c0e1608..b47dec1c70 100644
--- a/airflow/providers/microsoft/azure/operators/container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -86,6 +86,12 @@ class AzureContainerInstancesOperator(BaseOperator):
:param container_timeout: max time allowed for the execution of
the container instance.
:param tags: azure tags as dict of str:str
+ :param xcom_all: Control if logs are pushed to XCOM similarly to how
DockerOperator does.
+ Possible values include: 'None', 'True', 'False'. Defaults to 'None',
meaning no logs
+ are pushed to XCOM which is the historical behaviour. 'True' means
push all logs to XCOM
+ which may run the risk of hitting XCOM size limits. 'False' means push
only the last line
+ of the logs to XCOM. However, the logs are pushed into XCOM under
"logs", not return_value
+ to avoid breaking the existing behaviour.
:param os_type: The operating system type required by the containers
in the container group. Possible values include: 'Windows', 'Linux'
:param restart_policy: Restart policy for all containers within the
container group.
@@ -158,6 +164,7 @@ class AzureContainerInstancesOperator(BaseOperator):
remove_on_error: bool = True,
fail_if_exists: bool = True,
tags: dict[str, str] | None = None,
+ xcom_all: bool | None = None,
os_type: str = "Linux",
restart_policy: str = "Never",
ip_address: IpAddress | None = None,
@@ -187,6 +194,7 @@ class AzureContainerInstancesOperator(BaseOperator):
self.fail_if_exists = fail_if_exists
self._ci_hook: Any = None
self.tags = tags
+ self.xcom_all = xcom_all
self.os_type = os_type
if self.os_type not in ["Linux", "Windows"]:
raise AirflowException(
@@ -296,6 +304,16 @@ class AzureContainerInstancesOperator(BaseOperator):
self.log.info("Container group started %s/%s",
self.resource_group, self.name)
exit_code = self._monitor_logging(self.resource_group, self.name)
+ if self.xcom_all is not None:
+ logs = self._ci_hook.get_logs(self.resource_group, self.name)
+ if logs is None:
+ context["ti"].xcom_push(key="logs", value=[])
+ else:
+ if self.xcom_all:
+ context["ti"].xcom_push(key="logs", value=logs)
+ else:
+ # slice off the last entry in the list logs and return
it as a list
+ context["ti"].xcom_push(key="logs", value=logs[-1:])
self.log.info("Container had exit code: %s", exit_code)
if exit_code != 0:
diff --git
a/tests/providers/microsoft/azure/operators/test_container_instances.py
b/tests/providers/microsoft/azure/operators/test_container_instances.py
index 41d884e863..3c1fdbffd4 100644
--- a/tests/providers/microsoft/azure/operators/test_container_instances.py
+++ b/tests/providers/microsoft/azure/operators/test_container_instances.py
@@ -18,6 +18,8 @@
from __future__ import annotations
from collections import namedtuple
+from collections.abc import MutableMapping
+from typing import Any
from unittest import mock
from unittest.mock import MagicMock
@@ -32,6 +34,7 @@ from azure.mgmt.containerinstance.models import (
from airflow.exceptions import AirflowException
from airflow.providers.microsoft.azure.operators.container_instances import
AzureContainerInstancesOperator
+from airflow.utils.context import Context
def make_mock_cg(container_state, events=None):
@@ -248,6 +251,85 @@ class TestACIOperator:
assert aci_mock.return_value.delete.call_count == 1
+
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
+ def test_execute_with_messages_all_logs_in_xcom_logs(self, aci_mock):
+ event1 = Event()
+ event1.message = "test"
+ event2 = Event()
+ event2.message = "messages"
+ events = [event1, event2]
+ expected_cg1 = make_mock_container(
+ state="Succeeded", exit_code=0, detail_status="test", events=events
+ )
+ expected_cg2 = make_mock_container(state="Running", exit_code=0,
detail_status="test", events=events)
+ expected_cg3 = make_mock_container(
+ state="Terminated", exit_code=0, detail_status="test",
events=events
+ )
+
+ aci_mock.return_value.get_state.side_effect = [expected_cg1,
expected_cg2, expected_cg3]
+ aci_mock.return_value.get_logs.return_value = ["test", "logs"]
+ aci_mock.return_value.exists.return_value = False
+
+ aci = AzureContainerInstancesOperator(
+ ci_conn_id=None,
+ registry_conn_id=None,
+ resource_group="resource-group",
+ name="container-name",
+ image="container-image",
+ region="region",
+ task_id="task",
+ xcom_all=True,
+ )
+ context = Context(ti=XcomMock())
+ aci.execute(context)
+
+ assert aci_mock.return_value.create_or_update.call_count == 1
+ assert aci_mock.return_value.get_state.call_count == 3
+ assert aci_mock.return_value.get_logs.call_count == 4
+
+ assert aci_mock.return_value.delete.call_count == 1
+ assert context["ti"].xcom_pull(key="logs") ==
aci_mock.return_value.get_logs.return_value
+
+
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
+ def test_execute_with_messages_last_log_in_xcom_logs(self, aci_mock):
+ event1 = Event()
+ event1.message = "test"
+ event2 = Event()
+ event2.message = "messages"
+ events = [event1, event2]
+ expected_cg1 = make_mock_container(
+ state="Succeeded", exit_code=0, detail_status="test", events=events
+ )
+ expected_cg2 = make_mock_container(state="Running", exit_code=0,
detail_status="test", events=events)
+ expected_cg3 = make_mock_container(
+ state="Terminated", exit_code=0, detail_status="test",
events=events
+ )
+
+ aci_mock.return_value.get_state.side_effect = [expected_cg1,
expected_cg2, expected_cg3]
+ aci_mock.return_value.get_logs.return_value = ["test", "logs"]
+ aci_mock.return_value.exists.return_value = False
+
+ aci = AzureContainerInstancesOperator(
+ ci_conn_id=None,
+ registry_conn_id=None,
+ resource_group="resource-group",
+ name="container-name",
+ image="container-image",
+ region="region",
+ task_id="task",
+ xcom_all=False,
+ )
+ context = Context(ti=XcomMock())
+ aci.execute(context)
+
+ assert aci_mock.return_value.create_or_update.call_count == 1
+ assert aci_mock.return_value.get_state.call_count == 3
+ assert aci_mock.return_value.get_logs.call_count == 4
+
+ assert aci_mock.return_value.delete.call_count == 1
+ assert context["ti"].xcom_pull(key="logs") ==
aci_mock.return_value.get_logs.return_value[-1:]
+ assert context["ti"].xcom_pull(key="logs") == ["logs"]
+
def test_name_checker(self):
valid_names = ["test-dash", "name-with-length---63" * 3]
@@ -497,3 +579,14 @@ class TestACIOperator:
assert called_cg_container.image == "container-image"
assert aci_mock.return_value.delete.call_count == 1
+
+
+class XcomMock:
+ def __init__(self) -> None:
+ self.values: MutableMapping[str, Any | None] = {}
+
+ def xcom_push(self, key: str, value: Any | None) -> None:
+ self.values[key] = value
+
+ def xcom_pull(self, key: str) -> Any:
+ return self.values[key]