Copilot commented on code in PR #60651:
URL: https://github.com/apache/airflow/pull/60651#discussion_r2705568382


##########
providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/triggers/winrm.py:
##########
@@ -0,0 +1,156 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hook for winrm remote execution."""

Review Comment:
   The docstring incorrectly describes this module as 'Hook for winrm remote 
execution' when it contains trigger classes, not hooks. Update to 'Trigger for 
winrm remote execution.'
   ```suggestion
   """Trigger for winrm remote execution."""
   ```



##########
providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py:
##########
@@ -78,37 +86,87 @@ def __init__(
         self.command = command
         self.ps_path = ps_path
         self.output_encoding = output_encoding
-        self.timeout = timeout
+        self.timeout = timeout.total_seconds() if isinstance(timeout, 
timedelta) else timeout
+        self.poll_interval = (
+            poll_interval.total_seconds()
+            if isinstance(poll_interval, timedelta)
+            else poll_interval
+            if poll_interval is not None
+            else 1.0
+        )
         self.expected_return_code = expected_return_code
         self.working_directory = working_directory
+        self.deferrable = deferrable
 
-    def execute(self, context: Context) -> list | str:
-        if self.ssh_conn_id and not self.winrm_hook:
-            self.log.info("Hook not found, creating...")
-            self.winrm_hook = WinRMHook(ssh_conn_id=self.ssh_conn_id)
-
+    @property
+    def hook(self) -> WinRMHook:
         if not self.winrm_hook:
-            raise AirflowException("Cannot operate without winrm_hook or 
ssh_conn_id.")
+            if self.ssh_conn_id and not self.winrm_hook:
+                self.log.info("Hook not found, creating...")
+                self.winrm_hook = WinRMHook(ssh_conn_id=self.ssh_conn_id)
+
+            if not self.winrm_hook:
+                raise AirflowException("Cannot operate without winrm_hook.")
 
-        if self.remote_host is not None:
-            self.winrm_hook.remote_host = self.remote_host
+            if not self.winrm_hook.ssh_conn_id and self.deferrable:
+                raise AirflowException("Cannot operate in deferrable mode 
without ssh_conn_id.")
 
-        if not self.command:
-            raise AirflowException("No command specified so nothing to execute 
here.")
+            if self.remote_host is not None:
+                self.winrm_hook.remote_host = self.remote_host
 
-        return_code, stdout_buffer, stderr_buffer = self.winrm_hook.run(
+        return self.winrm_hook
+
+    def execute(self, context: Context) -> list | str:
+        if self.deferrable:
+            if self.hook.ssh_conn_id is None:
+                raise AirflowException("ssh_conn_id must be defined in 
deferrable mode!")
+

Review Comment:
   This validation duplicates the check already performed in the `hook` 
property at line 111. The property should handle all validation, and this 
redundant check can be removed.
   ```suggestion
   
   ```



##########
providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/hooks/winrm.py:
##########
@@ -243,55 +250,104 @@ def run(
         :param working_directory: specify working directory.
         :return: returns a tuple containing return_code, stdout and stderr in 
order.
         """
-        winrm_client = self.get_conn()
-        self.log.info("Establishing WinRM connection to host: %s", 
self.remote_host)
-        try:
-            shell_id = 
winrm_client.open_shell(working_directory=working_directory)
-        except Exception as error:
-            error_msg = f"Error connecting to host: {self.remote_host}, error: 
{error}"
-            self.log.error(error_msg)
-            raise AirflowException(error_msg)
+        conn = self.get_conn()
+        shell_id, command_id = self._run_command(
+            conn=conn,
+            command=command,
+            ps_path=ps_path,
+            working_directory=working_directory,
+        )
 
         try:
-            if ps_path is not None:
-                self.log.info("Running command as powershell script: '%s'...", 
command)
-                encoded_ps = 
b64encode(command.encode("utf_16_le")).decode("ascii")
-                command_id = winrm_client.run_command(shell_id, f"{ps_path} 
-encodedcommand {encoded_ps}")
-            else:
-                self.log.info("Running command: '%s'...", command)
-                command_id = winrm_client.run_command(shell_id, command)
-
-                # See: 
https://github.com/diyan/pywinrm/blob/master/winrm/protocol.py
+            command_done = False
             stdout_buffer = []
             stderr_buffer = []
-            command_done = False
+            return_code: int | None = None
+
+            # See: 
https://github.com/diyan/pywinrm/blob/master/winrm/protocol.py
             while not command_done:
-                # this is an expected error when waiting for a long-running 
process, just silently retry
-                with suppress(WinRMOperationTimeoutError):
-                    (
-                        stdout,
-                        stderr,
-                        return_code,
-                        command_done,
-                    ) = winrm_client.get_command_output_raw(shell_id, 
command_id)
-
-                    # Only buffer stdout if we need to so that we minimize 
memory usage.
-                    if return_output:
-                        stdout_buffer.append(stdout)
-                    stderr_buffer.append(stderr)
-
-                    for line in stdout.decode(output_encoding).splitlines():
-                        self.log.info(line)
-                    for line in stderr.decode(output_encoding).splitlines():
-                        self.log.warning(line)
-
-            winrm_client.cleanup_command(shell_id, command_id)
+                (
+                    stdout,
+                    stderr,
+                    return_code,
+                    command_done,
+                ) = self.get_command_output(conn, shell_id, command_id, 
output_encoding)
+
+                # Only buffer stdout if we need to so that we minimize memory 
usage.
+                if return_output:
+                    stdout_buffer.append(stdout)
+                stderr_buffer.append(stderr)
 
             return return_code, stdout_buffer, stderr_buffer
         except Exception as e:
             raise AirflowException(f"WinRM operator error: {e}")
         finally:
-            winrm_client.close_shell(shell_id)
+            conn.cleanup_command(shell_id, command_id)
+            conn.close_shell(shell_id)
+
+    def run_command(
+        self,
+        command: str | None,
+        ps_path: str | None = None,
+        working_directory: str | None = None,
+    ) -> tuple[str, str]:
+        return self._run_command(self.get_conn(), command, ps_path, 
working_directory)
+
+    def _run_command(
+        self,
+        conn: Protocol,
+        command: str | None,
+        ps_path: str | None = None,
+        working_directory: str | None = None,
+    ) -> tuple[str, str]:
+        if not command:
+            raise AirflowException("No command specified so nothing to execute 
here.")
+
+        try:
+            shell_id = conn.open_shell(working_directory=working_directory)
+
+            if ps_path is not None:
+                self.log.info("Running command as powershell script: '%s'...", 
command)
+                encoded_ps = 
b64encode(command.encode("utf_16_le")).decode("ascii")
+                command_id = conn.run_command(shell_id, f"{ps_path} 
-encodedcommand {encoded_ps}")
+            else:
+                self.log.info("Running command: '%s'...", command)
+                command_id = conn.run_command(shell_id, command)
+        except Exception as error:
+            error_msg = f"Error connecting to host: {self.remote_host}, error: 
{error}"
+            self.log.error(error_msg)
+            raise AirflowException(error_msg)
+
+        return shell_id, command_id
+
+    def get_command_output(
+        self, conn: Protocol, shell_id: str, command_id: str, output_encoding: 
str = "utf-8"
+    ) -> tuple[bytes, bytes, int | None, bool]:
+        with suppress(WinRMOperationTimeoutError):
+            (
+                stdout,
+                stderr,
+                return_code,
+                command_done,
+            ) = conn.get_command_output_raw(shell_id, command_id)
+
+            self.log.debug("return_code: ", return_code)
+            self.log.debug("command_done: ", command_done)

Review Comment:
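   The log.debug calls use incorrect formatting: the values are passed as 
logging arguments, but the format strings contain no placeholders (e.g., '%s') 
to receive them, so when debug logging is enabled the logging module reports a 
string-formatting error instead of emitting the message. Change to 
`self.log.debug('return_code: %s', return_code)` and 
`self.log.debug('command_done: %s', command_done)`.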
   ```suggestion
               self.log.debug("return_code: %s", return_code)
               self.log.debug("command_done: %s", command_done)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to