This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc08893c90 DockerOperator TaskFlow - correct arguments in python
command (#39620)
dc08893c90 is described below
commit dc08893c906b6b2c57a9ec859d5d6ab329af5b30
Author: Antoine Tavant <[email protected]>
AuthorDate: Mon Jul 8 17:39:57 2024 +0200
DockerOperator TaskFlow - correct arguments in python command (#39620)
---------
Co-authored-by: Antoine TAVANT <[email protected]>
---
airflow/providers/docker/decorators/docker.py | 2 +-
tests/providers/docker/decorators/test_docker.py | 33 ++++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/docker/decorators/docker.py
b/airflow/providers/docker/decorators/docker.py
index 9aafdd1d79..d851c98aca 100644
--- a/airflow/providers/docker/decorators/docker.py
+++ b/airflow/providers/docker/decorators/docker.py
@@ -89,7 +89,7 @@ class _DockerDecoratedOperator(DecoratedOperator,
DockerOperator):
f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT",
"/tmp/script.py",
self.python_command)}
&&"""
f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in",
self.python_command)} &&'
- f"{self.python_command} /tmp/script.py /tmp/script.in
/tmp/script.out'"
+ f"{self.python_command} /tmp/script.py /tmp/script.in
/tmp/script.out none /tmp/script.out'"
)
def execute(self, context: Context):
diff --git a/tests/providers/docker/decorators/test_docker.py
b/tests/providers/docker/decorators/test_docker.py
index e4fbe15fc3..93db9f211b 100644
--- a/tests/providers/docker/decorators/test_docker.py
+++ b/tests/providers/docker/decorators/test_docker.py
@@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations
+import logging
+from io import StringIO as StringBuffer
+
import pytest
from airflow.decorators import setup, task, teardown
@@ -284,3 +287,33 @@ class TestDockerDecorator:
ret = f()
assert ret.operator.docker_url == "unix://var/run/docker.sock"
+
+ def test_failing_task(self, dag_maker):
+ """Test regression #39319
+
+ Check the log content of the DockerOperator when the task fails.
+ """
+
+ @task.docker(image="python:3.9-slim", auto_remove="force")
+ def f():
+ raise ValueError("This task is expected to fail")
+
+ docker_operator_logger_name = "airflow.task.operators"
+
+ docker_operator_logger = logging.getLogger(docker_operator_logger_name)
+ log_capture_string = StringBuffer()
+ ch = logging.StreamHandler(log_capture_string)
+ docker_operator_logger.addHandler(ch)
+ with dag_maker():
+ ret = f()
+
+ dr = dag_maker.create_dagrun()
+ with pytest.raises(AirflowException):
+ ret.operator.run(start_date=dr.execution_date,
end_date=dr.execution_date)
+ ti = dr.get_task_instances()[0]
+ assert ti.state == TaskInstanceState.FAILED
+
+ log_content = str(log_capture_string.getvalue())
+ assert 'with open(sys.argv[4], "w") as file:' not in log_content
+ last_line_of_docker_operator_log = log_content.splitlines()[-1]
+ assert "ValueError: This task is expected to fail" in
last_line_of_docker_operator_log