kaxil commented on code in PR #68115:
URL: https://github.com/apache/airflow/pull/68115#discussion_r3376779415
##########
providers/ssh/src/airflow/providers/ssh/triggers/ssh_remote_job.py:
##########
@@ -109,81 +133,75 @@ def _get_hook(self) -> SSHHookAsync:
host=self.remote_host,
)
- @tenacity.retry(
- stop=tenacity.stop_after_attempt(3),
- wait=tenacity.wait_exponential(multiplier=1, min=1, max=10),
- retry=tenacity.retry_if_exception_type((OSError, TimeoutError, ConnectionError)),
- reraise=True,
- )
- async def _check_completion(self, hook: SSHHookAsync) -> int | None:
+ async def _connect(self) -> SSHClientConnection:
+ """Open a reusable asyncssh connection. Separated out as a seam for testing."""
+ return await self._get_hook().get_conn()
Review Comment:
Good idea, but I'd keep it out of this PR. Sharing connections across
triggers means decoupling the connection lifecycle from the trigger: it needs a
ref-counted pool keyed by `conn_id` + host, and a dropped shared connection
then affects every trigger using it. That's a meaningfully bigger change, and
this PR already cuts per-job connections from ~17 to 2.
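To make the "bigger change" concrete, here is a minimal sketch of the kind of ref-counted pool described above, keyed by `(conn_id, host)`. `RefCountedConnectionPool`, `acquire`, and `release` are hypothetical names, not part of the SSH provider; `connect` stands in for something like the hook's `get_conn()`, and `close` for closing that connection.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Hypothetical pool key, following the comment: (conn_id, host).
PoolKey = tuple[str, str]


class RefCountedConnectionPool:
    """Hypothetical pool that shares one connection per (conn_id, host)."""

    def __init__(
        self,
        connect: Callable[[PoolKey], Awaitable[Any]],
        close: Callable[[Any], None],
    ) -> None:
        self._connect = connect  # stand-in for something like hook.get_conn()
        self._close = close
        self._conns: dict[PoolKey, Any] = {}
        self._refs: dict[PoolKey, int] = {}
        # One lock keeps the sketch simple; it also serializes connects to
        # different hosts, which a real pool would want to avoid.
        self._lock = asyncio.Lock()

    async def acquire(self, key: PoolKey) -> Any:
        async with self._lock:
            if key not in self._conns:
                # Only the first trigger for this key pays for the SSH handshake.
                self._conns[key] = await self._connect(key)
            self._refs[key] = self._refs.get(key, 0) + 1
            return self._conns[key]

    async def release(self, key: PoolKey) -> None:
        async with self._lock:
            self._refs[key] -= 1
            if self._refs[key] == 0:
                # The last holder is gone, so the pool (not a trigger) closes it.
                del self._refs[key]
                self._close(self._conns.pop(key))
```

Even this sketch leaves out what should happen when the shared connection drops while several triggers still hold it (invalidating the entry without corrupting the other holders' reference counts), which is exactly the coupling the comment warns about.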
##########
providers/ssh/docs/operators/ssh_remote_job.rst:
##########
@@ -164,6 +164,13 @@ Parameters
* ``remote_os`` (str, optional): Remote OS type (``"auto"``, ``"posix"``, ``"windows"``). Default: ``"auto"``
* ``skip_on_exit_code`` (int or list, optional): Exit code(s) that should cause task to skip instead of fail
+* ``conn_timeout`` (int, optional): SSH connection timeout in seconds
+* ``banner_timeout`` (float, optional): Seconds to wait for the SSH banner. Default: 30.0
+* ``conn_retry_attempts`` (int, optional): How many times to attempt the initial SSH connection for
+  submission and cleanup before failing. Default: 5. Raise this for large fan-outs where the remote
+  ``sshd`` transiently refuses connections (see :ref:`High Fan-out <howto/operator:SSHRemoteJobOperator:fanout>`)
+* ``cleanup_retries`` (int, optional): How many times to retry remote directory cleanup before giving up
+  and leaving the directory in place. Default: 3
Review Comment:
Done in b692c002. Added a link to the "Placing Limits on Mapped Tasks"
section of the dynamic task mapping docs, and reworded the section so it's
clear the connection storm isn't specific to mapped tasks (parallel DAG runs /
high concurrency hit it too).
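A minimal sketch of the retry semantics the `conn_retry_attempts` entry describes, reading "how many times to attempt" as the total number of tries rather than retries after the first. `connect_with_retries` and its backoff parameters are hypothetical and only loosely modeled on the exponential wait removed from the trigger; this is not the provider's implementation.

```python
from __future__ import annotations

import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def connect_with_retries(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
    retry_on: tuple[type[BaseException], ...] = (OSError, asyncio.TimeoutError),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call ``connect`` up to ``attempts`` times in total (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error unchanged
            # Exponential backoff capped at max_delay, with full jitter so a
            # large fan-out does not hit sshd again in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

The jitter is a design choice for the fan-out case: when many tasks are refused by `MaxStartups` at the same moment, randomized delays spread their reconnects out instead of repeating the same burst.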
##########
providers/ssh/src/airflow/providers/ssh/hooks/ssh.py:
##########
@@ -82,6 +82,9 @@ class SSHHook(BaseHook):
lifetime of the transport
:param ciphers: list of ciphers to use in order of preference
:param auth_timeout: timeout (in seconds) for the attempt to authenticate with the remote_host
+ :param conn_retry_attempts: number of times to attempt the initial SSH connection before
Review Comment:
Added in b692c002. `execute()` now logs an actionable hint when the submit
connection fails: it points at sshd `MaxStartups` under concurrency,
`conn_retry_attempts`, and pools / `max_active_tis_per_dag`, then re-raises the
original error. I scoped it to the connect step (so it doesn't fire on a job
that connected fine but exited non-zero) and avoided matching on the error
message/type.
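A sketch of the scoping described above: only the connect call sits inside the `try`, the hint is logged for any exception without inspecting its type or message, and the original error is re-raised. `submit_job`, `CONNECT_HINT`, and the hint wording are hypothetical, not the operator's actual code or log text.

```python
from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Any

log = logging.getLogger(__name__)

# Hypothetical wording; the real hint text in execute() may differ.
CONNECT_HINT = (
    "Could not open the SSH connection to submit the job. If many tasks start "
    "at once, the remote sshd may be rejecting connections (see MaxStartups in "
    "sshd_config). Consider raising conn_retry_attempts or limiting concurrency "
    "with a pool or max_active_tis_per_dag."
)


def submit_job(connect: Callable[[], Any], run: Callable[[Any], int]) -> int:
    """Hypothetical shape of the connect-then-submit step in execute()."""
    try:
        conn = connect()
    except Exception:
        # No matching on the exception type or message: any connect failure
        # gets the hint, then the original error propagates unchanged.
        log.warning(CONNECT_HINT)
        raise
    # Failures past this point (e.g. a job that exits non-zero) are outside
    # the try block, so they never trigger the connection hint.
    return run(conn)
```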
--
This is an automated message from the Apache Git Service.