BasPH commented on code in PR #60297:
URL: https://github.com/apache/airflow/pull/60297#discussion_r2675989705


##########
providers/ssh/src/airflow/providers/ssh/operators/ssh_remote_job.py:
##########
@@ -0,0 +1,456 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""SSH Remote Job Operator for deferrable remote command execution."""
+
+from __future__ import annotations
+
+from collections.abc import Container, Sequence
+from datetime import timedelta
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Literal
+
+from airflow.providers.common.compat.sdk import AirflowException, AirflowSkipException, BaseOperator
+from airflow.providers.ssh.hooks.ssh import SSHHook
+from airflow.providers.ssh.triggers.ssh_remote_job import SSHRemoteJobTrigger
+from airflow.providers.ssh.utils.remote_job import (
+    RemoteJobPaths,
+    build_posix_cleanup_command,
+    build_posix_kill_command,
+    build_posix_os_detection_command,
+    build_posix_wrapper_command,
+    build_windows_cleanup_command,
+    build_windows_kill_command,
+    build_windows_os_detection_command,
+    build_windows_wrapper_command,
+    generate_job_id,
+)
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class SSHRemoteJobOperator(BaseOperator):
+    r"""
+    Execute a command on a remote host via SSH with deferrable monitoring.
+
+    This operator submits a job to run detached on the remote host, then
+    uses a trigger to asynchronously monitor the job status and stream logs.
+    This approach is resilient to network interruptions as the remote job
+    continues running independently of the SSH connection.
+
+    The remote job is wrapped to:
+    - Run detached from the SSH session (via nohup on POSIX, Start-Process on Windows)
+    - Redirect stdout/stderr to a log file
+    - Write the exit code to a file on completion
+
+    :param ssh_conn_id: SSH connection ID from Airflow Connections
+    :param command: Command to execute on the remote host (templated)
+    :param remote_host: Override the host from the connection (templated)
+    :param environment: Environment variables to set for the command (templated)
+    :param remote_base_dir: Base directory for job artifacts (templated).
+        Defaults to /tmp/airflow-ssh-jobs on POSIX, C:\\Windows\\Temp\\airflow-ssh-jobs on Windows
+    :param poll_interval: Seconds between status polls (default: 5)
+    :param log_chunk_size: Max bytes to read per poll (default: 65536)
+    :param timeout: Hard timeout in seconds for the entire operation
+    :param cleanup: When to clean up remote job directory:
+        'never', 'on_success', or 'always' (default: 'never')
+    :param remote_os: Remote operating system: 'auto', 'posix', or 'windows' (default: 'auto')
+    :param skip_on_exit_code: Exit codes that should skip the task instead of failing
+    :param conn_timeout: SSH connection timeout in seconds
+    :param banner_timeout: Timeout waiting for SSH banner in seconds
+    """
+
+    template_fields: Sequence[str] = ("command", "environment", "remote_host", "remote_base_dir")
+    template_ext: Sequence[str] = (
+        ".sh",
+        ".bash",
+        ".ps1",
+    )
+    template_fields_renderers = {
+        "command": "bash",
+        "environment": "python",
+    }
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        ssh_conn_id: str,
+        command: str,
+        remote_host: str | None = None,
+        environment: dict[str, str] | None = None,
+        remote_base_dir: str | None = None,
+        poll_interval: int = 5,
+        log_chunk_size: int = 65536,
+        timeout: int | None = None,
+        cleanup: Literal["never", "on_success", "always"] = "never",
+        remote_os: Literal["auto", "posix", "windows"] = "auto",
+        skip_on_exit_code: int | Container[int] | None = None,
+        conn_timeout: int | None = None,
+        banner_timeout: float = 30.0,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_conn_id = ssh_conn_id
+        self.command = command
+        self.remote_host = remote_host
+        self.environment = environment
+
+        if remote_base_dir is not None:
+            self._validate_base_dir(remote_base_dir)
+        self.remote_base_dir = remote_base_dir
+
+        self.poll_interval = poll_interval
+        self.log_chunk_size = log_chunk_size
+        self.timeout = timeout
+        self.cleanup = cleanup
+        self.remote_os = remote_os
+        self.conn_timeout = conn_timeout
+        self.banner_timeout = banner_timeout
+        self.skip_on_exit_code = (
+            skip_on_exit_code
+            if isinstance(skip_on_exit_code, Container)
+            else [skip_on_exit_code]
+            if skip_on_exit_code is not None
+            else []
+        )
+
+        self._job_id: str | None = None
+        self._paths: RemoteJobPaths | None = None
+        self._detected_os: Literal["posix", "windows"] | None = None
+
+    @staticmethod
+    def _validate_base_dir(path: str) -> None:
+        """
+        Validate the remote base directory path for security.
+
+        :param path: Path to validate
+        :raises ValueError: If path contains dangerous patterns
+        """
+        if not path:
+            raise ValueError("remote_base_dir cannot be empty")
+
+        if ".." in path:
+            raise ValueError(f"remote_base_dir cannot contain '..' (path traversal not allowed). Got: {path}")
+
+        if "\x00" in path:
+            raise ValueError("remote_base_dir cannot contain null bytes")
+
+        dangerous_patterns = ["/etc", "/bin", "/sbin", "/boot", "C:\\Windows", "C:\\Program Files"]
+        for pattern in dangerous_patterns:
+            if pattern in path:
+                import warnings
+
+                warnings.warn(
+                    f"remote_base_dir '{path}' contains potentially sensitive 
path '{pattern}'. "
+                    "Ensure you have appropriate permissions.",
+                    UserWarning,
+                    stacklevel=3,
+                )
+
+    @cached_property
+    def ssh_hook(self) -> SSHHook:
+        """Create the SSH hook for command submission."""
+        return SSHHook(
+            ssh_conn_id=self.ssh_conn_id,
+            remote_host=self.remote_host or "",
+            conn_timeout=self.conn_timeout,
+            banner_timeout=self.banner_timeout,
+        )
+
+    def _detect_remote_os(self) -> Literal["posix", "windows"]:
+        """
+        Detect the remote operating system.
+
+        Uses a two-stage detection:
+    1. Try POSIX detection via `uname` (works on Linux, macOS, BSD, Solaris, AIX, etc.)
+        2. Try Windows detection via PowerShell
+        3. Raise error if both fail
+        """
+        if self.remote_os != "auto":
+            return self.remote_os
+
+        self.log.info("Auto-detecting remote operating system...")
+        with self.ssh_hook.get_conn() as ssh_client:
+            try:
+                exit_status, stdout, _ = self.ssh_hook.exec_ssh_client_command(
+                    ssh_client,
+                    build_posix_os_detection_command(),
+                    get_pty=False,
+                    environment=None,
+                    timeout=10,
+                )
+                if exit_status == 0 and stdout:
+                    output = stdout.decode("utf-8", errors="replace").strip().lower()
+                    posix_systems = [
+                        "linux",
+                        "darwin",
+                        "freebsd",
+                        "openbsd",
+                        "netbsd",
+                        "sunos",
+                        "aix",
+                        "hp-ux",
+                    ]
+                    if any(system in output for system in posix_systems):
+                        self.log.info("Detected POSIX system: %s", output)
+                        return "posix"
+            except Exception as e:
+                self.log.debug("POSIX detection failed: %s", e)
+
+            try:
+                exit_status, stdout, _ = self.ssh_hook.exec_ssh_client_command(
+                    ssh_client,
+                    build_windows_os_detection_command(),
+                    get_pty=False,
+                    environment=None,
+                    timeout=10,
+                )
+                if exit_status == 0 and stdout:
+                    output = stdout.decode("utf-8", errors="replace").strip()
+                    if "WINDOWS" in output.upper():
+                        self.log.info("Detected Windows system")
+                        return "windows"
+            except Exception as e:
+                self.log.debug("Windows detection failed: %s", e)
+
+            raise AirflowException(
+                "Could not auto-detect remote OS. Please explicitly set 
remote_os='posix' or 'windows'"
+            )
+
+    def execute(self, context: Context) -> None:
+        """
+        Submit the remote job and defer to the trigger for monitoring.
+
+        :param context: Airflow task context
+        """
+        if not self.command:
+            raise AirflowException("SSH operator error: command not 
specified.")
+
+        self._detected_os = self._detect_remote_os()
+        self.log.info("Remote OS: %s", self._detected_os)
+
+        ti = context["ti"]
+        self._job_id = generate_job_id(
+            dag_id=ti.dag_id,
+            task_id=ti.task_id,
+            run_id=ti.run_id,
+            try_number=ti.try_number,
+        )
+        self.log.info("Generated job ID: %s", self._job_id)
+
+        self._paths = RemoteJobPaths(
+            job_id=self._job_id,
+            remote_os=self._detected_os,
+            base_dir=self.remote_base_dir,
+        )
+
+        if self._detected_os == "posix":
+            wrapper_cmd = build_posix_wrapper_command(
+                command=self.command,
+                paths=self._paths,
+                environment=self.environment,
+            )
+        else:
+            wrapper_cmd = build_windows_wrapper_command(
+                command=self.command,
+                paths=self._paths,
+                environment=self.environment,
+            )
+
+        self.log.info("Submitting remote job to %s", self.ssh_hook.remote_host 
or "configured host")
+        with self.ssh_hook.get_conn() as ssh_client:
+            exit_status, stdout, stderr = self.ssh_hook.exec_ssh_client_command(
+                ssh_client,
+                wrapper_cmd,
+                get_pty=False,
+                environment=None,
+                timeout=60,
+            )
+
+            if exit_status != 0:
+                stderr_str = stderr.decode("utf-8", errors="replace") if stderr else ""
+                raise AirflowException(
+                    f"Failed to submit remote job. Exit code: {exit_status}. 
Error: {stderr_str}"
+                )
+
+            returned_job_id = stdout.decode("utf-8", errors="replace").strip() if stdout else ""
+            if returned_job_id != self._job_id:
+                self.log.warning("Job ID mismatch. Expected: %s, Got: %s", 
self._job_id, returned_job_id)
+
+        self.log.info("Remote job submitted successfully. Job ID: %s", 
self._job_id)
+        self.log.info("Job directory: %s", self._paths.job_dir)
+
+        if self.do_xcom_push:
+            ti.xcom_push(
+                key="ssh_remote_job",
+                value={
+                    "job_id": self._job_id,
+                    "job_dir": self._paths.job_dir,
+                    "log_file": self._paths.log_file,
+                    "exit_code_file": self._paths.exit_code_file,
+                    "pid_file": self._paths.pid_file,
+                    "remote_os": self._detected_os,
+                },
+            )
+
+        self.defer(
+            trigger=SSHRemoteJobTrigger(
+                ssh_conn_id=self.ssh_conn_id,
+                remote_host=self.remote_host,
+                job_id=self._job_id,
+                job_dir=self._paths.job_dir,
+                log_file=self._paths.log_file,
+                exit_code_file=self._paths.exit_code_file,
+                remote_os=self._detected_os,
+                poll_interval=self.poll_interval,
+                log_chunk_size=self.log_chunk_size,
+                log_offset=0,
+            ),
+            method_name="execute_complete",
+            timeout=timedelta(seconds=self.timeout) if self.timeout else None,
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
+        """
+        Handle trigger events and re-defer if job is still running.
+
+        :param context: Airflow task context
+        :param event: Event data from the trigger
+        """
+        if not event:
+            raise AirflowException("Received null event from trigger")
+
+        required_keys = ["job_id", "job_dir", "log_file", "exit_code_file", "remote_os", "done"]
+        missing_keys = [key for key in required_keys if key not in event]
+        if missing_keys:
+            raise AirflowException(
+                f"Invalid trigger event: missing required keys {missing_keys}. 
Event: {event}"
+            )
+
+        log_chunk = event.get("log_chunk", "")
+        if log_chunk:
+            for line in log_chunk.splitlines():
+                self.log.info("[remote] %s", line)
+
+        if not event.get("done", False):
+            self.log.debug("Job still running, continuing to monitor...")
+            self.defer(
+                trigger=SSHRemoteJobTrigger(
+                    ssh_conn_id=self.ssh_conn_id,
+                    remote_host=self.remote_host,
+                    job_id=event["job_id"],
+                    job_dir=event["job_dir"],
+                    log_file=event["log_file"],
+                    exit_code_file=event["exit_code_file"],
+                    remote_os=event["remote_os"],
+                    poll_interval=self.poll_interval,
+                    log_chunk_size=self.log_chunk_size,
+                    log_offset=event.get("log_offset", 0),
+                ),
+                method_name="execute_complete",
+                timeout=timedelta(seconds=self.timeout) if self.timeout else None,
+            )
+            return
+
+        exit_code = event.get("exit_code")
+        job_dir = event.get("job_dir", "")
+        remote_os = event.get("remote_os", "posix")
+
+        self.log.info("Remote job completed with exit code: %s", exit_code)
+
+        should_cleanup = self.cleanup == "always" or (self.cleanup == "on_success" and exit_code == 0)
+        if should_cleanup and job_dir:
+            self._cleanup_remote_job(job_dir, remote_os)
+
+        if exit_code is None:
+            raise AirflowException(f"Remote job failed: {event.get('message', 
'Unknown error')}")
+
+        if exit_code in self.skip_on_exit_code:
+            raise AirflowSkipException(f"Remote job returned skip exit code: {exit_code}")
+
+        if exit_code != 0:
+            raise AirflowException(f"Remote job failed with exit code: 
{exit_code}")
+
+        self.log.info("Remote job completed successfully")
+
+    def _cleanup_remote_job(self, job_dir: str, remote_os: str) -> None:
+        """Clean up the remote job directory."""
+        self.log.info("Cleaning up remote job directory: %s", job_dir)
+        try:
+            if remote_os == "posix":
+                cleanup_cmd = build_posix_cleanup_command(job_dir)
+            else:
+                cleanup_cmd = build_windows_cleanup_command(job_dir)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                self.ssh_hook.exec_ssh_client_command(
+                    ssh_client,
+                    cleanup_cmd,
+                    get_pty=False,
+                    environment=None,
+                    timeout=30,
+                )
+            self.log.info("Remote cleanup completed")
+        except Exception as e:
+            self.log.warning("Failed to clean up remote job directory: %s", e)
+
+    def on_kill(self) -> None:
+        """
+        Attempt to kill the remote process when the task is killed.
+
+        Since the operator instance may have been rehydrated after deferral,

Review Comment:
   What does "rehydrated" mean here?



##########
providers/ssh/src/airflow/providers/ssh/operators/ssh_remote_job.py:
##########
@@ -0,0 +1,456 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""SSH Remote Job Operator for deferrable remote command execution."""
+
+from __future__ import annotations
+
+from collections.abc import Container, Sequence
+from datetime import timedelta
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Literal
+
+from airflow.providers.common.compat.sdk import AirflowException, AirflowSkipException, BaseOperator
+from airflow.providers.ssh.hooks.ssh import SSHHook
+from airflow.providers.ssh.triggers.ssh_remote_job import SSHRemoteJobTrigger
+from airflow.providers.ssh.utils.remote_job import (
+    RemoteJobPaths,
+    build_posix_cleanup_command,
+    build_posix_kill_command,
+    build_posix_os_detection_command,
+    build_posix_wrapper_command,
+    build_windows_cleanup_command,
+    build_windows_kill_command,
+    build_windows_os_detection_command,
+    build_windows_wrapper_command,
+    generate_job_id,
+)
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class SSHRemoteJobOperator(BaseOperator):
+    r"""
+    Execute a command on a remote host via SSH with deferrable monitoring.
+
+    This operator submits a job to run detached on the remote host, then
+    uses a trigger to asynchronously monitor the job status and stream logs.
+    This approach is resilient to network interruptions as the remote job
+    continues running independently of the SSH connection.
+
+    The remote job is wrapped to:
+    - Run detached from the SSH session (via nohup on POSIX, Start-Process on Windows)
+    - Redirect stdout/stderr to a log file
+    - Write the exit code to a file on completion
+
+    :param ssh_conn_id: SSH connection ID from Airflow Connections
+    :param command: Command to execute on the remote host (templated)
+    :param remote_host: Override the host from the connection (templated)
+    :param environment: Environment variables to set for the command (templated)
+    :param remote_base_dir: Base directory for job artifacts (templated).
+        Defaults to /tmp/airflow-ssh-jobs on POSIX, C:\\Windows\\Temp\\airflow-ssh-jobs on Windows
+    :param poll_interval: Seconds between status polls (default: 5)
+    :param log_chunk_size: Max bytes to read per poll (default: 65536)
+    :param timeout: Hard timeout in seconds for the entire operation
+    :param cleanup: When to clean up remote job directory:
+        'never', 'on_success', or 'always' (default: 'never')
+    :param remote_os: Remote operating system: 'auto', 'posix', or 'windows' (default: 'auto')
+    :param skip_on_exit_code: Exit codes that should skip the task instead of failing
+    :param conn_timeout: SSH connection timeout in seconds
+    :param banner_timeout: Timeout waiting for SSH banner in seconds
+    """
+
+    template_fields: Sequence[str] = ("command", "environment", "remote_host", "remote_base_dir")
+    template_ext: Sequence[str] = (
+        ".sh",
+        ".bash",
+        ".ps1",
+    )
+    template_fields_renderers = {
+        "command": "bash",
+        "environment": "python",
+    }
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        ssh_conn_id: str,
+        command: str,
+        remote_host: str | None = None,
+        environment: dict[str, str] | None = None,
+        remote_base_dir: str | None = None,
+        poll_interval: int = 5,
+        log_chunk_size: int = 65536,
+        timeout: int | None = None,
+        cleanup: Literal["never", "on_success", "always"] = "never",
+        remote_os: Literal["auto", "posix", "windows"] = "auto",
+        skip_on_exit_code: int | Container[int] | None = None,
+        conn_timeout: int | None = None,
+        banner_timeout: float = 30.0,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_conn_id = ssh_conn_id
+        self.command = command
+        self.remote_host = remote_host
+        self.environment = environment
+
+        if remote_base_dir is not None:
+            self._validate_base_dir(remote_base_dir)
+        self.remote_base_dir = remote_base_dir
+
+        self.poll_interval = poll_interval
+        self.log_chunk_size = log_chunk_size
+        self.timeout = timeout
+        self.cleanup = cleanup
+        self.remote_os = remote_os
+        self.conn_timeout = conn_timeout
+        self.banner_timeout = banner_timeout
+        self.skip_on_exit_code = (
+            skip_on_exit_code
+            if isinstance(skip_on_exit_code, Container)
+            else [skip_on_exit_code]
+            if skip_on_exit_code is not None
+            else []
+        )
+
+        self._job_id: str | None = None
+        self._paths: RemoteJobPaths | None = None
+        self._detected_os: Literal["posix", "windows"] | None = None
+
+    @staticmethod
+    def _validate_base_dir(path: str) -> None:
+        """
+        Validate the remote base directory path for security.
+
+        :param path: Path to validate
+        :raises ValueError: If path contains dangerous patterns
+        """
+        if not path:
+            raise ValueError("remote_base_dir cannot be empty")
+
+        if ".." in path:
+            raise ValueError(f"remote_base_dir cannot contain '..' (path traversal not allowed). Got: {path}")
+
+        if "\x00" in path:
+            raise ValueError("remote_base_dir cannot contain null bytes")
+
+        dangerous_patterns = ["/etc", "/bin", "/sbin", "/boot", "C:\\Windows", "C:\\Program Files"]
+        for pattern in dangerous_patterns:
+            if pattern in path:
+                import warnings
+
+                warnings.warn(
+                    f"remote_base_dir '{path}' contains potentially sensitive 
path '{pattern}'. "
+                    "Ensure you have appropriate permissions.",
+                    UserWarning,
+                    stacklevel=3,
+                )
+
+    @cached_property
+    def ssh_hook(self) -> SSHHook:
+        """Create the SSH hook for command submission."""
+        return SSHHook(
+            ssh_conn_id=self.ssh_conn_id,
+            remote_host=self.remote_host or "",
+            conn_timeout=self.conn_timeout,
+            banner_timeout=self.banner_timeout,
+        )
+
+    def _detect_remote_os(self) -> Literal["posix", "windows"]:
+        """
+        Detect the remote operating system.
+
+        Uses a two-stage detection:
+    1. Try POSIX detection via `uname` (works on Linux, macOS, BSD, Solaris, AIX, etc.)
+        2. Try Windows detection via PowerShell
+        3. Raise error if both fail
+        """
+        if self.remote_os != "auto":
+            return self.remote_os
+
+        self.log.info("Auto-detecting remote operating system...")
+        with self.ssh_hook.get_conn() as ssh_client:
+            try:
+                exit_status, stdout, _ = self.ssh_hook.exec_ssh_client_command(
+                    ssh_client,
+                    build_posix_os_detection_command(),
+                    get_pty=False,
+                    environment=None,
+                    timeout=10,
+                )
+                if exit_status == 0 and stdout:
+                    output = stdout.decode("utf-8", errors="replace").strip().lower()
+                    posix_systems = [
+                        "linux",
+                        "darwin",
+                        "freebsd",
+                        "openbsd",
+                        "netbsd",
+                        "sunos",
+                        "aix",
+                        "hp-ux",
+                    ]
+                    if any(system in output for system in posix_systems):
+                        self.log.info("Detected POSIX system: %s", output)
+                        return "posix"
+            except Exception as e:
+                self.log.debug("POSIX detection failed: %s", e)
+
+            try:
+                exit_status, stdout, _ = self.ssh_hook.exec_ssh_client_command(
+                    ssh_client,
+                    build_windows_os_detection_command(),
+                    get_pty=False,
+                    environment=None,
+                    timeout=10,
+                )
+                if exit_status == 0 and stdout:
+                    output = stdout.decode("utf-8", errors="replace").strip()
+                    if "WINDOWS" in output.upper():
+                        self.log.info("Detected Windows system")
+                        return "windows"
+            except Exception as e:
+                self.log.debug("Windows detection failed: %s", e)
+
+            raise AirflowException(
+                "Could not auto-detect remote OS. Please explicitly set 
remote_os='posix' or 'windows'"
+            )
+
+    def execute(self, context: Context) -> None:
+        """
+        Submit the remote job and defer to the trigger for monitoring.
+
+        :param context: Airflow task context
+        """
+        if not self.command:
+            raise AirflowException("SSH operator error: command not 
specified.")
+
+        self._detected_os = self._detect_remote_os()
+        self.log.info("Remote OS: %s", self._detected_os)
+
+        ti = context["ti"]
+        self._job_id = generate_job_id(
+            dag_id=ti.dag_id,
+            task_id=ti.task_id,
+            run_id=ti.run_id,
+            try_number=ti.try_number,
+        )
+        self.log.info("Generated job ID: %s", self._job_id)
+
+        self._paths = RemoteJobPaths(
+            job_id=self._job_id,
+            remote_os=self._detected_os,
+            base_dir=self.remote_base_dir,
+        )
+
+        if self._detected_os == "posix":
+            wrapper_cmd = build_posix_wrapper_command(
+                command=self.command,
+                paths=self._paths,
+                environment=self.environment,
+            )
+        else:
+            wrapper_cmd = build_windows_wrapper_command(
+                command=self.command,
+                paths=self._paths,
+                environment=self.environment,
+            )
+
+        self.log.info("Submitting remote job to %s", self.ssh_hook.remote_host 
or "configured host")

Review Comment:
   Is it even possible to have an empty `remote_host` at this point?



##########
providers/ssh/src/airflow/providers/ssh/utils/remote_job.py:
##########
@@ -0,0 +1,433 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Utilities for SSH remote job execution."""
+
+from __future__ import annotations
+
+import re
+import secrets
+import string
+from dataclasses import dataclass
+from typing import Literal
+
+
+def _validate_env_var_name(name: str) -> None:
+    """
+    Validate environment variable name for security.
+
+    :param name: Environment variable name
+    :raises ValueError: If name contains dangerous characters
+    """
+    if not name:
+        raise ValueError("Environment variable name cannot be empty")
+
+    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name):
+        raise ValueError(
+            f"Invalid environment variable name '{name}'. "
+            "Only alphanumeric characters and underscores are allowed, "
+            "and the name must start with a letter or underscore."
+        )
+
+
+def generate_job_id(
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    try_number: int,
+    suffix_length: int = 8,
+) -> str:
+    """
+    Generate a unique job ID for remote execution.
+
+    Creates a deterministic identifier from the task context with a random suffix
+    to ensure uniqueness across retries and potential race conditions.
+
+    :param dag_id: The DAG identifier
+    :param task_id: The task identifier
+    :param run_id: The run identifier
+    :param try_number: The attempt number
+    :param suffix_length: Length of random suffix (default 8)
+    :return: Sanitized job ID string
+    """
+
+    def sanitize(value: str) -> str:
+        return re.sub(r"[^a-zA-Z0-9]", "_", value)[:50]
+
+    sanitized_dag = sanitize(dag_id)
+    sanitized_task = sanitize(task_id)
+    sanitized_run = sanitize(run_id)
+
+    alphabet = string.ascii_lowercase + string.digits
+    suffix = "".join(secrets.choice(alphabet) for _ in range(suffix_length))
+
+    return f"af_{sanitized_dag}_{sanitized_task}_{sanitized_run}_try{try_number}_{suffix}"
+
+
+@dataclass
+class RemoteJobPaths:
+    """Paths for remote job artifacts on the target system."""
+
+    job_id: str
+    remote_os: Literal["posix", "windows"]
+    base_dir: str | None = None
+
+    def __post_init__(self):
+        if self.base_dir is None:
+            if self.remote_os == "posix":
+                self.base_dir = "/tmp/airflow-ssh-jobs"
+            else:
+                self.base_dir = "$env:TEMP\\airflow-ssh-jobs"
+
+    @property
+    def sep(self) -> str:
+        """Path separator for the remote OS."""
+        return "\\" if self.remote_os == "windows" else "/"
+
+    @property
+    def job_dir(self) -> str:
+        """Directory containing all job artifacts."""
+        return f"{self.base_dir}{self.sep}{self.job_id}"
+
+    @property
+    def log_file(self) -> str:
+        """Path to stdout/stderr log file."""
+        return f"{self.job_dir}{self.sep}stdout.log"
+
+    @property
+    def exit_code_file(self) -> str:
+        """Path to exit code file (written on completion)."""
+        return f"{self.job_dir}{self.sep}exit_code"
+
+    @property
+    def exit_code_tmp_file(self) -> str:
+        """Temporary exit code file (for atomic write)."""
+        return f"{self.job_dir}{self.sep}exit_code.tmp"
+
+    @property
+    def pid_file(self) -> str:
+        """Path to PID file for the background process."""
+        return f"{self.job_dir}{self.sep}pid"
+
+    @property
+    def status_file(self) -> str:
+        """Path to optional status file for progress updates."""
+        return f"{self.job_dir}{self.sep}status"
+
+
+def build_posix_wrapper_command(
+    command: str,
+    paths: RemoteJobPaths,
+    environment: dict[str, str] | None = None,
+) -> str:
+    """
+    Build a POSIX shell wrapper that runs the command detached via nohup.
+
+    The wrapper:
+    - Creates the job directory
+    - Starts the command in the background with nohup
+    - Redirects stdout/stderr to the log file
+    - Writes the exit code atomically on completion
+    - Writes the PID for potential cancellation
+
+    :param command: The command to execute
+    :param paths: RemoteJobPaths instance with all paths
+    :param environment: Optional environment variables to set
+    :return: Shell command string to submit via SSH
+    """
+    env_exports = ""
+    if environment:
+        for key, value in environment.items():
+            _validate_env_var_name(key)
+            escaped_value = value.replace("'", "'\"'\"'")
+            env_exports += f"export {key}='{escaped_value}'\n"
+
+    escaped_command = command.replace("'", "'\"'\"'")
+
+    wrapper = f"""set -euo pipefail
+job_dir='{paths.job_dir}'
+log_file='{paths.log_file}'
+exit_code_file='{paths.exit_code_file}'
+exit_code_tmp='{paths.exit_code_tmp_file}'
+pid_file='{paths.pid_file}'
+status_file='{paths.status_file}'
+
+mkdir -p "$job_dir"
+: > "$log_file"
+
+nohup bash -c '
+set +e
+export LOG_FILE="'"$log_file"'"
+export STATUS_FILE="'"$status_file"'"
+{env_exports}{escaped_command} >>"'"$log_file"'" 2>&1
+ec=$?
+echo -n "$ec" > "'"$exit_code_tmp"'"
+mv "'"$exit_code_tmp"'" "'"$exit_code_file"'"
+exit 0
+' >/dev/null 2>&1 &
+
+echo -n $! > "$pid_file"
+echo "{paths.job_id}"
+"""
+    return wrapper
+
+
+def build_windows_wrapper_command(
+    command: str,
+    paths: RemoteJobPaths,
+    environment: dict[str, str] | None = None,
+) -> str:
+    """
+    Build a PowerShell wrapper that runs the command detached via Start-Process.
+
+    The wrapper:
+    - Creates the job directory
+    - Starts the command in a new detached PowerShell process
+    - Redirects stdout/stderr to the log file
+    - Writes the exit code atomically on completion
+    - Writes the PID for potential cancellation
+
+    :param command: The command to execute (PowerShell script path or command)
+    :param paths: RemoteJobPaths instance with all paths
+    :param environment: Optional environment variables to set
+    :return: PowerShell command string to submit via SSH
+    """
+    env_setup = ""
+    if environment:
+        for key, value in environment.items():
+            _validate_env_var_name(key)
+            escaped_value = value.replace("'", "''")
+            env_setup += f"$env:{key} = '{escaped_value}'; "
+
+    def ps_escape(s: str) -> str:
+        return s.replace("'", "''")
+
+    job_dir = ps_escape(paths.job_dir)
+    log_file = ps_escape(paths.log_file)
+    exit_code_file = ps_escape(paths.exit_code_file)
+    exit_code_tmp = ps_escape(paths.exit_code_tmp_file)
+    pid_file = ps_escape(paths.pid_file)
+    status_file = ps_escape(paths.status_file)
+    escaped_command = ps_escape(command)
+    job_id = ps_escape(paths.job_id)
+
+    child_script = f"""$ErrorActionPreference = 'Continue'
+$env:LOG_FILE = '{log_file}'
+$env:STATUS_FILE = '{status_file}'
+{env_setup}
+{escaped_command}
+$ec = $LASTEXITCODE
+if ($null -eq $ec) {{ $ec = 0 }}
+Set-Content -NoNewline -Path '{exit_code_tmp}' -Value $ec
+Move-Item -Force -Path '{exit_code_tmp}' -Destination '{exit_code_file}'
+"""
+    import base64
+
+    child_script_bytes = child_script.encode("utf-16-le")
+    encoded_script = base64.b64encode(child_script_bytes).decode("ascii")
+
+    wrapper = f"""$jobDir = '{job_dir}'
+New-Item -ItemType Directory -Force -Path $jobDir | Out-Null
+$log = '{log_file}'
+'' | Set-Content -Path $log
+
+$p = Start-Process -FilePath 'powershell.exe' -ArgumentList @('-NoProfile', '-NonInteractive', '-EncodedCommand', '{encoded_script}') -RedirectStandardOutput $log -RedirectStandardError $log -PassThru -WindowStyle Hidden
+Set-Content -NoNewline -Path '{pid_file}' -Value $p.Id
+Write-Output '{job_id}'
+"""
+    wrapper_bytes = wrapper.encode("utf-16-le")
+    encoded_wrapper = base64.b64encode(wrapper_bytes).decode("ascii")
+
+    return f"powershell.exe -NoProfile -NonInteractive -EncodedCommand 
{encoded_wrapper}"
+
+
+def build_posix_log_tail_command(log_file: str, offset: int, max_bytes: int) -> str:
+    """
+    Build a POSIX command to read log bytes from offset.
+
+    :param log_file: Path to the log file
+    :param offset: Byte offset to start reading from
+    :param max_bytes: Maximum bytes to read
+    :return: Shell command that outputs the log chunk
+    """
+    return f"dd if='{log_file}' bs=1 skip={offset} count={max_bytes} 
2>/dev/null || true"

Review Comment:
   `bs=1` will require roughly a million read/write system calls (and the matching context switches) to read a 1MB log file. Should we bump it to e.g. 4096?
   
   Small example:
   ```bash
   $ dd if=/dev/zero of=perf_test_1mb.tmp bs=1M count=1 status=none
   
   $ time dd if=perf_test_1mb.tmp of=/dev/null bs=1 2>/dev/null
   
   real 0m0.947s
   user 0m0.250s
   sys  0m0.691s
   
   $ time dd if=perf_test_1mb.tmp of=/dev/null bs=4096 2>/dev/null
   
   real 0m0.013s
   user 0m0.003s
   sys  0m0.007s
   ```
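   
   Note that bumping `bs` alone would change the semantics here, since `skip` and `count` are measured in blocks of `bs`, not bytes. A possible fix (untested sketch; `skip_bytes`/`count_bytes` are GNU dd extensions, so the tail/head form may be the safer portable choice):
   
   ```bash
   # GNU dd: keep byte-accurate offset and length, but read in 4K blocks
   dd if="$log_file" bs=4096 skip="$offset" count="$max_bytes" iflag=skip_bytes,count_bytes 2>/dev/null
   
   # Portable fallback: tail -c +K starts at byte K (1-indexed), head caps the length
   tail -c +$((offset + 1)) "$log_file" | head -c "$max_bytes"
   ```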



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

