uranusjr commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r640482527



##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -86,6 +87,10 @@ def __init__(self, local_task_job):
 
         # pylint: disable=consider-using-with
         self._error_file = NamedTemporaryFile(delete=True)
+
+        # HOTFIX: When reporting exceptions, this file was usually locked 
because it was still opened by this process
+        self._error_file.close()

Review comment:
       This looks wrong. Because the file is created with `delete=True`, [the documentation says closing it destroys the file immediately](https://docs.python.org/3/library/tempfile.html#tempfile.NamedTemporaryFile), which renders this attribute useless.

##########
File path: airflow/operators/python.py
##########
@@ -377,9 +377,20 @@ def execute_callable(self):
                 
render_template_as_native_obj=self.dag.render_template_as_native_obj,
             )
 
+            # find python executable folder
+            candidates = [os.path.join(tmp_dir, 'bin'), os.path.join(tmp_dir, 
'scripts')]
+            python_folder = None
+            for candidate in candidates:
+                if os.path.isdir(candidate):
+                    python_folder = candidate
+                    break
+
+            if python_folder is None:
+                raise AirflowException(f'Unable to find python executable in 
"{tempdir}"')
+
             execute_in_subprocess(
                 cmd=[
-                    f'{tmp_dir}/bin/python',
+                    os.path.join(python_folder, 'python'),

Review comment:
       I think this can be simplified with `shutil.which`, instead of finding a parent directory first and then calling `os.path.join` on it later:
   
   ```python
   candidate_dirs = [os.path.join(tmp_dir, "bin"), os.path.join(tmp_dir, 
"Scripts")]
   python_executable = shutil.which("python", 
path=os.pathsep.join(candidate_dirs))
   if not python_executable:
       raise AirflowException(...)
   execute_in_subprocess(cmd=[python_executable, ...])
   ```
   
   Also, I feel the `AirflowException` should show `tmp_dir` instead. Showing `tempdir` can be misleading because the Python executable isn't directly in the temp directory, but in the subdirectory `tmp_dir`.

##########
File path: airflow/utils/process_utils.py
##########
@@ -20,14 +20,20 @@
 import errno
 import logging
 import os
-import pty
 import select
 import shlex
 import signal
 import subprocess
 import sys
-import termios
-import tty
+from airflow.utils.platform_utils import is_windows
+
+if is_windows():
+    from airflow.windows_extensions import termios, tty, pty
+else:
+    import pty
+    import termios
+    import tty

Review comment:
       I feel we should have a global shim (maybe a new `airflow.platform` module that works similarly to `airflow.compat`) and use it everywhere, instead of repeating `if is_windows()` checks throughout the code.

##########
File path: airflow/utils/platform_utils.py
##########
@@ -0,0 +1,16 @@
+import platform
+
+def is_windows() -> bool:
+    """
+    Returns true if executing system is Windows
+    """
+    return platform.system() == 'Windows'

Review comment:
       It would be a good idea to wrap this in `functools.lru_cache`, since the platform cannot change while the process is running.

##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -133,16 +138,28 @@ def run_command(self, run_with=None):
 
         self.log.info("Running on host: %s", get_hostname())
         self.log.info('Running: %s', full_cmd)
-        # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
-        proc = subprocess.Popen(
-            full_cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            universal_newlines=True,
-            close_fds=True,
-            env=os.environ.copy(),
-            preexec_fn=os.setsid,
-        )
+
+        if is_windows():
+            # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
+            proc = subprocess.Popen(
+                full_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                universal_newlines=True,
+                close_fds=True,
+                env=os.environ.copy()
+            )
+        else:
+            # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
+            proc = subprocess.Popen(
+                full_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                universal_newlines=True,
+                close_fds=True,
+                env=os.environ.copy(),
+                preexec_fn=os.setsid,       # does not exist on Windows
+            )

Review comment:
       I think we can use `start_new_session=True` instead. This option is a shorthand for `preexec_fn=os.setsid` where available, and is silently ignored on Windows (if I'm understanding the documentation correctly).

##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -82,14 +83,28 @@ def _render_filename(self, ti, try_number):
                     'ts': ti.execution_date.isoformat(),
                     'try_number': try_number,
                 }
-            return self.filename_jinja_template.render(**jinja_context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.execution_date.isoformat(),
-            try_number=try_number,
-        )
+            result_name = self.filename_jinja_template.render(**jinja_context)
+        else:
+            result_name = self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                execution_date=ti.execution_date.isoformat(),
+                try_number=try_number,
+            )
+
+        # replace ":" with "_" for windows systems
+        if is_windows() and ':' in result_name:
+            print(''.join([
+                'WARNING: Log file template contains ":" characters ',
+                'which cannot be used on Windows systems.\n\n',
+                'Please modify "airflow.cfg":\n',
+                '\t e.g. log_filename_template = {{{{ ti.dag_id }}}}/{{{{ 
ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log -> ',
+                'log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id 
}}}}/{{{{ ts | replace(":", "_") }}}}/{{{{ try_number }}}}.log\n\n',
+                'The offending ":" have been replaced with "_" characeters but 
note that this might cause other systems ',
+                'which are configured differently to not find the log files.'
+            ]))
+            result_name = result_name.replace(':', '_')
+        return result_name

Review comment:
       I wonder if we should just always replace `:`, on every platform. It’s only there because of the ISO-formatted datetime.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to