This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 414bb20  Serve logs with Scheduler when using Local or Sequential 
Executor (#15557)
414bb20 is described below

commit 414bb20fad6c6a50c5a209f6d81f5ca3d679b083
Author: Kaxil Naik <kaxiln...@gmail.com>
AuthorDate: Thu Apr 29 16:06:06 2021 +0100

    Serve logs with Scheduler when using Local or Sequential Executor (#15557)
    
    Currently, the `serve_logs` endpoint only exists on Celery workers. This
    means if someone launches Airflow with the `LocalExecutor` and wants to
    grab the logs from the scheduler, there is no way to get them to the
    webserver if it is on a different pod/machine.
    
    This commit makes the scheduler automatically serve logs when using
    `LocalExecutor` or `SequentialExecutor`. However, it means for
    Airflow <= 2.0.2, the Helm Chart won't serve logs.
    
    closes https://github.com/apache/airflow/pull/15070
    closes https://github.com/apache/airflow/issues/13331
    closes https://github.com/apache/airflow/issues/15071
    closes https://github.com/apache/airflow/issues/14222
---
 airflow/cli/cli_parser.py                          |  1 +
 airflow/cli/commands/scheduler_command.py          | 22 +++++++
 .../templates/scheduler/scheduler-deployment.yaml  | 29 ++-------
 tests/cli/commands/test_scheduler_command.py       | 72 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 23 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index f3f0344..e1a5826 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1517,6 +1517,7 @@ airflow_commands: List[CLICommand] = [
             ARG_STDOUT,
             ARG_STDERR,
             ARG_LOG_FILE,
+            ARG_SKIP_SERVE_LOGS,
         ),
         epilog=(
             'Signals:\n'
diff --git a/airflow/cli/commands/scheduler_command.py 
b/airflow/cli/commands/scheduler_command.py
index b66dafc..368db6f 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -17,6 +17,8 @@
 
 """Scheduler command"""
 import signal
+from multiprocessing import Process
+from typing import Optional
 
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
@@ -30,6 +32,8 @@ from airflow.utils.cli import process_subdir, 
setup_locations, setup_logging, si
 @cli_utils.action_logging
 def scheduler(args):
     """Starts Airflow Scheduler"""
+    skip_serve_logs = args.skip_serve_logs
+
     print(settings.HEADER)
     job = SchedulerJob(
         subdir=process_subdir(args.subdir),
@@ -50,9 +54,27 @@ def scheduler(args):
                 stderr=stderr_handle,
             )
             with ctx:
+                sub_proc = _serve_logs(skip_serve_logs)
                 job.run()
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
+        sub_proc = _serve_logs(skip_serve_logs)
         job.run()
+
+    if sub_proc:
+        sub_proc.terminate()
+
+
+def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
+    """Starts serve_logs sub-process"""
+    from airflow.configuration import conf
+    from airflow.utils.serve_logs import serve_logs
+
+    if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]:
+        if skip_serve_logs is False:
+            sub_proc = Process(target=serve_logs)
+            sub_proc.start()
+            return sub_proc
+    return None
diff --git a/chart/templates/scheduler/scheduler-deployment.yaml 
b/chart/templates/scheduler/scheduler-deployment.yaml
index 7e90442..9e452b3 100644
--- a/chart/templates/scheduler/scheduler-deployment.yaml
+++ b/chart/templates/scheduler/scheduler-deployment.yaml
@@ -141,6 +141,12 @@ spec:
                         SchedulerJob.latest_heartbeat.desc()).limit(1).first()
 
                 sys.exit(0 if job.is_alive() else 1)
+          {{- if and $local (not $elasticsearch) }}
+          # Serve logs if we're in local mode and we don't have elasticsearch 
enabled.
+          ports:
+            - name: worker-logs
+              containerPort: {{ .Values.ports.workerLogs }}
+          {{- end }}
           resources:
 {{ toYaml .Values.scheduler.resources | indent 12 }}
           volumeMounts:
@@ -178,29 +184,6 @@ spec:
           volumeMounts:
             - name: logs
               mountPath: {{ template "airflow_logs" . }}
-{{- if and $local (not $elasticsearch) }}
-        # Start the sidecar log server if we're in local mode and
-        # we don't have elasticsearch enabled.
-        - name: scheduler-logs
-          image: {{ template "airflow_image" . }}
-          imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
-          args: ["serve_logs"]
-          ports:
-            - name: worker-logs
-              containerPort: {{ .Values.ports.workerLogs }}
-          volumeMounts:
-            - name: logs
-              mountPath: {{ template "airflow_logs" . }}
-            - name: config
-              mountPath: {{ template "airflow_config_path" . }}
-              subPath: airflow.cfg
-              readOnly: true
-          envFrom:
-          {{- include "custom_airflow_environment_from" . | default "\n  []" | 
indent 10 }}
-          env:
-          {{- include "custom_airflow_environment" . | indent 10 }}
-          {{- include "standard_airflow_environment" . | indent 10 }}
-{{- end }}
 {{- if .Values.scheduler.extraContainers }}
 {{- toYaml .Values.scheduler.extraContainers | nindent 8 }}
 {{- end }}
diff --git a/tests/cli/commands/test_scheduler_command.py 
b/tests/cli/commands/test_scheduler_command.py
new file mode 100644
index 0000000..59bde70
--- /dev/null
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from parameterized import parameterized
+
+from airflow.cli import cli_parser
+from airflow.cli.commands import scheduler_command
+from airflow.utils.serve_logs import serve_logs
+from tests.test_utils.config import conf_vars
+
+
+class TestSchedulerCommand(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli_parser.get_parser()
+
+    @parameterized.expand(
+        [
+            ("CeleryExecutor", False),
+            ("LocalExecutor", True),
+            ("SequentialExecutor", True),
+            ("KubernetesExecutor", False),
+        ]
+    )
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_serve_logs_on_scheduler(
+        self,
+        executor,
+        expect_serve_logs,
+        mock_process,
+        mock_scheduler_job,
+    ):
+        args = self.parser.parse_args(['scheduler'])
+
+        with conf_vars({("core", "executor"): executor}):
+            scheduler_command.scheduler(args)
+            if expect_serve_logs:
+                mock_process.assert_called_once_with(target=serve_logs)
+            else:
+                mock_process.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ("LocalExecutor",),
+            ("SequentialExecutor",),
+        ]
+    )
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_skip_serve_logs(self, executor, mock_process, mock_scheduler_job):
+        args = self.parser.parse_args(['scheduler', '--skip-serve-logs'])
+        with conf_vars({("core", "executor"): executor}):
+            scheduler_command.scheduler(args)
+            mock_process.assert_not_called()

Reply via email to