ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799640218
##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -128,78 +131,32 @@ def get_hook(self) -> SSHHook:
return self.ssh_hook
- def get_ssh_client(self) -> SSHClient:
+ def get_ssh_client(self) -> "SSHClient":
# Remember to use context manager or call .close() on this when done
self.log.info('Creating ssh_client')
return self.get_hook().get_conn()
- def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) ->
Tuple[int, bytes, bytes]:
- self.log.info("Running command: %s", command)
-
- # set timeout taken as params
- stdin, stdout, stderr = ssh_client.exec_command(
- command=command,
- get_pty=self.get_pty,
- timeout=self.timeout,
- environment=self.environment,
+ def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
+ warnings.warn(
+ 'exec_ssh_client_command method on SSHOperator is deprecated, call
'
+ '`ssh_hook.exec_ssh_client_command` instead',
+ DeprecationWarning,
+ )
+ assert self.ssh_hook
+ return self.ssh_hook.exec_ssh_client_command(
+ ssh_client, command, timeout=self.timeout,
environment=self.environment, get_pty=self.get_pty
)
- # get channels
- channel = stdout.channel
-
- # closing stdin
- stdin.close()
- channel.shutdown_write()
-
- agg_stdout = b''
- agg_stderr = b''
-
- # capture any initial output in case channel is closed already
- stdout_buffer_length = len(stdout.channel.in_buffer)
-
- if stdout_buffer_length > 0:
- agg_stdout += stdout.channel.recv(stdout_buffer_length)
-
- # read from both stdout and stderr
- while not channel.closed or channel.recv_ready() or
channel.recv_stderr_ready():
- readq, _, _ = select([channel], [], [], self.cmd_timeout)
- for recv in readq:
- if recv.recv_ready():
- line = stdout.channel.recv(len(recv.in_buffer))
- agg_stdout += line
- self.log.info(line.decode('utf-8', 'replace').strip('\n'))
- if recv.recv_stderr_ready():
- line =
stderr.channel.recv_stderr(len(recv.in_stderr_buffer))
- agg_stderr += line
- self.log.warning(line.decode('utf-8',
'replace').strip('\n'))
- if (
- stdout.channel.exit_status_ready()
- and not stderr.channel.recv_stderr_ready()
- and not stdout.channel.recv_ready()
- ):
- stdout.channel.shutdown_read()
- try:
- stdout.channel.close()
- except Exception:
- # there is a race that when shutdown_read has been called
and when
- # you try to close the connection, the socket is already
closed
- # We should ignore such errors (but we should log them
with warning)
- self.log.warning("Ignoring exception on close",
exc_info=True)
- break
-
- stdout.close()
- stderr.close()
-
- exit_status = stdout.channel.recv_exit_status()
-
- return exit_status, agg_stdout, agg_stderr
def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
if exit_status != 0:
error_msg = stderr.decode('utf-8')
raise AirflowException(f"error running cmd: {self.command}, error:
{error_msg}")
- def run_ssh_client_command(self, ssh_client: SSHClient, command: str) ->
bytes:
- exit_status, agg_stdout, agg_stderr =
self.exec_ssh_client_command(ssh_client, command)
+ def run_ssh_client_command(self, ssh_client: "SSHClient", command: str) ->
bytes:
+ assert self.ssh_hook
Review comment:
I took this to be a case of "this is just for typing, to protect against None,
and does no harm if it doesn't exist".
I'll update the guidance (possibly to say "don't use asserts for anything
that wouldn't fail in other ways"?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]