potiuk commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r650834156



##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -141,7 +143,7 @@ def run_command(self, run_with=None):
             universal_newlines=True,
             close_fds=True,
             env=os.environ.copy(),
-            preexec_fn=os.setsid,
+            start_new_session=True

Review comment:
       I guess we should do that one regardless of the Windows support, to avoid a race condition.

##########
File path: airflow/utils/timeout.py
##########
@@ -17,21 +17,49 @@
 # under the License.
 
 import os
+from threading import Timer
 import signal
-
+from airflow.utils.platform import IS_WINDOWS
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from typing import ContextManager, Type
+
+_timeout = ContextManager[None]
 
 
-class timeout(LoggingMixin):  # pylint: disable=invalid-name
-    """To be used in a ``with`` block and timeout its content."""
+class _timeout_windows(_timeout, LoggingMixin):
 
     def __init__(self, seconds=1, error_message='Timeout'):
         super().__init__()
+        self._timer: Timer = None

Review comment:
       This is more correct (the `Optional` import is also needed).
   
   In the past (maybe even still), `mypy` would add `Optional` automatically when the default is `= None`, but that behaviour has been deprecated: https://stackoverflow.com/questions/62732402/can-i-omit-optional-if-i-set-default-to-none
   
   ```suggestion
           self._timer: Optional[Timer] = None
   ```

##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
     typing-extensions>=3.7.4;python_version<"3.8"
     unicodecsv>=0.14.1
     werkzeug~=1.0, >=1.0.1
+    # needed for generating virtual python environments when running tasks
+    virtualenv>=20.4.3
+    psycopg2>=2.8.6

Review comment:
       They should both be removed from here; they are already present as extras. You should simply install Airflow with the `postgres` and `virtualenv` extras:
   
   ```
   pip install "apache-airflow[postgres,virtualenv]" --constraint \
   
https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt
   ```
   
   

##########
File path: airflow/__main__.py
##########
@@ -34,6 +35,15 @@ def main():
         os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
         os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
 
+    # if dags folder has to be set to configured value, make sure it is set properly (needed on Dask-Workers)

Review comment:
       One more comment here. If we are talking about the scheduler running on Linux/Unix and a worker running on Windows, we also have a problem with the directory separator (`/` -> `\`) for DAGs that live in sub-folders.
   
   I thought we could make Airflow accept a file URI instead on the command line: https://datatracker.ietf.org/doc/html/rfc8089 but it does not support relative paths (officially, at least).
   
   So probably the best approach is to make Airflow replace `/` with `\` on Windows.
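
A sketch of that replacement (a hypothetical helper, not existing Airflow code): parse the scheduler-supplied path with POSIX semantics and re-join it using the local separator.

```python
import os
from pathlib import PurePosixPath

def localize_dag_path(rel_path: str) -> str:
    """Convert a POSIX-style relative DAG path (as emitted by a Linux
    scheduler) to the local OS convention. On Windows
    'subfolder/my_dag.py' becomes 'subfolder\\my_dag.py'; on POSIX
    hosts it is returned unchanged."""
    return os.path.join(*PurePosixPath(rel_path).parts)
```

On POSIX hosts this is a no-op, which would keep the existing Linux scheduler/worker setups unaffected.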

##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
     """
     log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
 
-    old_tty = termios.tcgetattr(sys.stdin)
-    tty.setraw(sys.stdin.fileno())
+    if not IS_WINDOWS:
+        old_tty = termios.tcgetattr(sys.stdin)
+        tty.setraw(sys.stdin.fileno())
+
+        # open pseudo-terminal to interact with subprocess
+        master_fd, slave_fd = pty.openpty()

Review comment:
       I think this method is needlessly complicated and can be simplified to work the same way on both Windows and Linux.
   
   We are not interested in the command output here, so a simple `os.system(cmd)` should do the job on both Windows and Linux. We just need to change `cmd` to be a string, format it appropriately with `"` around the filenames (sqlite and mysql), and find where the binaries are with `shutil.which`.
   
   I tested on Linux that this works (`shutil.which` is cross-platform and should work on Windows too):
   
   ```
   python -c "import os,shutil; os.system(shutil.which('sqlite3'))"
   ```
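
Expanded into a fuller sketch (hypothetical helper name and arguments; the db-shell binary is illustrative):

```python
import os
import shutil

def run_interactive(binary: str, *args: str) -> int:
    """Find `binary` on PATH (cross-platform via shutil.which) and run it
    through os.system, quoting each part so paths with spaces survive.
    Returns the raw os.system status."""
    exe = shutil.which(binary)
    if exe is None:
        raise FileNotFoundError(f"{binary!r} not found on PATH")
    cmd = " ".join(f'"{part}"' for part in (exe, *args))
    # os.system inherits the controlling terminal, which is all an
    # interactive db shell needs; we capture no output.
    return os.system(cmd)
```

For example, `run_interactive("sqlite3", "/path with spaces/airflow.db")` would open the sqlite shell without any pty handling.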

##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):

Review comment:
       :D Probably best to:
   
   ```
   my_pgid  = os.getpid() if IS_WINDOWS else os.getpgid(0)
   if pgid == my_pgid:
     ...
   ```
   
   By the way, maybe also change `I refuse to kill myself` -> `I refuse to send signal to myself`.
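
Spelled out as a runnable sketch (with `IS_WINDOWS` stubbed from `os.name`, since the real constant lives in `airflow.utils.platform`):

```python
import os

IS_WINDOWS = os.name == "nt"

def is_own_group(pgid: int) -> bool:
    """True when `pgid` identifies our own process (group). Windows has
    no process groups in the POSIX sense, so compare against our pid."""
    my_pgid = os.getpid() if IS_WINDOWS else os.getpgid(0)
    return pgid == my_pgid
```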

##########
File path: airflow/models/baseoperator.py
##########
@@ -955,7 +955,7 @@ def __deepcopy__(self, memo):
 
         for k, v in self.__dict__.items():
             if k not in shallow_copy:
-                setattr(result, k, copy.deepcopy(v, memo))  # noqa
+                setattr(result, k, v if type(v).__name__ == 'module' else copy.deepcopy(v, memo))   # modules cannot be pickled

Review comment:
       Would be great to get clarity here indeed.
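
For reference, the guard exists because `copy.deepcopy` falls back to pickling, and modules cannot be pickled. An `isinstance` check against `types.ModuleType` would express the same intent a bit more robustly than comparing the type name (a sketch, not the PR's code):

```python
import copy
import types

def copy_attr(value, memo=None):
    """Deep-copy everything except modules, which cannot be pickled
    (deepcopy raises TypeError on them) and are shared by reference."""
    if isinstance(value, types.ModuleType):
        return value
    return copy.deepcopy(value, memo if memo is not None else {})

import os
assert copy_attr(os) is os                    # module passed through by reference
assert copy_attr({"a": [1, 2]}) == {"a": [1, 2]}  # regular objects still copied
```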

##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -82,14 +83,28 @@ def _render_filename(self, ti, try_number):
                     'ts': ti.execution_date.isoformat(),
                     'try_number': try_number,
                 }
-            return self.filename_jinja_template.render(**jinja_context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(),
-            try_number=try_number,
-        )
+            result_name = self.filename_jinja_template.render(**jinja_context)
+        else:
+            result_name = self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                execution_date=ti.execution_date.isoformat(),
+                try_number=try_number,
+            )
+
+        # replace ":" with "_" for windows systems
+        if is_windows() and ':' in result_name:
+            print(''.join([
+                'WARNING: Log file template contains ":" characters ',
+                'which cannot be used on Windows systems.\n\n',
+                'Please modify "airflow.cfg":\n',
+                '\t e.g. log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log -> ',
+                'log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts | replace(":", "_") }}}}/{{{{ try_number }}}}.log\n\n',
+                'The offending ":" have been replaced with "_" characters but note that this might cause other systems ',
+                'which are configured differently to not find the log files.'
+            ]))
+            result_name = result_name.replace(':', '_')
+        return result_name

Review comment:
       I'd also be for it. The warning is fine to keep as it is, but we could simply change the default configuration of `log_filename_template` to contain the replace filter. This keeps backwards compatibility for people who already have Airflow, while all new installations will use the template with the replace filter.
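
Concretely, the proposed new default could look like this in `airflow.cfg` (assuming the 2.x `[logging]` section; the exact filter placement is up for discussion):

```
[logging]
# The replace filter strips the ":" characters from the ISO-8601
# timestamp, producing names that are also valid on Windows.
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", "_") }}/{{ try_number }}.log
```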




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
