potiuk commented on code in PR #58702:
URL: https://github.com/apache/airflow/pull/58702#discussion_r2564009897
##########
scripts/in_container/bin/generate_mprocs_config.py:
##########
@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Generate mprocs configuration dynamically based on environment variables."""
+
+from __future__ import annotations
+
+import os
+import sys
+from pathlib import Path
+
+
+def get_env_bool(var_name: str, default: str = "false") -> bool:
+    """Get environment variable as boolean."""
+    return os.environ.get(var_name, default).lower() == "true"
+
+
+def get_env(var_name: str, default: str = "") -> str:
+    """Get environment variable with default."""
+    return os.environ.get(var_name, default)
+
+
+def generate_mprocs_config() -> str:
+    """Generate mprocs YAML configuration based on environment variables."""
+    procs = {}
+
+    # Scheduler
+    scheduler_cmd = "airflow scheduler"
+    if get_env_bool("BREEZE_DEBUG_SCHEDULER"):
+        port = get_env("BREEZE_DEBUG_SCHEDULER_PORT", "5678")
+        scheduler_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow scheduler"
+
+    procs["scheduler"] = {
+        "shell": scheduler_cmd,
+        "restart": "always",
+        "scrollback": 100000,
+    }
+
+    # API Server or Webserver (depending on Airflow version)
+    use_airflow_version = get_env("USE_AIRFLOW_VERSION", "")
+    if not use_airflow_version.startswith("2."):
+        # API Server (Airflow 3.x+)
+        if get_env_bool("BREEZE_DEBUG_APISERVER"):
+            port = get_env("BREEZE_DEBUG_APISERVER_PORT", "5679")
+            api_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow api-server -d"
+        else:
+            dev_mode = get_env_bool("DEV_MODE")
+            api_cmd = "airflow api-server -d" if dev_mode else "airflow api-server"
+
+        procs["api_server"] = {
+            "shell": api_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+    else:
+        # Webserver (Airflow 2.x)
+        if get_env_bool("BREEZE_DEBUG_WEBSERVER"):
+            port = get_env("BREEZE_DEBUG_WEBSERVER_PORT", "5680")
+            web_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow webserver"
+        else:
+            dev_mode = get_env_bool("DEV_MODE")
+            web_cmd = "airflow webserver -d" if dev_mode else "airflow webserver"
+
+        procs["webserver"] = {
+            "shell": web_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+
+    # Triggerer
+    triggerer_cmd = "airflow triggerer"
+    if get_env_bool("BREEZE_DEBUG_TRIGGERER"):
+        port = get_env("BREEZE_DEBUG_TRIGGERER_PORT", "5681")
+        triggerer_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow triggerer"
+
+    procs["triggerer"] = {
+        "shell": triggerer_cmd,
+        "restart": "always",
+        "scrollback": 100000,
+    }
+
+    # Celery Worker (conditional)
+    if get_env_bool("INTEGRATION_CELERY"):
+        if get_env_bool("BREEZE_DEBUG_CELERY_WORKER"):
+            port = get_env("BREEZE_DEBUG_CELERY_WORKER_PORT", "5682")
+            celery_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow celery worker"
+        else:
+            celery_cmd = "airflow celery worker"
+
+        procs["celery_worker"] = {
+            "shell": celery_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+
+    # Flower (conditional)
+    if get_env_bool("INTEGRATION_CELERY") and get_env_bool("CELERY_FLOWER"):
+        if get_env_bool("BREEZE_DEBUG_FLOWER"):
+            port = get_env("BREEZE_DEBUG_FLOWER_PORT", "5683")
+            flower_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow celery flower"
+        else:
+            flower_cmd = "airflow celery flower"
+
+        procs["flower"] = {
+            "shell": flower_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+
+    # Edge Worker (conditional)
+    executor = get_env("AIRFLOW__CORE__EXECUTOR", "")
+    if executor == "airflow.providers.edge3.executors.edge_executor.EdgeExecutor":
+        if get_env_bool("BREEZE_DEBUG_EDGE"):
+            port = get_env("BREEZE_DEBUG_EDGE_PORT", "5684")
+            edge_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow edge worker --edge-hostname breeze --queues default"
+        else:
+            # Build command with environment cleanup
+            edge_cmd_parts = [
+                "unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN || true",
+                "unset AIRFLOW__CELERY__RESULT_BACKEND || true",
+                "unset POSTGRES_HOST_PORT || true",
+                "unset BACKEND || true",
+                "unset POSTGRES_VERSION || true",
+                "export AIRFLOW__LOGGING__BASE_LOG_FOLDER=edge_logs",
+                "airflow edge worker --edge-hostname breeze --queues default",
+            ]
+            edge_cmd = " && ".join(edge_cmd_parts)
+
+        procs["edge_worker"] = {
+            "shell": edge_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+
+    # DAG Processor (conditional)
+    if get_env_bool("STANDALONE_DAG_PROCESSOR"):
+        if get_env_bool("BREEZE_DEBUG_DAG_PROCESSOR"):
+            port = get_env("BREEZE_DEBUG_DAG_PROCESSOR_PORT", "5685")
+            dag_proc_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow dag-processor"
+        else:
+            dag_proc_cmd = "airflow dag-processor"
+
+        procs["dag_processor"] = {
+            "shell": dag_proc_cmd,
+            "restart": "always",
+            "scrollback": 100000,
+        }
+
+    # Generate YAML output
+    yaml_lines = ["procs:"]
+    for name, config in procs.items():
+        yaml_lines.append(f"  {name}:")
+        # Quote the shell command if it contains special characters
+        shell_cmd = config["shell"]
+        if any(char in shell_cmd for char in ["&&", "||", ";", "|", ">", "<", "$", "&"]):
+            # Use double quotes and escape any double quotes in the command
+            shell_cmd = shell_cmd.replace('"', '\\"')
+            yaml_lines.append(f'    shell: "{shell_cmd}"')
+        else:
+            yaml_lines.append(f"    shell: {shell_cmd}")
+        yaml_lines.append(f"    restart: {config['restart']}")
+        yaml_lines.append(f"    scrollback: {config['scrollback']}")
+        yaml_lines.append("")
+
+    return "\n".join(yaml_lines)
+
+
+def main():
+    """Main entry point."""
+    # Setup similar to run_tmux
+    stop_airflow_path = Path("/usr/local/bin/stop_airflow")
+    if not stop_airflow_path.exists():
+        try:
+            stop_airflow_path.symlink_to("/opt/airflow/scripts/in_container/stop_tmux_airflow.sh")

Review Comment:
   Actually we can remove it completely. `stop_airflow` is only needed for `tmux` - with mprocs you just press `q`.
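   For illustration, a minimal sketch (not a definitive implementation) of what `main()` could shrink to inside this same script if the symlink setup is removed. It reuses the module-level `sys` import and `generate_mprocs_config()` from the diff above, and it assumes the generated YAML is simply written to stdout, since the real output target is not visible in this hunk:

   ```python
   def main():
       """Main entry point without the stop_airflow symlink setup."""
       # mprocs quits all managed processes when you press `q`, so no
       # stop_airflow helper symlink is needed here.
       # Assumption: the generated YAML goes to stdout; the real script may
       # write it to a config file instead (that part is not shown above).
       sys.stdout.write(generate_mprocs_config())


   if __name__ == "__main__":
       main()
   ```

   If the config actually needs to land in a file, the only change would be swapping `sys.stdout.write(...)` for a `Path(...).write_text(...)` call on the intended target.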
