[AIRFLOW-1559] Close file handles in subprocesses

All file descriptors except 0, 1 and 2 will be closed before the child
process is executed. This is the default on Python 3.2 and above. This
patch ensures consistent behaviour for older Python versions.

Resources will be released once the main thread disposes of them,
independent of the longevity of its subprocesses.

Background information:
 * https://www.python.org/dev/peps/pep-0446/
 * https://bugs.python.org/issue7213
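For illustration only (not part of this commit): a minimal sketch of the
behaviour the patch standardises. The log file path and the sleep command
are arbitrary placeholders.

    import subprocess
    import sys

    # A parent-side handle whose descriptor is greater than 2.
    log_file = open("/tmp/example.log", "w")

    # Python 2 default (close_fds=False): the long-running child inherits
    # log_file's descriptor, so the file stays open even after the parent
    # closes its own handle.
    leaky = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"])

    # As in this patch: every descriptor except 0, 1 and 2 is closed in the
    # child before exec, so closing log_file in the parent really releases it.
    clean = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"],
        close_fds=True)

    log_file.close()
    leaky.terminate()
    clean.terminate()

On Python 3.2 and above both calls behave identically, because close_fds
already defaults to True there.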
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5a303ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5a303ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5a303ebb

Branch: refs/heads/master
Commit: 5a303ebbc572cee7c9c30be84ebf625357360d4b
Parents: 3bde95e
Author: Stephan Erb <[email protected]>
Authored: Sat Nov 25 21:16:36 2017 +0100
Committer: Stephan Erb <[email protected]>
Committed: Wed Nov 29 09:46:42 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                              |  8 ++++----
 airflow/configuration.py                        |  5 ++++-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |  8 ++++++--
 airflow/contrib/hooks/ssh_hook.py               |  3 ++-
 airflow/executors/celery_executor.py            |  2 +-
 airflow/executors/dask_executor.py              |  2 +-
 airflow/executors/local_executor.py             |  2 +-
 airflow/executors/sequential_executor.py        |  2 +-
 airflow/hooks/hive_hooks.py                     |  3 ++-
 airflow/hooks/pig_hook.py                       |  3 ++-
 airflow/minihivecluster.py                      |  3 ++-
 airflow/operators/python_operator.py            |  4 +++-
 airflow/operators/s3_file_transform_operator.py |  2 +-
 airflow/security/kerberos.py                    |  2 +-
 airflow/task_runner/base_task_runner.py         | 11 +++++++----
 airflow/utils/helpers.py                        |  2 +-
 16 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 6d01293..4e56d54 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -771,7 +771,7 @@ def webserver(args):
             },
         )
         with ctx:
-            subprocess.Popen(run_args)
+            subprocess.Popen(run_args, close_fds=True)

             # Reading pid file directly, since Popen#pid doesn't
             # seem to return the right value with DaemonContext.
@@ -790,7 +790,7 @@ def webserver(args):
             stdout.close()
             stderr.close()
     else:
-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True)

         signal.signal(signal.SIGINT, kill_proc)
         signal.signal(signal.SIGTERM, kill_proc)
@@ -881,7 +881,7 @@ def worker(args):
             stderr=stderr,
         )
         with ctx:
-            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
+            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)

             worker.run(**options)
             sp.kill()
@@ -891,7 +891,7 @@ def worker(args):
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)

-        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
+        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)

         worker.run(**options)
         sp.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ff81d98..d61afb7 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -85,7 +85,10 @@ def run_command(command):
     Runs command and returns stdout
     """
     process = subprocess.Popen(
-        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        shlex.split(command),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        close_fds=True)
     output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore')
                       for stream in process.communicate()]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index b1e1474..1928c3b 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -91,8 +91,12 @@ class _DataflowJob(LoggingMixin):
 class _Dataflow(LoggingMixin):
     def __init__(self, cmd):
         self.log.info("Running command: %s", ' '.join(cmd))
-        self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
-                                      stderr=subprocess.PIPE)
+        self._proc = subprocess.Popen(
+            cmd,
+            shell=False,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            close_fds=True)

     def _line(self, fd):
         if fd == self._proc.stderr.fileno():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index a85911b..4b60405 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -202,7 +202,8 @@ class SSHHook(BaseHook, LoggingMixin):
         proc = subprocess.Popen(ssh_cmd,
                                 stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE)
+                                stdout=subprocess.PIPE,
+                                close_fds=True)
         ready = proc.stdout.read(5)
         assert ready == b"ready", \
             "Did not get 'ready' from remote, got '{0}' instead".format(ready)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index d3809b3..4827f03 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -49,7 +49,7 @@ def execute_command(command):
     log = LoggingMixin().log
     log.info("Executing command in Celery: %s", command)
     try:
-        subprocess.check_call(command, shell=True)
+        subprocess.check_call(command, shell=True, close_fds=True)
     except subprocess.CalledProcessError as e:
         log.error(e)
         raise AirflowException('Celery command failed')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 07b8a82..70edbe5 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -44,7 +44,7 @@ class DaskExecutor(BaseExecutor):
             )

         def airflow_run():
-            return subprocess.check_call(command, shell=True)
+            return subprocess.check_call(command, shell=True, close_fds=True)

         future = self.client.submit(airflow_run, pure=False)
         self.futures[future] = key

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 71bee22..1d18d6d 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -79,7 +79,7 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
         self.log.info("%s running %s", self.__class__.__name__, command)
         command = "exec bash -c '{0}'".format(command)
         try:
-            subprocess.check_call(command, shell=True)
+            subprocess.check_call(command, shell=True, close_fds=True)
             state = State.SUCCESS
         except subprocess.CalledProcessError as e:
             state = State.FAILED

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index a15450d..ec51694 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -40,7 +40,7 @@ class SequentialExecutor(BaseExecutor):
             self.log.info("Executing command: %s", command)

             try:
-                subprocess.check_call(command, shell=True)
+                subprocess.check_call(command, shell=True, close_fds=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index a8f9c8f..eb39469 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -204,7 +204,8 @@ class HiveCliHook(BaseHook):
                     hive_cmd,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT,
-                    cwd=tmp_dir)
+                    cwd=tmp_dir,
+                    close_fds=True)
                 self.sp = sp
                 stdout = ''
                 while True:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 276b37a..e7e6a68 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -67,7 +67,8 @@ class PigCliHook(BaseHook):
                     pig_cmd,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT,
-                    cwd=tmp_dir)
+                    cwd=tmp_dir,
+                    close_fds=True)
                 self.sp = sp
                 stdout = ''
                 for line in iter(sp.stdout.readline, ''):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/minihivecluster.py
----------------------------------------------------------------------
diff --git a/airflow/minihivecluster.py b/airflow/minihivecluster.py
index b975f27..c5441c6 100644
--- a/airflow/minihivecluster.py
+++ b/airflow/minihivecluster.py
@@ -30,7 +30,8 @@ class MiniHiveCluster(object):
         cmd = ["java", "-cp", classpath, self._minicluster_class]

         self.hive = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE,
-                                     stderr=subprocess.PIPE, universal_newlines=True)
+                                     stderr=subprocess.PIPE, universal_newlines=True,
+                                     close_fds=True)

     def terminate(self):
         self.hive.terminate()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 18e7bce..b20afae 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -275,7 +275,9 @@ class PythonVirtualenvOperator(PythonOperator):
     def _execute_in_subprocess(self, cmd):
         try:
             self.log.info("Executing cmd\n{}".format(cmd))
-            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+            output = subprocess.check_output(cmd,
+                                             stderr=subprocess.STDOUT,
+                                             close_fds=True)
             if output:
                 self.log.info("Got output\n{}".format(output))
         except subprocess.CalledProcessError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index e105e3d..a27a782 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -88,7 +88,7 @@ class S3FileTransformOperator(BaseOperator):
         source_s3.connection.close()
         transform_script_process = subprocess.Popen(
             [self.transform_script, f_source.name, f_dest.name],
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
         (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
         self.log.info("Transform script stdout %s", transform_script_stdoutdata)
         if transform_script_process.returncode > 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 7a169b2..55566a0 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -73,7 +73,7 @@ def perform_krb181_workaround():
     log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
              " ".join(cmdv))

-    ret = subprocess.call(cmdv)
+    ret = subprocess.call(cmdv, close_fds=True)

     if ret != 0:
         principal = "%s/%s" % (configuration.get('kerberos', 'principal'),
                                socket.getfqdn())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index f4b4f2d..2fa33a9 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -66,10 +66,12 @@ class BaseTaskRunner(LoggingMixin):

             # Give ownership of file to user; only they can read and write
             subprocess.call(
-                ['sudo', 'chown', self.run_as_user, cfg_path]
+                ['sudo', 'chown', self.run_as_user, cfg_path],
+                close_fds=True
             )
             subprocess.call(
-                ['sudo', 'chmod', '600', cfg_path]
+                ['sudo', 'chmod', '600', cfg_path],
+                close_fds=True
             )

         with os.fdopen(temp_fd, 'w') as temp_file:
@@ -117,7 +119,8 @@ class BaseTaskRunner(LoggingMixin):
             full_cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
-            universal_newlines=True
+            universal_newlines=True,
+            close_fds=True,
         )

         # Start daemon thread to read subprocess logging output
@@ -154,4 +157,4 @@ class BaseTaskRunner(LoggingMixin):
         A callback that should be called when this is done running.
         """
         if self._cfg_path and os.path.isfile(self._cfg_path):
-            subprocess.call(['sudo', 'rm', self._cfg_path])
+            subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a303ebb/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 4b8944e..6a70725 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -192,7 +192,7 @@ def kill_using_shell(logger, pid, signal=signal.SIGTERM):
         else:
             args = ["kill", "-{}".format(int(signal)), str(pid)]
         # PID may not exist and return a non-zero error code
-        logger.error(subprocess.check_output(args))
+        logger.error(subprocess.check_output(args, close_fds=True))
         logger.info("Killed process {} with signal {}".format(pid, signal))
         return True
     except psutil.NoSuchProcess as e:
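
To make the effect visible end to end, a rough verification sketch (not part
of this commit; it is Linux-specific because it reads /proc/self/fd, and the
/dev/null handle is just a stand-in for any resource held by the parent):

    import subprocess
    import sys

    probe = "import os; print(sorted(int(fd) for fd in os.listdir('/proc/self/fd')))"

    extra = open("/dev/null")  # a parent-owned descriptor above 2

    for close_fds in (False, True):
        out = subprocess.check_output(
            [sys.executable, "-c", probe], close_fds=close_fds)
        # On Python 2, the run without close_fds also lists the descriptor
        # behind `extra`; with close_fds=True only the standard descriptors
        # (plus the fd that os.listdir itself opens) remain. On Python 3.2+
        # both runs print the same set.
        print("close_fds=%r -> %s" % (close_fds, out.strip()))

    extra.close()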
