potiuk commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r650834156

##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -141,7 +143,7 @@ def run_command(self, run_with=None):
             universal_newlines=True,
             close_fds=True,
             env=os.environ.copy(),
-            preexec_fn=os.setsid,
+            start_new_session=True

Review comment:
I guess we should do this one regardless of Windows support, to avoid a race condition.

##########
File path: airflow/utils/timeout.py
##########
@@ -17,21 +17,49 @@
 # under the License.
 
 import os
+from threading import Timer
 import signal
-
+from airflow.utils.platform import IS_WINDOWS
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from typing import ContextManager, Type
+
+_timeout = ContextManager[None]
 
 
-class timeout(LoggingMixin):  # pylint: disable=invalid-name
-    """To be used in a ``with`` block and timeout its content."""
+class _timeout_windows(_timeout, LoggingMixin):
 
     def __init__(self, seconds=1, error_message='Timeout'):
         super().__init__()
+        self._timer: Timer = None

Review comment:
This is more correct (it also needs the `Optional` import). In the past (maybe even still) `mypy` would add `Optional` automatically when the default is `None`, but that behaviour has been deprecated: https://stackoverflow.com/questions/62732402/can-i-omit-optional-if-i-set-default-to-none

```suggestion
        self._timer: Optional[Timer] = None
```

##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
     typing-extensions>=3.7.4;python_version<"3.8"
     unicodecsv>=0.14.1
     werkzeug~=1.0, >=1.0.1
+    # needed for generating virtual python environments when running tasks
+    virtualenv>=20.4.3
+    psycopg2>=2.8.6

Review comment:
They should both be removed from here. They are already present as extras.
You should simply install Airflow with the postgres and virtualenv extras:

```
pip install "apache-airflow[postgres,virtualenv]" --constraint \
    https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt
```

##########
File path: airflow/__main__.py
##########
@@ -34,6 +35,15 @@ def main():
         os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
         os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
 
+    # if dags folder has to be set to configured value, make sure it is set properly (needed on Dask-Workers)

Review comment:
One more comment here. If we are talking about a scheduler running on Linux/Unix and a worker running on Windows, we also have a problem with the directory separator (`/` -> `\`) for DAGs that are in sub-folders. I thought we could make Airflow accept a file URI on the command line instead (https://datatracker.ietf.org/doc/html/rfc8089), but that does not support relative paths (officially at least). So probably the best approach is to make Airflow replace `/` with `\` on Windows.

##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
     """
     log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
 
-    old_tty = termios.tcgetattr(sys.stdin)
-    tty.setraw(sys.stdin.fileno())
+    if not IS_WINDOWS:
+        old_tty = termios.tcgetattr(sys.stdin)
+        tty.setraw(sys.stdin.fileno())
+
+        # open pseudo-terminal to interact with subprocess
+        master_fd, slave_fd = pty.openpty()

Review comment:
I think this method is needlessly complicated and can be simplified to work the same way on both Windows and Linux. We are not interested in the command output here. A simple `os.system(cmd)` should do the job on both Windows and Linux.
We just need to change `cmd` to a string, format it appropriately with `"` around filenames (sqlite and mysql), and find where the binaries are with `shutil.which`. I tested that this works on Linux; `shutil.which` is cross-platform, so it should work on Windows too.

```
python -c "import os,shutil; os.system(shutil.which('sqlite3'))"
```

##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):

Review comment:
:D Probably best to:

```
my_pgid = os.getpid() if IS_WINDOWS else os.getpgid(0)
if pgid == my_pgid:
    ...
```

By the way, maybe also change `I refuse to kill myself` -> `I refuse to send signal to myself`.

##########
File path: airflow/models/baseoperator.py
##########
@@ -955,7 +955,7 @@ def __deepcopy__(self, memo):
 
         for k, v in self.__dict__.items():
             if k not in shallow_copy:
-                setattr(result, k, copy.deepcopy(v, memo))  # noqa
+                setattr(result, k, v if type(v).__name__ == 'module' else copy.deepcopy(v, memo))  # modules cannot be pickled

Review comment:
Would be great to get clarity here indeed.

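To make the question about the `module` check concrete, here is a minimal sketch of the behaviour being worked around. It is not the PR's code: `Holder` is a hypothetical stand-in for an operator that keeps a module reference as an attribute, and the `isinstance(v, types.ModuleType)` test is one possible alternative to comparing `type(v).__name__`, not something the PR uses.

```python
import copy
import types


class Holder:
    """Hypothetical stand-in for an object that stores a module as an attribute."""

    def __init__(self, helper_module, payload):
        self.helper_module = helper_module
        self.payload = payload

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if isinstance(v, types.ModuleType):
                # copy.deepcopy() raises TypeError for module objects,
                # so the module is shared by reference instead of copied.
                setattr(result, k, v)
            else:
                setattr(result, k, copy.deepcopy(v, memo))
        return result


original = Holder(types, {"values": [1, 2, 3]})
clone = copy.deepcopy(original)
```

After this runs, `clone.helper_module` is the same module object as `original.helper_module`, while `clone.payload` is an independent copy. Whether sharing the module is acceptable for operators is exactly the point that still needs clarifying.
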
##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -82,14 +83,28 @@ def _render_filename(self, ti, try_number):
                 'ts': ti.execution_date.isoformat(),
                 'try_number': try_number,
             }
-            return self.filename_jinja_template.render(**jinja_context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(),
-            try_number=try_number,
-        )
+            result_name = self.filename_jinja_template.render(**jinja_context)
+        else:
+            result_name = self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                execution_date=ti.execution_date.isoformat(),
+                try_number=try_number,
+            )
+
+        # replace ":" with "_" for windows systems
+        if is_windows() and ':' in result_name:
+            print(''.join([
+                'WARNING: Log file template contains ":" characters ',
+                'which cannot be used on Windows systems.\n\n',
+                'Please modify "airflow.cfg":\n',
+                '\t e.g. log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log -> ',
+                'log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts | replace(":", "_") }}}}/{{{{ try_number }}}}.log\n\n',
+                'The offending ":" have been replaced with "_" characeters but note that this might cause other systems ',
+                'which are configured differently to not find the log files.'
+            ]))
+            result_name = result_name.replace(':', '_')
+        return result_name

Review comment:
I'd also be for it. The warning is fine to keep as it is, but we could simply change the default configuration of `log_filename_template` to contain the replace filter. This keeps backwards compatibility for people who already have Airflow installed, while all new installations will use the template with replace.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org