This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 414bb20 Serve logs with Scheduler when using Local or Sequential Executor (#15557)
414bb20 is described below
commit 414bb20fad6c6a50c5a209f6d81f5ca3d679b083
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Apr 29 16:06:06 2021 +0100
Serve logs with Scheduler when using Local or Sequential Executor (#15557)
Currently, the `serve_logs` endpoint only exists on Celery workers. This
means that if someone launches Airflow with the `LocalExecutor` and wants to
grab the logs from the scheduler, there is no way to get them to the
webserver if it is running on a different pod/machine.

This commit makes the scheduler automatically serve logs when using the
`LocalExecutor` or `SequentialExecutor`. However, it means that for
Airflow <= 2.0.2, the Helm Chart won't serve logs.
closes https://github.com/apache/airflow/pull/15070
closes https://github.com/apache/airflow/issues/13331
closes https://github.com/apache/airflow/issues/15071
closes https://github.com/apache/airflow/issues/14222
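The pattern introduced here is simple: fork a child process that runs the
log-serving HTTP app, let the scheduler block in job.run(), and terminate
the child on exit. A minimal standalone sketch of that lifecycle (the
serve_task_logs function and the sleeps are illustrative stand-ins, not
Airflow code):

    from multiprocessing import Process
    import time

    def serve_task_logs():
        # Illustrative stand-in for airflow.utils.serve_logs.serve_logs,
        # which runs a small HTTP app that serves task log files.
        while True:
            time.sleep(1)

    if __name__ == "__main__":
        sub_proc = Process(target=serve_task_logs)
        sub_proc.start()
        try:
            time.sleep(5)  # stand-in for job.run(), which blocks until shutdown
        finally:
            sub_proc.terminate()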
---
airflow/cli/cli_parser.py | 1 +
airflow/cli/commands/scheduler_command.py | 22 +++++++
.../templates/scheduler/scheduler-deployment.yaml | 29 ++-------
tests/cli/commands/test_scheduler_command.py | 72 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 23 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index f3f0344..e1a5826 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1517,6 +1517,7 @@ airflow_commands: List[CLICommand] = [
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
+ ARG_SKIP_SERVE_LOGS,
),
epilog=(
'Signals:\n'
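The definition of ARG_SKIP_SERVE_LOGS sits outside this hunk. Judging by
how the other boolean flags in cli_parser.py are declared, it presumably
looks roughly like this (the option strings and help text are guesses):

    # Assumed shape of the new argument, mirroring the other store_true
    # flags in cli_parser.py; not copied from the actual commit.
    ARG_SKIP_SERVE_LOGS = Arg(
        ("--skip-serve-logs",),
        default=False,
        help="Don't start the process that serves logs from this machine",
        action="store_true",
    )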
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index b66dafc..368db6f 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -17,6 +17,8 @@
"""Scheduler command"""
import signal
+from multiprocessing import Process
+from typing import Optional
import daemon
from daemon.pidfile import TimeoutPIDLockFile
@@ -30,6 +32,8 @@ from airflow.utils.cli import process_subdir, setup_locations, setup_logging, si
@cli_utils.action_logging
def scheduler(args):
"""Starts Airflow Scheduler"""
+ skip_serve_logs = args.skip_serve_logs
+
print(settings.HEADER)
job = SchedulerJob(
subdir=process_subdir(args.subdir),
@@ -50,9 +54,27 @@ def scheduler(args):
stderr=stderr_handle,
)
with ctx:
+ sub_proc = _serve_logs(skip_serve_logs)
job.run()
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
+ sub_proc = _serve_logs(skip_serve_logs)
job.run()
+
+ if sub_proc:
+ sub_proc.terminate()
+
+
+def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
+ """Starts serve_logs sub-process"""
+ from airflow.configuration import conf
+ from airflow.utils.serve_logs import serve_logs
+
+ if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]:
+ if skip_serve_logs is False:
+ sub_proc = Process(target=serve_logs)
+ sub_proc.start()
+ return sub_proc
+ return None
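With the scheduler serving logs, the webserver can fetch them over HTTP
exactly as it already does from Celery workers. A hedged sketch of such a
fetch (the hostname and log path are illustrative, and 8793 is only the
usual default worker_log_server_port, not a value set by this commit):

    # Fetch a task log over HTTP from whichever process runs serve_logs.
    # Hostname, log path, and port below are illustrative assumptions.
    import requests

    url = "http://scheduler-host:8793/log/my_dag/my_task/2021-04-29T00:00:00+00:00/1.log"
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    print(resp.text)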
diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml
index 7e90442..9e452b3 100644
--- a/chart/templates/scheduler/scheduler-deployment.yaml
+++ b/chart/templates/scheduler/scheduler-deployment.yaml
@@ -141,6 +141,12 @@ spec:
SchedulerJob.latest_heartbeat.desc()).limit(1).first()
sys.exit(0 if job.is_alive() else 1)
+ {{- if and $local (not $elasticsearch) }}
+ # Serve logs if we're in local mode and we don't have elasticsearch enabled.
+ ports:
+ - name: worker-logs
+ containerPort: {{ .Values.ports.workerLogs }}
+ {{- end }}
resources:
{{ toYaml .Values.scheduler.resources | indent 12 }}
volumeMounts:
@@ -178,29 +184,6 @@ spec:
volumeMounts:
- name: logs
mountPath: {{ template "airflow_logs" . }}
-{{- if and $local (not $elasticsearch) }}
- # Start the sidecar log server if we're in local mode and
- # we don't have elasticsearch enabled.
- - name: scheduler-logs
- image: {{ template "airflow_image" . }}
- imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
- args: ["serve_logs"]
- ports:
- - name: worker-logs
- containerPort: {{ .Values.ports.workerLogs }}
- volumeMounts:
- - name: logs
- mountPath: {{ template "airflow_logs" . }}
- - name: config
- mountPath: {{ template "airflow_config_path" . }}
- subPath: airflow.cfg
- readOnly: true
- envFrom:
- {{- include "custom_airflow_environment_from" . | default "\n []" |
indent 10 }}
- env:
- {{- include "custom_airflow_environment" . | indent 10 }}
- {{- include "standard_airflow_environment" . | indent 10 }}
-{{- end }}
{{- if .Values.scheduler.extraContainers }}
{{- toYaml .Values.scheduler.extraContainers | nindent 8 }}
{{- end }}
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
new file mode 100644
index 0000000..59bde70
--- /dev/null
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from parameterized import parameterized
+
+from airflow.cli import cli_parser
+from airflow.cli.commands import scheduler_command
+from airflow.utils.serve_logs import serve_logs
+from tests.test_utils.config import conf_vars
+
+
+class TestSchedulerCommand(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.parser = cli_parser.get_parser()
+
+ @parameterized.expand(
+ [
+ ("CeleryExecutor", False),
+ ("LocalExecutor", True),
+ ("SequentialExecutor", True),
+ ("KubernetesExecutor", False),
+ ]
+ )
+ @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+ @mock.patch("airflow.cli.commands.scheduler_command.Process")
+ def test_serve_logs_on_scheduler(
+ self,
+ executor,
+ expect_serve_logs,
+ mock_process,
+ mock_scheduler_job,
+ ):
+ args = self.parser.parse_args(['scheduler'])
+
+ with conf_vars({("core", "executor"): executor}):
+ scheduler_command.scheduler(args)
+ if expect_serve_logs:
+ mock_process.assert_called_once_with(target=serve_logs)
+ else:
+ mock_process.assert_not_called()
+
+ @parameterized.expand(
+ [
+ ("LocalExecutor",),
+ ("SequentialExecutor",),
+ ]
+ )
+ @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+ @mock.patch("airflow.cli.commands.scheduler_command.Process")
+ def test_skip_serve_logs(self, executor, mock_process, mock_scheduler_job):
+ args = self.parser.parse_args(['scheduler', '--skip-serve-logs'])
+ with conf_vars({("core", "executor"): executor}):
+ scheduler_command.scheduler(args)
+ mock_process.assert_not_called()
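The new module should be runnable on its own with pytest from an Airflow
development checkout, for example:

    # Run just this test module (assumes a working Airflow dev
    # environment with the test dependencies installed).
    import pytest

    pytest.main(["tests/cli/commands/test_scheduler_command.py", "-v"])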