kaxil commented on code in PR #68115:
URL: https://github.com/apache/airflow/pull/68115#discussion_r3376779415


##########
providers/ssh/src/airflow/providers/ssh/triggers/ssh_remote_job.py:
##########
@@ -109,81 +133,75 @@ def _get_hook(self) -> SSHHookAsync:
             host=self.remote_host,
         )
 
-    @tenacity.retry(
-        stop=tenacity.stop_after_attempt(3),
-        wait=tenacity.wait_exponential(multiplier=1, min=1, max=10),
-        retry=tenacity.retry_if_exception_type((OSError, TimeoutError, ConnectionError)),
-        reraise=True,
-    )
-    async def _check_completion(self, hook: SSHHookAsync) -> int | None:
+    async def _connect(self) -> SSHClientConnection:
+        """Open a reusable asyncssh connection. Separated out as a seam for testing."""
+        return await self._get_hook().get_conn()

Review Comment:
   Good idea, but I'd keep it out of this PR. Sharing connections across 
triggers means decoupling the connection lifecycle from the trigger (a 
ref-counted pool keyed by `conn_id`+host, and a dropped shared connection would 
then fail every trigger using it), which is a meaningfully bigger change. This 
PR already cuts per-job connections from ~17 to 2.
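To make that scope concrete, here is a minimal sketch of a ref-counted pool keyed by `conn_id` + host. `SharedConnectionPool`, `acquire`, and `release` are hypothetical names, and `factory` stands in for whatever opens the connection (for example the hook's `get_conn()`). This is an illustration only, not code from the PR or the SSH provider.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Key = tuple[str, str]  # (conn_id, host)


class SharedConnectionPool:
    """Hypothetical ref-counted pool holding one connection per (conn_id, host)."""

    def __init__(self, factory: Callable[[str, str], Awaitable[Any]]) -> None:
        self._factory = factory
        self._conns: dict[Key, Any] = {}
        self._refs: dict[Key, int] = {}
        # One lock for simplicity; it also serializes connection setup across keys.
        self._lock = asyncio.Lock()

    async def acquire(self, conn_id: str, host: str) -> Any:
        key = (conn_id, host)
        async with self._lock:
            if key not in self._conns:
                # The first trigger for this key opens the connection.
                self._conns[key] = await self._factory(conn_id, host)
                self._refs[key] = 0
            self._refs[key] += 1
            return self._conns[key]

    async def release(self, conn_id: str, host: str) -> None:
        key = (conn_id, host)
        async with self._lock:
            self._refs[key] -= 1
            if self._refs[key] == 0:
                # The last trigger to release closes it; no single trigger owns the lifecycle.
                conn = self._conns.pop(key)
                del self._refs[key]
                conn.close()
```

Even this sketch leaves out the hard part: if the shared connection drops, every trigger holding it fails at once, so a real pool would also need invalidation and reconnect handling.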



##########
providers/ssh/docs/operators/ssh_remote_job.rst:
##########
@@ -164,6 +164,13 @@ Parameters
 
 * ``remote_os`` (str, optional): Remote OS type (``"auto"``, ``"posix"``, ``"windows"``). Default: ``"auto"``
 * ``skip_on_exit_code`` (int or list, optional): Exit code(s) that should cause task to skip instead of fail
+* ``conn_timeout`` (int, optional): SSH connection timeout in seconds
+* ``banner_timeout`` (float, optional): Seconds to wait for the SSH banner. Default: 30.0
+* ``conn_retry_attempts`` (int, optional): How many times to attempt the initial SSH connection for
+  submission and cleanup before failing. Default: 5. Raise this for large fan-outs where the remote
+  ``sshd`` transiently refuses connections (see :ref:`High Fan-out <howto/operator:SSHRemoteJobOperator:fanout>`)
+* ``cleanup_retries`` (int, optional): How many times to retry remote directory cleanup before giving up
+  and leaving the directory in place. Default: 3

Review Comment:
   Done in b692c002. Added a link to the "Placing Limits on Mapped Tasks" 
section of the dynamic task mapping docs, and reworded the section so it's 
clear the connection storm isn't specific to mapped tasks (parallel DAG runs / 
high concurrency hit it too).
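For reference, a minimal sketch of how an attempt budget such as `conn_retry_attempts` can bound the initial connection. It assumes the same exception tuple and roughly the same exponential backoff (1 s, doubling, capped at 10 s) as the tenacity decorator removed in the trigger hunk; `connect_with_retries` and `backoff_delay` are hypothetical helpers, not the PR's implementation.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")

# Transient errors worth retrying, matching the tuple in the removed tenacity decorator.
RETRYABLE = (OSError, TimeoutError, ConnectionError)


def backoff_delay(attempt: int, minimum: float = 1.0, maximum: float = 10.0) -> float:
    """Exponential backoff (1, 2, 4, 8, ... seconds) clamped to [minimum, maximum]."""
    return max(minimum, min(maximum, 2.0 ** (attempt - 1)))


async def connect_with_retries(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Hypothetical helper: try `connect` up to `attempts` times, then re-raise."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except RETRYABLE:
            if attempt == attempts:
                raise  # budget spent: surface the original error
            await sleep(backoff_delay(attempt))
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the sketch testable without real delays; once the budget is spent, the original exception propagates unchanged.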



##########
providers/ssh/src/airflow/providers/ssh/hooks/ssh.py:
##########
@@ -82,6 +82,9 @@ class SSHHook(BaseHook):
         lifetime of the transport
     :param ciphers: list of ciphers to use in order of preference
     :param auth_timeout: timeout (in seconds) for the attempt to authenticate with the remote_host
+    :param conn_retry_attempts: number of times to attempt the initial SSH connection before

Review Comment:
   Added in b692c002. `execute()` now logs an actionable hint when the submit 
connection fails: it points at sshd `MaxStartups` under concurrency, 
`conn_retry_attempts`, and pools / `max_active_tis_per_dag`, then re-raises the 
original error. I scoped it to the connect step (so it doesn't fire on a job 
that connected fine but exited non-zero) and avoided matching on the error 
message/type.
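A minimal sketch of the scoping described above, with plain callables standing in for the hook and the job run. `submit_job`, `connect`, `run_job`, and the hint wording are hypothetical; only the things the hint points at (`MaxStartups`, `conn_retry_attempts`, pools / `max_active_tis_per_dag`) come from the comment.

```python
import logging
from collections.abc import Callable
from typing import Any, TypeVar

T = TypeVar("T")

log = logging.getLogger(__name__)

# Hypothetical wording; it names the knobs mentioned in the review comment.
CONNECT_HINT = (
    "Failed to open the SSH connection used to submit the remote job. Under high "
    "concurrency the remote sshd may be refusing new connections (see its MaxStartups "
    "setting). Consider raising conn_retry_attempts, or limit concurrency with pools "
    "or max_active_tis_per_dag."
)


def submit_job(connect: Callable[[], Any], run_job: Callable[[Any], T]) -> T:
    """Hypothetical execute()-style flow: only the connect step is wrapped."""
    try:
        conn = connect()
    except Exception:
        # No matching on the error message or type: any connect failure gets the hint.
        log.error(CONNECT_HINT)
        raise  # re-raise the original error unchanged
    # Outside the try: a job that connected but exited non-zero fails without the hint.
    return run_job(conn)
```

Keeping `run_job` outside the `try` is what stops the hint from firing on ordinary job failures.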



-- 
This is an automated message from the Apache Git Service.