This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 87f761522e Introduce a generic export for containerized executor 
logging (#34903)
87f761522e is described below

commit 87f761522e2e80db76c873c8671cd74f8b85e5e8
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue Oct 17 11:28:29 2023 -0700

    Introduce a generic export for containerized executor logging (#34903)
    
    A new logging environment variable to enable task logs to work correctly in 
containerized
    executors. K8s has an approach for this, which is followed closely when 
making it generic
    in this change. However, I did not convert K8s executor to this new 
mechanism to keep the
    change set minimal and reduce blast radius. A follow-up change will be
    done for this.
---
 airflow/cli/commands/task_command.py    |  4 ++--
 airflow/settings.py                     |  4 ++++
 airflow/utils/log/logging_mixin.py      |  4 ++--
 tests/cli/commands/test_task_command.py | 13 +++++++++----
 4 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index e9edd0a973..5011c9b189 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -46,7 +46,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.operator import needs_expansion
 from airflow.models.param import ParamsDict
 from airflow.models.taskinstance import TaskReturnCode
-from airflow.settings import IS_K8S_EXECUTOR_POD
+from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.typing_compat import Literal
@@ -326,7 +326,7 @@ def _move_task_handlers_to_root(ti: TaskInstance) -> 
Generator[None, None, None]
     console_handler = next((h for h in root_logger.handlers if h.name == 
"console"), None)
     with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as 
task_helper:
         task_helper.move(root_logger)
-        if IS_K8S_EXECUTOR_POD:
+        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
             if console_handler and console_handler not in root_logger.handlers:
                 root_logger.addHandler(console_handler)
         yield
diff --git a/airflow/settings.py b/airflow/settings.py
index a278316d5a..23bf1cbfcb 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -578,6 +578,10 @@ IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", 
"EXECUTOR") in {
     executor_constants.CELERY_KUBERNETES_EXECUTOR,
     executor_constants.LOCAL_KUBERNETES_EXECUTOR,
 }
+
+# Executors can set this to true to configure logging correctly for
+# containerized executors.
+IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", 
""))
 IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
 """Will be True if running in kubernetes executor pod."""
 
diff --git a/airflow/utils/log/logging_mixin.py 
b/airflow/utils/log/logging_mixin.py
index 5ef9885bdd..834237182c 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -209,9 +209,9 @@ class RedirectStdHandler(StreamHandler):
     @property
     def stream(self):
         """Returns current stream."""
-        from airflow.settings import IS_K8S_EXECUTOR_POD
+        from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
 
-        if IS_K8S_EXECUTOR_POD:
+        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
             return self._orig_stream
         if self._use_stderr:
             return sys.stderr
diff --git a/tests/cli/commands/test_task_command.py 
b/tests/cli/commands/test_task_command.py
index 6baea3fd88..d15200b5af 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -733,8 +733,10 @@ class TestLogsfromTaskRunCommand:
                 external_executor_id="ABCD12345",
             )
 
-    @pytest.mark.parametrize("is_k8s", ["true", ""])
-    def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
+    @pytest.mark.parametrize(
+        "is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", 
"true"), ("", "")]
+    )
+    def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, 
is_container_exec):
         """
         When running task --local as k8s executor pod, all logging should make 
it to stdout.
         Otherwise, all logging after "running TI" is redirected to logs (and 
the actual log
@@ -748,7 +750,10 @@ class TestLogsfromTaskRunCommand:
         import subprocess
 
         with mock.patch.dict(
-            "os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s, 
PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT)
+            "os.environ",
+            AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
+            AIRFLOW_IS_EXECUTOR_CONTAINER=is_container_exec,
+            PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT),
         ):
             with subprocess.Popen(
                 args=[sys.executable, "-m", "airflow", *self.task_args, "-S", 
self.dag_path],
@@ -764,7 +769,7 @@ class TestLogsfromTaskRunCommand:
                 found_start = True
             if found_start:
                 lines.append(line)
-        if is_k8s:
+        if any((is_k8s, is_container_exec)):
             # 10 is arbitrary, but, with enough padding to hopefully not be 
flakey
             assert len(lines) > 10
             self.assert_log_line("Starting attempt 1 of 1", lines)

Reply via email to