uranusjr commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r640482527
##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -86,6 +87,10 @@ def __init__(self, local_task_job):
# pylint: disable=consider-using-with
self._error_file = NamedTemporaryFile(delete=True)
+
+ # HOTFIX: When reporting exceptions, this file was usually locked
because it was still opened by this process
+ self._error_file.close()
Review comment:
This looks wrong. [The documentation says this would destroy the file
immediately](https://docs.python.org/3/library/tempfile.html#tempfile.NamedTemporaryFile),
rendering this attribute useless.
##########
File path: airflow/operators/python.py
##########
@@ -377,9 +377,20 @@ def execute_callable(self):
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
+ # find python executable folder
+ candidates = [os.path.join(tmp_dir, 'bin'), os.path.join(tmp_dir,
'scripts')]
+ python_folder = None
+ for candidate in candidates:
+ if os.path.isdir(candidate):
+ python_folder = candidate
+ break
+
+ if python_folder is None:
+ raise AirflowException(f'Unable to find python executable in
"{tempdir}"')
+
execute_in_subprocess(
cmd=[
- f'{tmp_dir}/bin/python',
+ os.path.join(python_folder, 'python'),
Review comment:
I think this can be simplified with `shutil.which`, instead of first locating
the parent directory and then calling `os.path.join` on it later:
```python
candidate_dirs = [os.path.join(tmp_dir, "bin"), os.path.join(tmp_dir,
"Scripts")]
python_executable = shutil.which("python",
path=os.pathsep.join(candidate_dirs))
if not python_executable:
raise AirflowException(...)
execute_in_subprocess(cmd=[python_executable, ...])
```
Also, I feel the `AirflowException` should show `tmp_dir` instead. Showing
`tempdir` can be misleading because the Python executable isn’t directly in the
temp directory, but in its subdirectory `tmp_dir`.
##########
File path: airflow/utils/process_utils.py
##########
@@ -20,14 +20,20 @@
import errno
import logging
import os
-import pty
import select
import shlex
import signal
import subprocess
import sys
-import termios
-import tty
+from airflow.utils.platform_utils import is_windows
+
+if is_windows():
+ from airflow.windows_extensions import termios, tty, pty
+else:
+ import pty
+ import termios
+ import tty
Review comment:
I feel we should have a global shim (maybe a new `airflow.platform` that
works similarly to `airflow.compat`) and use it in all places instead of doing
`if is_windows()` everywhere.
##########
File path: airflow/utils/platform_utils.py
##########
@@ -0,0 +1,16 @@
+import platform
+
+def is_windows() -> bool:
+ """
+ Returns true if executing system is Windows
+ """
+ return platform.system() == 'Windows'
Review comment:
Would be a good idea to `functools.lru_cache` this.
##########
File path: airflow/task/task_runner/base_task_runner.py
##########
@@ -133,16 +138,28 @@ def run_command(self, run_with=None):
self.log.info("Running on host: %s", get_hostname())
self.log.info('Running: %s', full_cmd)
- # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
- proc = subprocess.Popen(
- full_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- universal_newlines=True,
- close_fds=True,
- env=os.environ.copy(),
- preexec_fn=os.setsid,
- )
+
+ if is_windows():
+ # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
+ proc = subprocess.Popen(
+ full_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True,
+ close_fds=True,
+ env=os.environ.copy()
+ )
+ else:
+ # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
+ proc = subprocess.Popen(
+ full_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True,
+ close_fds=True,
+ env=os.environ.copy(),
+ preexec_fn=os.setsid, # does not exist on Windows
+ )
Review comment:
I think we can use `start_new_session=True` instead. This option is
shorthand for `preexec_fn=os.setsid` where available, and is silently ignored on
Windows (if I’m understanding the documentation correctly).
##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -82,14 +83,28 @@ def _render_filename(self, ti, try_number):
'ts': ti.execution_date.isoformat(),
'try_number': try_number,
}
- return self.filename_jinja_template.render(**jinja_context)
-
- return self.filename_template.format(
- dag_id=ti.dag_id,
- task_id=ti.task_id,
- execution_date=ti.execution_date.isoformat(),
- try_number=try_number,
- )
+ result_name = self.filename_jinja_template.render(**jinja_context)
+ else:
+ result_name = self.filename_template.format(
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ execution_date=ti.execution_date.isoformat(),
+ try_number=try_number,
+ )
+
+ # replace ":" with "_" for windows systems
+ if is_windows() and ':' in result_name:
+ print(''.join([
+ 'WARNING: Log file template contains ":" characters ',
+ 'which cannot be used on Windows systems.\n\n',
+ 'Please modify "airflow.cfg":\n',
+ '\t e.g. log_filename_template = {{{{ ti.dag_id }}}}/{{{{
ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log -> ',
+ 'log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id
}}}}/{{{{ ts | replace(":", "_") }}}}/{{{{ try_number }}}}.log\n\n',
+ 'The offending ":" have been replaced with "_" characeters but
note that this might cause other systems ',
+ 'which are configured differently to not find the log files.'
+ ]))
+ result_name = result_name.replace(':', '_')
+ return result_name
Review comment:
I wonder if we should just always replace `:` everywhere. It’s only
there because of the datetime.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org