This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 21371b6 Introduce BaseExecutor.validate_command to avoid duplication (#10033) 21371b6 is described below commit 21371b6f398ccdccbe3d5c7995fbf3d6848c1a4d Author: Tomek Urbaszek <turbas...@gmail.com> AuthorDate: Tue Jul 28 14:49:36 2020 +0200 Introduce BaseExecutor.validate_command to avoid duplication (#10033) --- airflow/executors/base_executor.py | 6 ++++++ airflow/executors/celery_executor.py | 4 +--- airflow/executors/dask_executor.py | 3 +-- airflow/executors/local_executor.py | 3 +-- airflow/executors/sequential_executor.py | 5 +---- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index ddcea54..80f96ca 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -264,3 +264,9 @@ class BaseExecutor(LoggingMixin): This method is called when the daemon receives a SIGTERM """ raise NotImplementedError() + + @staticmethod + def validate_command(command: List[str]) -> None: + """Check if the command to execute is airflow command""" + if command[0:3] != ["airflow", "tasks", "run"]: + raise ValueError('The command must start with ["airflow", "tasks", "run"].') diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 96771ae..81e6072 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -71,9 +71,7 @@ app = Celery( @app.task def execute_command(command_to_exec: CommandType) -> None: """Executes command.""" - if command_to_exec[0:3] != ["airflow", "tasks", "run"]: - raise ValueError('The command must start with ["airflow", "tasks", "run"].') - + BaseExecutor.validate_command(command_to_exec) log.info("Executing command in Celery: %s", command_to_exec) env = os.environ.copy() try: diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 1ae9ef3..76f32d4 100644 --- 
a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -72,8 +72,7 @@ class DaskExecutor(BaseExecutor): queue: Optional[str] = None, executor_config: Optional[Any] = None) -> None: - if command[0:3] != ["airflow", "tasks", "run"]: - raise ValueError('The command must start with ["airflow", "tasks", "run"].') + self.validate_command(command) def airflow_run(): return subprocess.check_call(command, close_fds=True) diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 7bf8ddc..4800e66 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -287,8 +287,7 @@ class LocalExecutor(BaseExecutor): if not self.impl: raise AirflowException(NOT_STARTED_MESSAGE) - if command[0:3] != ["airflow", "tasks", "run"]: - raise ValueError('The command must start with ["airflow", "tasks", "run"].') + self.validate_command(command) self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 906e117..18a4747 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -49,10 +49,7 @@ class SequentialExecutor(BaseExecutor): command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None) -> None: - - if command[0:3] != ["airflow", "tasks", "run"]: - raise ValueError('The command must start with ["airflow", "tasks", "run"].') - + self.validate_command(command) self.commands_to_run.append((key, command)) def sync(self) -> None: