This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ee68a25 Added windows extensions (#16110)
ee68a25 is described below
commit ee68a25facb5541000d5d6524a4009d7e3895ae1
Author: casra-developers <[email protected]>
AuthorDate: Mon Dec 20 13:33:27 2021 +0100
Added windows extensions (#16110)
---
airflow/task/task_runner/base_task_runner.py | 35 +++++++++++++++++++-------
airflow/utils/configuration.py | 3 ++-
airflow/utils/platform.py | 3 +++
airflow/utils/process_utils.py | 11 ++++++---
airflow/utils/timeout.py | 37 +++++++++++++++++++++++++++-
5 files changed, 75 insertions(+), 14 deletions(-)
diff --git a/airflow/task/task_runner/base_task_runner.py
b/airflow/task/task_runner/base_task_runner.py
index 5551508..64d528b 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -19,6 +19,13 @@
import os
import subprocess
import threading
+
+from airflow.utils.platform import IS_WINDOWS
+
+if not IS_WINDOWS:
+ # ignored to avoid flake complaining on Linux
+ from pwd import getpwnam # noqa
+
from tempfile import NamedTemporaryFile
from typing import Optional, Union
@@ -136,15 +143,25 @@ class BaseTaskRunner(LoggingMixin):
self.log.info("Running on host: %s", get_hostname())
self.log.info('Running: %s', full_cmd)
- proc = subprocess.Popen(
- full_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- universal_newlines=True,
- close_fds=True,
- env=os.environ.copy(),
- preexec_fn=os.setsid,
- )
+ if IS_WINDOWS:
+ proc = subprocess.Popen(
+ full_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True,
+ close_fds=True,
+ env=os.environ.copy(),
+ )
+ else:
+ proc = subprocess.Popen(
+ full_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True,
+ close_fds=True,
+ env=os.environ.copy(),
+ preexec_fn=os.setsid,
+ )
# Start daemon thread to read subprocess logging output
log_reader = threading.Thread(
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index cc9273c..ec810a9 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -21,6 +21,7 @@ import os
from tempfile import mkstemp
from airflow.configuration import conf
+from airflow.utils.platform import IS_WINDOWS
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
@@ -44,7 +45,7 @@ def tmp_configuration_copy(chmod=0o600, include_env=True,
include_cmds=True):
with os.fdopen(temp_fd, 'w') as temp_file:
# Set the permissions before we write anything to it.
- if chmod is not None:
+ if chmod is not None and not IS_WINDOWS:
os.fchmod(temp_fd, chmod)
json.dump(cfg_dict, temp_file)
diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py
index 0c1db2d..0b36946 100644
--- a/airflow/utils/platform.py
+++ b/airflow/utils/platform.py
@@ -20,8 +20,11 @@ import getpass
import logging
import os
import pkgutil
+import platform
import sys
+IS_WINDOWS = platform.system() == 'Windows'
+
log = logging.getLogger(__name__)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 3b12115..a369bd6 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -20,14 +20,19 @@
import errno
import logging
import os
-import pty
import select
import shlex
import signal
import subprocess
import sys
-import termios
-import tty
+
+from airflow.utils.platform import IS_WINDOWS
+
+if not IS_WINDOWS:
+ import tty
+ import termios
+ import pty
+
from contextlib import contextmanager
from typing import Dict, List
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index 22a0faf..6b4464b 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -18,12 +18,41 @@
import os
import signal
+from threading import Timer
+from typing import ContextManager, Optional, Type
from airflow.exceptions import AirflowTaskTimeout
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.platform import IS_WINDOWS
+_timeout = ContextManager[None]
-class timeout(LoggingMixin):
+
+class _timeout_windows(_timeout, LoggingMixin):
+ def __init__(self, seconds=1, error_message='Timeout'):
+ super().__init__()
+ self._timer: Optional[Timer] = None
+ self.seconds = seconds
+ self.error_message = error_message + ', PID: ' + str(os.getpid())
+
+ def handle_timeout(self, *args): # pylint: disable=unused-argument
+ """Logs information and raises AirflowTaskTimeout."""
+ self.log.error("Process timed out, PID: %s", str(os.getpid()))
+ raise AirflowTaskTimeout(self.error_message)
+
+ def __enter__(self):
+ if self._timer:
+ self._timer.cancel()
+ self._timer = Timer(self.seconds, self.handle_timeout)
+ self._timer.start()
+
+ def __exit__(self, type_, value, traceback):
+ if self._timer:
+ self._timer.cancel()
+ self._timer = None
+
+
+class _timeout_posix(_timeout, LoggingMixin):
"""To be used in a ``with`` block and timeout its content."""
def __init__(self, seconds=1, error_message='Timeout'):
@@ -48,3 +77,9 @@ class timeout(LoggingMixin):
signal.setitimer(signal.ITIMER_REAL, 0)
except ValueError:
self.log.warning("timeout can't be used in the current context",
exc_info=True)
+
+
+if IS_WINDOWS:
+ timeout: Type[_timeout] = _timeout_windows
+else:
+ timeout = _timeout_posix