kaxil commented on code in PR #60297:
URL: https://github.com/apache/airflow/pull/60297#discussion_r2676746500


##########
providers/ssh/src/airflow/providers/ssh/utils/remote_job.py:
##########
@@ -0,0 +1,433 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Utilities for SSH remote job execution."""
+
+from __future__ import annotations
+
+import re
+import secrets
+import string
+from dataclasses import dataclass
+from typing import Literal
+
+
+def _validate_env_var_name(name: str) -> None:
+    """
+    Validate environment variable name for security.
+
+    :param name: Environment variable name
+    :raises ValueError: If name contains dangerous characters
+    """
+    if not name:
+        raise ValueError("Environment variable name cannot be empty")
+
+    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name):
+        raise ValueError(
+            f"Invalid environment variable name '{name}'. "
+            "Only alphanumeric characters and underscores are allowed, "
+            "and the name must start with a letter or underscore."
+        )
+
+
+def generate_job_id(
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    try_number: int,
+    suffix_length: int = 8,
+) -> str:
+    """
+    Generate a unique job ID for remote execution.
+
+    Creates a deterministic identifier from the task context with a random 
suffix
+    to ensure uniqueness across retries and potential race conditions.
+
+    :param dag_id: The DAG identifier
+    :param task_id: The task identifier
+    :param run_id: The run identifier
+    :param try_number: The attempt number
+    :param suffix_length: Length of random suffix (default 8)
+    :return: Sanitized job ID string
+    """
+
+    def sanitize(value: str) -> str:
+        return re.sub(r"[^a-zA-Z0-9]", "_", value)[:50]
+
+    sanitized_dag = sanitize(dag_id)
+    sanitized_task = sanitize(task_id)
+    sanitized_run = sanitize(run_id)
+
+    alphabet = string.ascii_lowercase + string.digits
+    suffix = "".join(secrets.choice(alphabet) for _ in range(suffix_length))
+
+    return 
f"af_{sanitized_dag}_{sanitized_task}_{sanitized_run}_try{try_number}_{suffix}"
+
+
+@dataclass
+class RemoteJobPaths:
+    """Paths for remote job artifacts on the target system."""
+
+    job_id: str
+    remote_os: Literal["posix", "windows"]
+    base_dir: str | None = None
+
+    def __post_init__(self):
+        if self.base_dir is None:
+            if self.remote_os == "posix":
+                self.base_dir = "/tmp/airflow-ssh-jobs"
+            else:
+                self.base_dir = "$env:TEMP\\airflow-ssh-jobs"
+
+    @property
+    def sep(self) -> str:
+        """Path separator for the remote OS."""
+        return "\\" if self.remote_os == "windows" else "/"
+
+    @property
+    def job_dir(self) -> str:
+        """Directory containing all job artifacts."""
+        return f"{self.base_dir}{self.sep}{self.job_id}"
+
+    @property
+    def log_file(self) -> str:
+        """Path to stdout/stderr log file."""
+        return f"{self.job_dir}{self.sep}stdout.log"
+
+    @property
+    def exit_code_file(self) -> str:
+        """Path to exit code file (written on completion)."""
+        return f"{self.job_dir}{self.sep}exit_code"
+
+    @property
+    def exit_code_tmp_file(self) -> str:
+        """Temporary exit code file (for atomic write)."""
+        return f"{self.job_dir}{self.sep}exit_code.tmp"
+
+    @property
+    def pid_file(self) -> str:
+        """Path to PID file for the background process."""
+        return f"{self.job_dir}{self.sep}pid"
+
+    @property
+    def status_file(self) -> str:
+        """Path to optional status file for progress updates."""
+        return f"{self.job_dir}{self.sep}status"
+
+
+def build_posix_wrapper_command(
+    command: str,
+    paths: RemoteJobPaths,
+    environment: dict[str, str] | None = None,
+) -> str:
+    """
+    Build a POSIX shell wrapper that runs the command detached via nohup.
+
+    The wrapper:
+    - Creates the job directory
+    - Starts the command in the background with nohup
+    - Redirects stdout/stderr to the log file
+    - Writes the exit code atomically on completion
+    - Writes the PID for potential cancellation
+
+    :param command: The command to execute
+    :param paths: RemoteJobPaths instance with all paths
+    :param environment: Optional environment variables to set
+    :return: Shell command string to submit via SSH
+    """
+    env_exports = ""
+    if environment:
+        for key, value in environment.items():
+            _validate_env_var_name(key)
+            escaped_value = value.replace("'", "'\"'\"'")
+            env_exports += f"export {key}='{escaped_value}'\n"
+
+    escaped_command = command.replace("'", "'\"'\"'")
+
+    wrapper = f"""set -euo pipefail
+job_dir='{paths.job_dir}'
+log_file='{paths.log_file}'
+exit_code_file='{paths.exit_code_file}'
+exit_code_tmp='{paths.exit_code_tmp_file}'
+pid_file='{paths.pid_file}'
+status_file='{paths.status_file}'
+
+mkdir -p "$job_dir"
+: > "$log_file"
+
+nohup bash -c '
+set +e
+export LOG_FILE="'"$log_file"'"
+export STATUS_FILE="'"$status_file"'"
+{env_exports}{escaped_command} >>"'"$log_file"'" 2>&1
+ec=$?
+echo -n "$ec" > "'"$exit_code_tmp"'"
+mv "'"$exit_code_tmp"'" "'"$exit_code_file"'"
+exit 0
+' >/dev/null 2>&1 &
+
+echo -n $! > "$pid_file"
+echo "{paths.job_id}"
+"""
+    return wrapper
+
+
+def build_windows_wrapper_command(
+    command: str,
+    paths: RemoteJobPaths,
+    environment: dict[str, str] | None = None,
+) -> str:
+    """
+    Build a PowerShell wrapper that runs the command detached via 
Start-Process.
+
+    The wrapper:
+    - Creates the job directory
+    - Starts the command in a new detached PowerShell process
+    - Redirects stdout/stderr to the log file
+    - Writes the exit code atomically on completion
+    - Writes the PID for potential cancellation
+
+    :param command: The command to execute (PowerShell script path or command)
+    :param paths: RemoteJobPaths instance with all paths
+    :param environment: Optional environment variables to set
+    :return: PowerShell command string to submit via SSH
+    """
+    env_setup = ""
+    if environment:
+        for key, value in environment.items():
+            _validate_env_var_name(key)
+            escaped_value = value.replace("'", "''")
+            env_setup += f"$env:{key} = '{escaped_value}'; "
+
+    def ps_escape(s: str) -> str:
+        return s.replace("'", "''")
+
+    job_dir = ps_escape(paths.job_dir)
+    log_file = ps_escape(paths.log_file)
+    exit_code_file = ps_escape(paths.exit_code_file)
+    exit_code_tmp = ps_escape(paths.exit_code_tmp_file)
+    pid_file = ps_escape(paths.pid_file)
+    status_file = ps_escape(paths.status_file)
+    escaped_command = ps_escape(command)
+    job_id = ps_escape(paths.job_id)
+
+    child_script = f"""$ErrorActionPreference = 'Continue'
+$env:LOG_FILE = '{log_file}'
+$env:STATUS_FILE = '{status_file}'
+{env_setup}
+{escaped_command}
+$ec = $LASTEXITCODE
+if ($null -eq $ec) {{ $ec = 0 }}
+Set-Content -NoNewline -Path '{exit_code_tmp}' -Value $ec
+Move-Item -Force -Path '{exit_code_tmp}' -Destination '{exit_code_file}'
+"""
+    import base64
+
+    child_script_bytes = child_script.encode("utf-16-le")
+    encoded_script = base64.b64encode(child_script_bytes).decode("ascii")
+
+    wrapper = f"""$jobDir = '{job_dir}'
+New-Item -ItemType Directory -Force -Path $jobDir | Out-Null
+$log = '{log_file}'
+'' | Set-Content -Path $log
+
+$p = Start-Process -FilePath 'powershell.exe' -ArgumentList @('-NoProfile', 
'-NonInteractive', '-EncodedCommand', '{encoded_script}') 
-RedirectStandardOutput $log -RedirectStandardError $log -PassThru -WindowStyle 
Hidden
+Set-Content -NoNewline -Path '{pid_file}' -Value $p.Id
+Write-Output '{job_id}'
+"""
+    wrapper_bytes = wrapper.encode("utf-16-le")
+    encoded_wrapper = base64.b64encode(wrapper_bytes).decode("ascii")
+
+    return f"powershell.exe -NoProfile -NonInteractive -EncodedCommand 
{encoded_wrapper}"
+
+
+def build_posix_log_tail_command(log_file: str, offset: int, max_bytes: int) 
-> str:
+    """
+    Build a POSIX command to read log bytes from offset.
+
+    :param log_file: Path to the log file
+    :param offset: Byte offset to start reading from
+    :param max_bytes: Maximum bytes to read
+    :return: Shell command that outputs the log chunk
+    """
+    return f"dd if='{log_file}' bs=1 skip={offset} count={max_bytes} 
2>/dev/null || true"

Review Comment:
   Yeah, we could -- updated to use tail instead:  
https://github.com/apache/airflow/pull/60297/commits/c705a4180fd7b42624c761d7e9189b9ff77cbd93



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to