This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 87f761522e Introduce a generic export for containerized executor
logging (#34903)
87f761522e is described below
commit 87f761522e2e80db76c873c8671cd74f8b85e5e8
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue Oct 17 11:28:29 2023 -0700
Introduce a generic export for containerized executor logging (#34903)
A new logging environment variable to enable task logs to work correctly in
containerized
executors. K8s has an approach for this, which is followed closely when
making it generic
in this change. However, I did not convert K8s executor to this new
mechanism to keep the
change set minimal and reduce blast radius. A follow-up change will be
done for this.
---
airflow/cli/commands/task_command.py | 4 ++--
airflow/settings.py | 4 ++++
airflow/utils/log/logging_mixin.py | 4 ++--
tests/cli/commands/test_task_command.py | 13 +++++++++----
4 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/airflow/cli/commands/task_command.py
b/airflow/cli/commands/task_command.py
index e9edd0a973..5011c9b189 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -46,7 +46,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.operator import needs_expansion
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskReturnCode
-from airflow.settings import IS_K8S_EXECUTOR_POD
+from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.typing_compat import Literal
@@ -326,7 +326,7 @@ def _move_task_handlers_to_root(ti: TaskInstance) ->
Generator[None, None, None]
console_handler = next((h for h in root_logger.handlers if h.name ==
"console"), None)
with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as
task_helper:
task_helper.move(root_logger)
- if IS_K8S_EXECUTOR_POD:
+ if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
if console_handler and console_handler not in root_logger.handlers:
root_logger.addHandler(console_handler)
yield
diff --git a/airflow/settings.py b/airflow/settings.py
index a278316d5a..23bf1cbfcb 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -578,6 +578,10 @@ IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core",
"EXECUTOR") in {
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
+
+# Executors can set this to true to configure logging correctly for
+# containerized executors.
+IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER",
""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""
diff --git a/airflow/utils/log/logging_mixin.py
b/airflow/utils/log/logging_mixin.py
index 5ef9885bdd..834237182c 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -209,9 +209,9 @@ class RedirectStdHandler(StreamHandler):
@property
def stream(self):
"""Returns current stream."""
- from airflow.settings import IS_K8S_EXECUTOR_POD
+ from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
- if IS_K8S_EXECUTOR_POD:
+ if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
return self._orig_stream
if self._use_stderr:
return sys.stderr
diff --git a/tests/cli/commands/test_task_command.py
b/tests/cli/commands/test_task_command.py
index 6baea3fd88..d15200b5af 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -733,8 +733,10 @@ class TestLogsfromTaskRunCommand:
external_executor_id="ABCD12345",
)
- @pytest.mark.parametrize("is_k8s", ["true", ""])
- def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
+ @pytest.mark.parametrize(
+ "is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("",
"true"), ("", "")]
+ )
+ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s,
is_container_exec):
"""
When running task --local as k8s executor pod, all logging should make
it to stdout.
Otherwise, all logging after "running TI" is redirected to logs (and
the actual log
@@ -748,7 +750,10 @@ class TestLogsfromTaskRunCommand:
import subprocess
with mock.patch.dict(
- "os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT)
+ "os.environ",
+ AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
+ AIRFLOW_IS_EXECUTOR_CONTAINER=is_container_exec,
+ PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT),
):
with subprocess.Popen(
args=[sys.executable, "-m", "airflow", *self.task_args, "-S",
self.dag_path],
@@ -764,7 +769,7 @@ class TestLogsfromTaskRunCommand:
found_start = True
if found_start:
lines.append(line)
- if is_k8s:
+ if any((is_k8s, is_container_exec)):
# 10 is arbitrary, but, with enough padding to hopefully not be
flakey
assert len(lines) > 10
self.assert_log_line("Starting attempt 1 of 1", lines)