uranusjr commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r649381846
##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
else:
raise
- if pgid == os.getpgid(0):
+ if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):
Review comment:
I think this needs some parentheses? At least for readability.
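Python's `and` binds more tightly than `or`, so the one-line condition already groups as intended; parentheses (or an explicit `if`) only make that grouping visible. A minimal sketch, assuming `IS_WINDOWS` is a module-level boolean as in the patch (approximated here with `sys.platform`). `is_current_process_group` is a hypothetical helper name, not part of Airflow:

```python
import itertools
import os
import sys

IS_WINDOWS = sys.platform == "win32"  # assumption: stands in for the patch's flag


def is_current_process_group(pgid: int) -> bool:
    """Hypothetical helper: is `pgid` our process group (our PID on Windows)?"""
    # Parenthesized one-line equivalent:
    # (IS_WINDOWS and pgid == os.getpid()) or (not IS_WINDOWS and pgid == os.getpgid(0))
    if IS_WINDOWS:
        return pgid == os.getpid()
    return pgid == os.getpgid(0)


# `and` binds tighter than `or`, so `a and b or c and d` always parses as
# `(a and b) or (c and d)`; check every combination of truth values.
for a, b, c, d in itertools.product([False, True], repeat=4):
    assert (a and b or c and d) == ((a and b) or (c and d))
```

The explicit `if` also avoids evaluating `os.getpgid`, which does not exist on Windows, without relying on short-circuiting.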
##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
typing-extensions>=3.7.4;python_version<"3.8"
unicodecsv>=0.14.1
werkzeug~=1.0, >=1.0.1
+ # needed for generating virtual python environments when running tasks
+ virtualenv>=20.4.3
+ psycopg2>=2.8.6
Review comment:
Why is psycopg needed?
##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
"""
log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
- old_tty = termios.tcgetattr(sys.stdin)
- tty.setraw(sys.stdin.fileno())
+ if not IS_WINDOWS:
+ old_tty = termios.tcgetattr(sys.stdin)
+ tty.setraw(sys.stdin.fileno())
+
+ # open pseudo-terminal to interact with subprocess
+ master_fd, slave_fd = pty.openpty()
Review comment:
I don’t think the function works on Windows after the patch (at least `slave_fd`
would be undefined there). Since this function is only used to run database
commands, maybe we should mark it as POSIX-only and implement a
separate function for Windows. (Would it be enough to simply bridge
stdin/stdout/stderr to the subprocess?)
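One possible shape for a Windows-only counterpart, sketched under the assumption that bridging the standard streams is enough: instead of opening a pseudo-terminal, the child simply inherits the parent's stdin, stdout and stderr. `execute_interactive_windows` is a hypothetical name, and returning the exit code is an assumption, not the existing function's contract:

```python
import logging
import shlex
import subprocess
import sys
from typing import List

log = logging.getLogger(__name__)


def execute_interactive_windows(cmd: List[str], **kwargs) -> int:
    """Hypothetical Windows counterpart of execute_interactive (no pty).

    The child process inherits this process's stdin, stdout and stderr,
    so the user interacts with it directly; the exit code is returned.
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    # Leaving stdin/stdout/stderr unset (None) makes subprocess inherit them.
    completed = subprocess.run(cmd, **kwargs)
    return completed.returncode


if __name__ == "__main__":
    # Example: run a trivial Python child and print its exit code.
    print(execute_interactive_windows([sys.executable, "-c", "print('hello')"]))
```

Because no raw-mode terminal handling is involved, nothing needs to be restored afterwards, which is what the POSIX version uses `termios` for.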
##########
File path: airflow/utils/timeout.py
##########
@@ -31,20 +34,34 @@ def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message + ', PID: ' + str(os.getpid())
- def handle_timeout(self, signum, frame): # pylint: disable=unused-argument
+ def handle_timeout(self, *args): # pylint: disable=unused-argument
"""Logs information and raises AirflowTaskTimeout."""
self.log.error("Process timed out, PID: %s", str(os.getpid()))
raise AirflowTaskTimeout(self.error_message)
def __enter__(self):
try:
- signal.signal(signal.SIGALRM, self.handle_timeout)
- signal.setitimer(signal.ITIMER_REAL, self.seconds)
+ if IS_WINDOWS:
+ if hasattr(self, TIMER_THREAD_ATTR) and getattr(self, TIMER_THREAD_ATTR) is not None:
+ getattr(self, TIMER_THREAD_ATTR).cancel()
+ timer = Timer(self.seconds, self.handle_timeout)
+ setattr(self, TIMER_THREAD_ATTR, timer)
+ timer.start()
Review comment:
The exception message below (about the current context) doesn’t apply to
the thread-based implementation, so we should move the `if IS_WINDOWS:` block
out of `try:`.
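A minimal sketch of that restructuring, in which only the signal-based calls sit inside `try:`. `AirflowTaskTimeout` is redefined locally as a stand-in, `IS_WINDOWS` is approximated with `sys.platform`, the timer is kept in a plain `_timer` attribute, and the warning text is a paraphrase rather than Airflow's exact message:

```python
import logging
import os
import signal
import sys
import threading

IS_WINDOWS = sys.platform == "win32"  # assumption: stands in for the patch's flag


class AirflowTaskTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowTaskTimeout."""


class timeout:
    """Sketch: only the signal-based calls are wrapped in try/except."""

    def __init__(self, seconds=1, error_message="Timeout"):
        self.log = logging.getLogger(__name__)
        self.seconds = seconds
        self.error_message = error_message + ", PID: " + str(os.getpid())
        self._timer = None  # only used by the Windows branch

    def handle_timeout(self, *args):
        self.log.error("Process timed out, PID: %s", str(os.getpid()))
        raise AirflowTaskTimeout(self.error_message)

    def __enter__(self):
        if IS_WINDOWS:
            # Thread-based path: the main-thread restriction of signal.signal()
            # does not apply, so it needs no try/except. Caveat: the exception
            # is raised in the timer thread, not in the caller's thread.
            self._timer = threading.Timer(self.seconds, self.handle_timeout)
            self._timer.start()
            return self
        try:
            signal.signal(signal.SIGALRM, self.handle_timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        except ValueError:
            # signal.signal() raises ValueError outside the main thread, which
            # is the only case a "current context" warning describes.
            self.log.warning("timeout can't be used in the current context", exc_info=True)
        return self

    def __exit__(self, *exc_info):
        if IS_WINDOWS:
            if self._timer is not None:
                self._timer.cancel()
        else:
            signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the alarm
```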
##########
Review comment:
BTW, why do we need `hasattr`/`getattr`/`setattr` here? Why not just store the
timer as a regular instance attribute? It would simply go unused on POSIX. Or we
can do something like
```python
import sys
from typing import ContextManager, Type

IS_WINDOWS = sys.platform == "win32"  # assumed; the patch defines its own flag

_timeout = ContextManager[None]

class _timeout_windows(_timeout):
    ...  # Implementation for Windows.

class _timeout_posix(_timeout):
    ...  # Implementation for POSIX.

if IS_WINDOWS:
    timeout: Type[_timeout] = _timeout_windows
else:
    timeout = _timeout_posix
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.