SledgeHammer01 commented on code in PR #41073:
URL: https://github.com/apache/airflow/pull/41073#discussion_r1696922772
##########
airflow/hooks/subprocess.py:
##########
@@ -61,41 +64,68 @@ def run_command(
or stdout
"""
self.log.info("Tmp dir root location: %s", gettempdir())
- with contextlib.ExitStack() as stack:
- if cwd is None:
- cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
-
- def pre_exec():
- # Restore default signal disposition and invoke setsid
- for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
- if hasattr(signal, sig):
- signal.signal(getattr(signal, sig), signal.SIG_DFL)
- os.setsid()
-
- self.log.info("Running command: %s", command)
-
- self.sub_process = Popen(
- command,
- stdout=PIPE,
- stderr=STDOUT,
- cwd=cwd,
- env=env if env or env == {} else os.environ,
- preexec_fn=pre_exec,
- )
-
- self.log.info("Output:")
- line = ""
- if self.sub_process is None:
- raise RuntimeError("The subprocess should be created here and is None!")
- if self.sub_process.stdout is not None:
- for raw_line in iter(self.sub_process.stdout.readline, b""):
- line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
- self.log.info("%s", line)
-
- self.sub_process.wait()
-
- self.log.info("Command exited with return code %s", self.sub_process.returncode)
- return_code: int = self.sub_process.returncode
+
+ safe_cleanup = False
+
+ try:
+ with contextlib.ExitStack() as stack:
+ if cwd is None:
+ # TemporaryDirectory will call shutil.rmtree() internally when the context exits. On
+ # Windows, shutil.rmtree() is unreliable and there is a race condition, where even after
+ # self.sub_process.wait(), the process will still be holding onto the directory causing
+ # an exception. The work-around is to call shutil.rmtree() in a retry loop. If we're not
+ # running under Windows, shutil.rmtree() is reliable and no retry loop is needed.
+
+ cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
+ safe_cleanup = IS_WINDOWS
+
+ def pre_exec():
+ # Restore default signal disposition and invoke setsid
+ for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
+ if hasattr(signal, sig):
+ signal.signal(getattr(signal, sig), signal.SIG_DFL)
+ os.setsid()
+
+ self.log.info("Running command: %s", command)
+
+ self.sub_process = Popen(
+ command,
+ stdout=PIPE,
+ stderr=STDOUT,
+ cwd=cwd,
+ env=env if env or env == {} else os.environ,
+ preexec_fn=pre_exec if not IS_WINDOWS else None,
+ )
+
+ self.log.info("Output:")
+ line = ""
+ if self.sub_process is None:
+ raise RuntimeError("The subprocess should be created here and is None!")
+ if self.sub_process.stdout is not None:
+ for raw_line in iter(self.sub_process.stdout.readline, b""):
+ line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
+ self.log.info("%s", line)
+
+ self.sub_process.wait()
+ self.log.info("Command exited with return code %s", self.sub_process.returncode)
+
+ return_code: int = self.sub_process.returncode
Review Comment:
@uranusjr I've fixed the mypy issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]