[ https://issues.apache.org/jira/browse/AIRFLOW-2888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580727#comment-16580727 ]

ASF GitHub Bot commented on AIRFLOW-2888:
-----------------------------------------

bolkedebruin closed pull request #3740: [AIRFLOW-2888] Remove shell=True and bash from task launch
URL: https://github.com/apache/incubator-airflow/pull/3740

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/UPDATING.md b/UPDATING.md
index 4fda57663f..af10729085 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -5,6 +5,13 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Rename of BashTaskRunner to StandardTaskRunner
+
+BashTaskRunner has been renamed to StandardTaskRunner. It is the default task runner
+so you might need to update your config.
+
+`task_runner = StandardTaskRunner`
+
 ## Airflow 1.10
 
 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7a86e1f069..76c66c90f6 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -140,7 +140,7 @@ donot_pickle = False
 dagbag_import_timeout = 30
 
 # The class to use for running task instances in a subprocess
-task_runner = BashTaskRunner
+task_runner = StandardTaskRunner
 
 # If set, tasks without a `run_as_user` argument will be run with this user
 # Can be used to de-elevate a sudo user running Airflow when executing tasks
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index ff974ffc3c..0609d71cf2 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -162,7 +162,7 @@ def resourceOffers(self, driver, offers):
 
                 command = mesos_pb2.CommandInfo()
                 command.shell = True
-                command.value = cmd
+                command.value = " ".join(cmd)
                 task.command.MergeFrom(command)
 
                 # If docker image for airflow is specified in config then pull that
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 88a5cf0a40..482a823809 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -203,8 +203,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
             image=kube_executor_config.image or self.kube_config.kube_image,
             image_pull_policy=(kube_executor_config.image_pull_policy or
                                self.kube_config.kube_image_pull_policy),
-            cmds=['bash', '-cx', '--'],
-            args=[airflow_command],
+            cmds=airflow_command,
             labels={
                 'airflow-worker': worker_uuid,
                 'dag_id': dag_id,
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index faa2407f09..78a240f2db 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -117,7 +117,7 @@ def start(self):
                 "creating another one",
                 cgroups.get("cpu"), cgroups.get("memory")
             )
-            self.process = self.run_command(['bash', '-c'], join_args=True)
+            self.process = self.run_command()
             return
 
         # Create a unique cgroup name
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 701ac66f8b..8baed1a250 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -75,7 +75,7 @@ def queue_task_instance(
         # cfg_path is needed to propagate the config values if using impersonation
         # (run_as_user), given that there are different code paths running tasks.
         # For a long term solution we need to address AIRFLOW-1986
-        command = task_instance.command(
+        command = task_instance.command_as_list(
             local=True,
             mark_success=mark_success,
             ignore_all_deps=ignore_all_deps,
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 481daa5826..03a4b3b792 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -56,7 +56,7 @@ def execute_command(command):
     log.info("Executing command in Celery: %s", command)
     env = os.environ.copy()
     try:
-        subprocess.check_call(command, shell=True, stderr=subprocess.STDOUT,
+        subprocess.check_call(command, stderr=subprocess.STDOUT,
                               close_fds=True, env=env)
     except subprocess.CalledProcessError as e:
         log.exception('execute_command encountered a CalledProcessError')
@@ -84,7 +84,7 @@ def execute_async(self, key, command,
         self.log.info("[celery] queuing {key} through celery, "
                       "queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(
-            args=[command], queue=queue)
+            args=command, queue=queue)
         self.last_state[key] = celery_states.PENDING
 
     def sync(self):
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 0c85262324..291d6e1277 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -82,9 +82,8 @@ def execute_work(self, key, command):
         if key is None:
             return
         self.log.info("%s running %s", self.__class__.__name__, command)
-        command = "exec bash -c '{0}'".format(command)
         try:
-            subprocess.check_call(command, shell=True, close_fds=True)
+            subprocess.check_call(command, close_fds=True)
             state = State.SUCCESS
         except subprocess.CalledProcessError as e:
             state = State.FAILED
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 9c0d8ecf0c..1542e3318e 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -45,7 +45,7 @@ def sync(self):
             self.log.info("Executing command: %s", command)
 
             try:
-                subprocess.check_call(command, shell=True, close_fds=True)
+                subprocess.check_call(command, close_fds=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7c21dd9ad2..d51e7c2537 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1325,7 +1325,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
         # actually enqueue them
         for task_instance in task_instances:
             simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
-            command = " ".join(TI.generate_command(
+            command = TI.generate_command(
                 task_instance.dag_id,
                 task_instance.task_id,
                 task_instance.execution_date,
@@ -1337,7 +1337,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
                 ignore_ti_state=False,
                 pool=task_instance.pool,
                 file_path=simple_dag.full_filepath,
-                pickle_id=simple_dag.pickle_id))
+                pickle_id=simple_dag.pickle_id)
 
             priority = task_instance.priority_weight
             queue = task_instance.queue
diff --git a/airflow/task/task_runner/__init__.py b/airflow/task/task_runner/__init__.py
index 0edc020d41..5a30cf5dc4 100644
--- a/airflow/task/task_runner/__init__.py
+++ b/airflow/task/task_runner/__init__.py
@@ -18,7 +18,7 @@
 # under the License.
 
 from airflow import configuration
-from airflow.task.task_runner.bash_task_runner import BashTaskRunner
+from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
 from airflow.exceptions import AirflowException
 
 _TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER')
@@ -34,8 +34,8 @@ def get_task_runner(local_task_job):
     :return: The task runner to use to run the task.
     :rtype: airflow.task.task_runner.base_task_runner.BaseTaskRunner
     """
-    if _TASK_RUNNER == "BashTaskRunner":
-        return BashTaskRunner(local_task_job)
+    if _TASK_RUNNER == "StandardTaskRunner":
+        return StandardTaskRunner(local_task_job)
     elif _TASK_RUNNER == "CgroupTaskRunner":
         from airflow.contrib.task_runner.cgroup_task_runner import CgroupTaskRunner
         return CgroupTaskRunner(local_task_job)
diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index 337f6a63ba..0b195047cb 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -106,7 +106,7 @@ def _read_task_logs(self, stream):
                           self._task_instance.job_id, self._task_instance.task_id,
                           line.rstrip('\n'))
 
-    def run_command(self, run_with, join_args=False):
+    def run_command(self, run_with=None, join_args=False):
         """
         Run the task command
 
@@ -119,8 +119,10 @@ def run_command(self, run_with, join_args=False):
         :return: the process that was run
         :rtype: subprocess.Popen
         """
+        run_with = run_with or []
         cmd = [" ".join(self._command)] if join_args else self._command
         full_cmd = run_with + cmd
+
         self.log.info('Running: %s', full_cmd)
         proc = subprocess.Popen(
             full_cmd,
diff --git a/airflow/task/task_runner/bash_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
similarity index 85%
rename from airflow/task/task_runner/bash_task_runner.py
rename to airflow/task/task_runner/standard_task_runner.py
index 4ddcac5982..6b65fc35c5 100644
--- a/airflow/task/task_runner/bash_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -23,15 +23,15 @@
 from airflow.utils.helpers import reap_process_group
 
 
-class BashTaskRunner(BaseTaskRunner):
+class StandardTaskRunner(BaseTaskRunner):
     """
     Runs the raw Airflow task by invoking through the Bash shell.
     """
     def __init__(self, local_task_job):
-        super(BashTaskRunner, self).__init__(local_task_job)
+        super(StandardTaskRunner, self).__init__(local_task_job)
 
     def start(self):
-        self.process = self.run_command(['bash', '-c'], join_args=True)
+        self.process = self.run_command()
 
     def return_code(self):
         return self.process.poll()
@@ -41,4 +41,4 @@ def terminate(self):
             reap_process_group(self.process.pid, self.log)
 
     def on_finish(self):
-        super(BashTaskRunner, self).on_finish()
+        super(StandardTaskRunner, self).on_finish()
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index 7937b1b8a3..9bf051f580 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor):
         # start the executor
         executor.start()
 
-        success_command = 'echo 1'
-        fail_command = 'exit 1'
+        success_command = ['true', ]
+        fail_command = ['false', ]
 
         executor.execute_async(key='success', command=success_command)
         executor.execute_async(key='fail', command=fail_command)
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 1169c8d11b..7e82e4b8ae 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,14 +27,15 @@
 # leave this it is used by the test worker
 import celery.contrib.testing.tasks
 
+
 class CeleryExecutorTest(unittest.TestCase):
     def test_celery_integration(self):
         executor = CeleryExecutor()
         executor.start()
         with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
 
-            success_command = 'echo 1'
-            fail_command = 'exit 1'
+            success_command = ['true', ]
+            fail_command = ['false', ]
 
             executor.execute_async(key='success', command=success_command)
             # errors are propagated for some reason
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 91d1ec8196..846e132561 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,11 +33,11 @@ def execution_parallelism(self, parallelism=0):
         executor.start()
 
         success_key = 'success {}'
-        success_command = 'echo {}'
-        fail_command = 'exit 1'
+        success_command = ['true', ]
+        fail_command = ['false', ]
 
         for i in range(self.TEST_SUCCESS_COMMANDS):
-            key, command = success_key.format(i), success_command.format(i)
+            key, command = success_key.format(i), success_command
             executor.execute_async(key=key, command=command)
             executor.running[key] = True
 
diff --git a/tests/task/task_runner/test_bash_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
similarity index 94%
rename from tests/task/task_runner/test_bash_task_runner.py
rename to tests/task/task_runner/test_standard_task_runner.py
index 24c0819fdb..38a4e41e37 100644
--- a/tests/task/task_runner/test_bash_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@
 from airflow import models, settings
 from airflow.jobs import LocalTaskJob
 from airflow.models import TaskInstance as TI
-from airflow.task.task_runner import BashTaskRunner
+from airflow.task.task_runner import StandardTaskRunner
 from airflow.utils import timezone
 from airflow.utils.state import State
 
@@ -61,7 +61,7 @@
 }
 
 
-class TestBashTaskRunner(unittest.TestCase):
+class TestStandardTaskRunner(unittest.TestCase):
     def setUp(self):
         dictConfig(LOGGING_CONFIG)
 
@@ -71,7 +71,7 @@ def test_start_and_terminate(self):
         local_task_job.task_instance.run_as_user = None
         local_task_job.task_instance.command_as_list.return_value = ['sleep', '1000']
 
-        runner = BashTaskRunner(local_task_job)
+        runner = StandardTaskRunner(local_task_job)
         runner.start()
 
         pgid = os.getpgid(runner.process.pid)
@@ -119,7 +119,7 @@ def test_on_kill(self):
         ti = TI(task=task, execution_date=DEFAULT_DATE)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
 
-        runner = BashTaskRunner(job1)
+        runner = StandardTaskRunner(job1)
         runner.start()
 
         # give the task some time to startup
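
For reference, the reworked run_command in base_task_runner.py now builds its argv roughly like this (a minimal standalone sketch, not code from the PR; the _command value is made up, and the real method also wires up logging and environment handling):

    import subprocess

    # Hypothetical command list, as produced by TaskInstance.command_as_list().
    _command = ['airflow', 'run', 'some_dag', 'some_task', '2018-08-15T00:00:00']

    def run_command(run_with=None, join_args=False):
        # No implicit ['bash', '-c'] wrapper any more: by default the task
        # command list is executed directly, with no shell in between.
        # Callers that need a prefix can still pass one via run_with.
        run_with = run_with or []
        cmd = [' '.join(_command)] if join_args else _command
        full_cmd = run_with + cmd
        return subprocess.Popen(
            full_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            close_fds=True,
        )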


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Do not use Shell=True and bash to launch tasks
> ----------------------------------------------
>
>                 Key: AIRFLOW-2888
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2888
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Priority: Major
>
> Using shell=True is a security risk and there is no need to use bash
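
A short sketch of the risk being closed here (hypothetical values, for illustration only): with shell=True the command string is parsed by /bin/sh, so shell metacharacters inside task parameters become executable syntax, whereas the list form passes each element to the child process verbatim:

    import subprocess

    # Hypothetical attacker-influenced parameter.
    task_id = 'mytask; touch /tmp/pwned'

    # String + shell=True: /bin/sh splits on ';' and runs the injected
    # command as well.
    subprocess.check_call('airflow run mydag ' + task_id + ' 2018-08-15',
                          shell=True)

    # List form, as used after this change: the whole value stays a single
    # argv element and nothing is shell-interpreted.
    subprocess.check_call(['airflow', 'run', 'mydag', task_id, '2018-08-15'])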



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
