This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ee68a25  Added windows extensions (#16110)
ee68a25 is described below

commit ee68a25facb5541000d5d6524a4009d7e3895ae1
Author: casra-developers <[email protected]>
AuthorDate: Mon Dec 20 13:33:27 2021 +0100

    Added windows extensions (#16110)
---
 airflow/task/task_runner/base_task_runner.py | 35 +++++++++++++++++++-------
 airflow/utils/configuration.py               |  3 ++-
 airflow/utils/platform.py                    |  3 +++
 airflow/utils/process_utils.py               | 11 ++++++---
 airflow/utils/timeout.py                     | 37 +++++++++++++++++++++++++++-
 5 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index 5551508..64d528b 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -19,6 +19,13 @@
 import os
 import subprocess
 import threading
+
+from airflow.utils.platform import IS_WINDOWS
+
+if not IS_WINDOWS:
+    # ignored to avoid flake complaining on Linux
+    from pwd import getpwnam  # noqa
+
 from tempfile import NamedTemporaryFile
 from typing import Optional, Union
 
@@ -136,15 +143,25 @@ class BaseTaskRunner(LoggingMixin):
         self.log.info("Running on host: %s", get_hostname())
         self.log.info('Running: %s', full_cmd)
 
-        proc = subprocess.Popen(
-            full_cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            universal_newlines=True,
-            close_fds=True,
-            env=os.environ.copy(),
-            preexec_fn=os.setsid,
-        )
+        if IS_WINDOWS:
+            proc = subprocess.Popen(
+                full_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                universal_newlines=True,
+                close_fds=True,
+                env=os.environ.copy(),
+            )
+        else:
+            proc = subprocess.Popen(
+                full_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                universal_newlines=True,
+                close_fds=True,
+                env=os.environ.copy(),
+                preexec_fn=os.setsid,
+            )
 
         # Start daemon thread to read subprocess logging output
         log_reader = threading.Thread(
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index cc9273c..ec810a9 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -21,6 +21,7 @@ import os
 from tempfile import mkstemp
 
 from airflow.configuration import conf
+from airflow.utils.platform import IS_WINDOWS
 
 
 def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
@@ -44,7 +45,7 @@ def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
 
     with os.fdopen(temp_fd, 'w') as temp_file:
         # Set the permissions before we write anything to it.
-        if chmod is not None:
+        if chmod is not None and not IS_WINDOWS:
             os.fchmod(temp_fd, chmod)
         json.dump(cfg_dict, temp_file)
 
diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py
index 0c1db2d..0b36946 100644
--- a/airflow/utils/platform.py
+++ b/airflow/utils/platform.py
@@ -20,8 +20,11 @@ import getpass
 import logging
 import os
 import pkgutil
+import platform
 import sys
 
+IS_WINDOWS = platform.system() == 'Windows'
+
 log = logging.getLogger(__name__)
 
 
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 3b12115..a369bd6 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -20,14 +20,19 @@
 import errno
 import logging
 import os
-import pty
 import select
 import shlex
 import signal
 import subprocess
 import sys
-import termios
-import tty
+
+from airflow.utils.platform import IS_WINDOWS
+
+if not IS_WINDOWS:
+    import tty
+    import termios
+    import pty
+
 from contextlib import contextmanager
 from typing import Dict, List
 
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index 22a0faf..6b4464b 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -18,12 +18,41 @@
 
 import os
 import signal
+from threading import Timer
+from typing import ContextManager, Optional, Type
 
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.platform import IS_WINDOWS
 
+_timeout = ContextManager[None]
 
-class timeout(LoggingMixin):
+
+class _timeout_windows(_timeout, LoggingMixin):
+    def __init__(self, seconds=1, error_message='Timeout'):
+        super().__init__()
+        self._timer: Optional[Timer] = None
+        self.seconds = seconds
+        self.error_message = error_message + ', PID: ' + str(os.getpid())
+
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
+        """Logs information and raises AirflowTaskTimeout."""
+        self.log.error("Process timed out, PID: %s", str(os.getpid()))
+        raise AirflowTaskTimeout(self.error_message)
+
+    def __enter__(self):
+        if self._timer:
+            self._timer.cancel()
+        self._timer = Timer(self.seconds, self.handle_timeout)
+        self._timer.start()
+
+    def __exit__(self, type_, value, traceback):
+        if self._timer:
+            self._timer.cancel()
+            self._timer = None
+
+
+class _timeout_posix(_timeout, LoggingMixin):
     """To be used in a ``with`` block and timeout its content."""
 
     def __init__(self, seconds=1, error_message='Timeout'):
@@ -48,3 +77,9 @@ class timeout(LoggingMixin):
             signal.setitimer(signal.ITIMER_REAL, 0)
         except ValueError:
             self.log.warning("timeout can't be used in the current context", exc_info=True)
+
+
+if IS_WINDOWS:
+    timeout: Type[_timeout] = _timeout_windows
+else:
+    timeout = _timeout_posix

Reply via email to