potiuk commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799638037



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -128,78 +131,32 @@ def get_hook(self) -> SSHHook:
 
         return self.ssh_hook
 
-    def get_ssh_client(self) -> SSHClient:
+    def get_ssh_client(self) -> "SSHClient":
         # Remember to use context manager or call .close() on this when done
         self.log.info('Creating ssh_client')
         return self.get_hook().get_conn()
 
-    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> 
Tuple[int, bytes, bytes]:
-        self.log.info("Running command: %s", command)
-
-        # set timeout taken as params
-        stdin, stdout, stderr = ssh_client.exec_command(
-            command=command,
-            get_pty=self.get_pty,
-            timeout=self.timeout,
-            environment=self.environment,
+    def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
+        warnings.warn(
+            'exec_ssh_client_command method on SSHOperator is deprecated, call 
'
+            '`ssh_hook.exec_ssh_client_command` instead',
+            DeprecationWarning,
+        )
+        assert self.ssh_hook
+        return self.ssh_hook.exec_ssh_client_command(
+            ssh_client, command, timeout=self.timeout, 
environment=self.environment, get_pty=self.get_pty
         )
-        # get channels
-        channel = stdout.channel
-
-        # closing stdin
-        stdin.close()
-        channel.shutdown_write()
-
-        agg_stdout = b''
-        agg_stderr = b''
-
-        # capture any initial output in case channel is closed already
-        stdout_buffer_length = len(stdout.channel.in_buffer)
-
-        if stdout_buffer_length > 0:
-            agg_stdout += stdout.channel.recv(stdout_buffer_length)
-
-        # read from both stdout and stderr
-        while not channel.closed or channel.recv_ready() or 
channel.recv_stderr_ready():
-            readq, _, _ = select([channel], [], [], self.cmd_timeout)
-            for recv in readq:
-                if recv.recv_ready():
-                    line = stdout.channel.recv(len(recv.in_buffer))
-                    agg_stdout += line
-                    self.log.info(line.decode('utf-8', 'replace').strip('\n'))
-                if recv.recv_stderr_ready():
-                    line = 
stderr.channel.recv_stderr(len(recv.in_stderr_buffer))
-                    agg_stderr += line
-                    self.log.warning(line.decode('utf-8', 
'replace').strip('\n'))
-            if (
-                stdout.channel.exit_status_ready()
-                and not stderr.channel.recv_stderr_ready()
-                and not stdout.channel.recv_ready()
-            ):
-                stdout.channel.shutdown_read()
-                try:
-                    stdout.channel.close()
-                except Exception:
-                    # there is a race that when shutdown_read has been called 
and when
-                    # you try to close the connection, the socket is already 
closed
-                    # We should ignore such errors (but we should log them 
with warning)
-                    self.log.warning("Ignoring exception on close", 
exc_info=True)
-                break
-
-        stdout.close()
-        stderr.close()
-
-        exit_status = stdout.channel.recv_exit_status()
-
-        return exit_status, agg_stdout, agg_stderr
 
     def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
         if exit_status != 0:
             error_msg = stderr.decode('utf-8')
             raise AirflowException(f"error running cmd: {self.command}, error: 
{error_msg}")
 
-    def run_ssh_client_command(self, ssh_client: SSHClient, command: str) -> 
bytes:
-        exit_status, agg_stdout, agg_stderr = 
self.exec_ssh_client_command(ssh_client, command)
+    def run_ssh_client_command(self, ssh_client: "SSHClient", command: str) -> 
bytes:
+        assert self.ssh_hook

Review comment:
       Are we cool with asserts now? I am cool with it in this case, BTW.

##########
File path: airflow/providers/ssh/hooks/ssh.py
##########
@@ -428,3 +429,70 @@ def _pkey_from_private_key(self, private_key: str, 
passphrase: Optional[str] = N
             'Ensure key provided is valid for one of the following'
             'key formats: RSA, DSS, ECDSA, or Ed25519'
         )
+

Review comment:
       Wow. Why was it in the operator?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to