This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7d332c583df359a250428f19e48a2d889787c17a Author: Kaxil Naik <[email protected]> AuthorDate: Fri Jun 12 01:17:43 2020 +0100 Further validation that only task commands are run by executors (#9240) (cherry-picked from 99c534e9faf) --- airflow/executors/kubernetes_executor.py | 7 ++----- tests/executors/test_dask_executor.py | 6 +++--- tests/executors/test_kubernetes_executor.py | 3 ++- tests/executors/test_local_executor.py | 20 +++++++++++++++----- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index b62462f..8b5fdc1 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -395,11 +395,8 @@ class AirflowKubernetesScheduler(LoggingMixin): key, command, kube_executor_config = next_job dag_id, task_id, execution_date, try_number = key - if isinstance(command, str): - command = [command] - - if command[0] != "airflow": - raise ValueError('The first element of command must be equal to "airflow".') + if command[0:2] != ["airflow", "run"]: + raise ValueError('The command must start with ["airflow", "run"].') config_pod = self.worker_configuration.make_pod( namespace=self.namespace, diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py index f43a54f..2bf9d8e 100644 --- a/tests/executors/test_dask_executor.py +++ b/tests/executors/test_dask_executor.py @@ -50,12 +50,12 @@ pytestmark = pytest.mark.xfail(condition=True, reason="The Dask executor is expe class BaseDaskTest(unittest.TestCase): def assert_tasks_on_executor(self, executor): + + success_command = ['airflow', 'run', '--help'] + fail_command = ['airflow', 'run', 'false'] # start the executor executor.start() - success_command = ['true', 'some_parameter'] - fail_command = ['false', 'some_parameter'] - executor.execute_async(key='success', command=success_command) executor.execute_async(key='fail', command=fail_command) diff --git 
a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 9073493..77299f6 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -155,7 +155,8 @@ class TestKubernetesExecutor(unittest.TestCase): # Execute a task while the Api Throws errors try_number = 1 kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number), - command='command', executor_config={}) + command=['airflow', 'run', 'true', 'some_parameter'], + executor_config={}) kubernetesExecutor.sync() kubernetesExecutor.sync() diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py index 3aebbe2..67f0b28 100644 --- a/tests/executors/test_local_executor.py +++ b/tests/executors/test_local_executor.py @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. +import subprocess import unittest from tests.compat import mock @@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase): TEST_SUCCESS_COMMANDS = 5 - def execution_parallelism(self, parallelism=0): + @mock.patch('airflow.executors.local_executor.subprocess.check_call') + def execution_parallelism(self, mock_check_call, parallelism=0): + success_command = ['airflow', 'run', 'true', 'some_parameter'] + fail_command = ['airflow', 'run', 'false'] + + def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument + if command != success_command: + raise subprocess.CalledProcessError(returncode=1, cmd=command) + else: + return 0 + + mock_check_call.side_effect = fake_execute_command executor = LocalExecutor(parallelism=parallelism) executor.start() success_key = 'success {}' - success_command = ['true', 'some_parameter'] - fail_command = ['false', 'some_parameter'] self.assertTrue(executor.result_queue.empty()) for i in range(self.TEST_SUCCESS_COMMANDS): @@ -58,11 +68,11 @@ class LocalExecutorTest(unittest.TestCase): 
self.assertEqual(executor.workers_used, expected) def test_execution_unlimited_parallelism(self): - self.execution_parallelism(parallelism=0) + self.execution_parallelism(parallelism=0) # pylint: disable=no-value-for-parameter def test_execution_limited_parallelism(self): test_parallelism = 2 - self.execution_parallelism(parallelism=test_parallelism) + self.execution_parallelism(parallelism=test_parallelism) # pylint: disable=no-value-for-parameter @mock.patch('airflow.executors.local_executor.LocalExecutor.sync') @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
