This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 21371b6  Introduce BaseExecutor.validate_command to avoid duplication 
(#10033)
21371b6 is described below

commit 21371b6f398ccdccbe3d5c7995fbf3d6848c1a4d
Author: Tomek Urbaszek <turbas...@gmail.com>
AuthorDate: Tue Jul 28 14:49:36 2020 +0200

    Introduce BaseExecutor.validate_command to avoid duplication (#10033)
---
 airflow/executors/base_executor.py       | 6 ++++++
 airflow/executors/celery_executor.py     | 4 +---
 airflow/executors/dask_executor.py       | 3 +--
 airflow/executors/local_executor.py      | 3 +--
 airflow/executors/sequential_executor.py | 5 +----
 5 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index ddcea54..80f96ca 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -264,3 +264,9 @@ class BaseExecutor(LoggingMixin):
         This method is called when the daemon receives a SIGTERM
         """
         raise NotImplementedError()
+
+    @staticmethod
+    def validate_command(command: List[str]) -> None:
+        """Check if the command to execute is airflow comnand"""
+        if command[0:3] != ["airflow", "tasks", "run"]:
+            raise ValueError('The command must start with ["airflow", "tasks", 
"run"].')
diff --git a/airflow/executors/celery_executor.py 
b/airflow/executors/celery_executor.py
index 96771ae..81e6072 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -71,9 +71,7 @@ app = Celery(
 @app.task
 def execute_command(command_to_exec: CommandType) -> None:
     """Executes command."""
-    if command_to_exec[0:3] != ["airflow", "tasks", "run"]:
-        raise ValueError('The command must start with ["airflow", "tasks", 
"run"].')
-
+    BaseExecutor.validate_command(command_to_exec)
     log.info("Executing command in Celery: %s", command_to_exec)
     env = os.environ.copy()
     try:
diff --git a/airflow/executors/dask_executor.py 
b/airflow/executors/dask_executor.py
index 1ae9ef3..76f32d4 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -72,8 +72,7 @@ class DaskExecutor(BaseExecutor):
                       queue: Optional[str] = None,
                       executor_config: Optional[Any] = None) -> None:
 
-        if command[0:3] != ["airflow", "tasks", "run"]:
-            raise ValueError('The command must start with ["airflow", "tasks", 
"run"].')
+        self.validate_command(command)
 
         def airflow_run():
             return subprocess.check_call(command, close_fds=True)
diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index 7bf8ddc..4800e66 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -287,8 +287,7 @@ class LocalExecutor(BaseExecutor):
         if not self.impl:
             raise AirflowException(NOT_STARTED_MESSAGE)
 
-        if command[0:3] != ["airflow", "tasks", "run"]:
-            raise ValueError('The command must start with ["airflow", "tasks", 
"run"].')
+        self.validate_command(command)
 
         self.impl.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
 
diff --git a/airflow/executors/sequential_executor.py 
b/airflow/executors/sequential_executor.py
index 906e117..18a4747 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -49,10 +49,7 @@ class SequentialExecutor(BaseExecutor):
                       command: CommandType,
                       queue: Optional[str] = None,
                       executor_config: Optional[Any] = None) -> None:
-
-        if command[0:3] != ["airflow", "tasks", "run"]:
-            raise ValueError('The command must start with ["airflow", "tasks", 
"run"].')
-
+        self.validate_command(command)
         self.commands_to_run.append((key, command))
 
     def sync(self) -> None:

Reply via email to