feluelle commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports. Depends on [AIRFLOW-6010] URL: https://github.com/apache/airflow/pull/6596#discussion_r350251952
########## File path: airflow/executors/executor_types.py ########## @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Aliases for common types used in executors.""" +from datetime import datetime +from queue import Queue +from typing import Any, List, Optional, Tuple, Union + +# Key used to identify task instance +# Tuple of: task_id, ?, execution_date, +from sqlalchemy import Column + +import airflow.models +from airflow.models import TaskInstance +from airflow.utils.db import provide_session + +TaskInstanceKey = Tuple[str, str, datetime, int] + +# State of the task instance. +# Stores string version of the task state. +TaskInstanceState = Tuple[TaskInstanceKey, str] + +# Command to execute - might be either string or list of strings +# with the same semantics as subprocess.Popen +CommandType = Union[str, List[str]] + + +# Task that is queued. It contains all the information that is +# needed to run the task. +# +# Tuple of: command, priority, Queue, SimpleTaskInstance +class SimpleTaskInstance: + """ + Simplified Task Instance. + + Used to send data between processes via Queues. 
+ + """ + def __init__(self, ti: TaskInstance): + self._dag_id: str = ti.dag_id + self._task_id: str = ti.task_id + self._execution_date: datetime = ti.execution_date + self._start_date: datetime = ti.start_date + self._end_date: datetime = ti.end_date + self._try_number: int = ti.try_number + self._state: str = ti.state + self._executor_config: Any = ti.executor_config + self._run_as_user: Optional[str] = None + if hasattr(ti, 'run_as_user'): + self._run_as_user = ti.run_as_user + self._pool: Optional[str] = None + if hasattr(ti, 'pool'): + self._pool = ti.pool + self._priority_weight: Optional[int] = None + if hasattr(ti, 'priority_weight'): + self._priority_weight = ti.priority_weight + self._queue: Column = ti.queue + self._key = ti.key + + # pylint: disable=missing-docstring + @property + def dag_id(self) -> str: + return self._dag_id + + @property + def task_id(self) -> str: + return self._task_id + + @property + def execution_date(self) -> datetime: + return self._execution_date + + @property + def start_date(self) -> datetime: + return self._start_date + + @property + def end_date(self) -> datetime: + return self._end_date + + @property + def try_number(self) -> int: + return self._try_number + + @property + def state(self) -> str: + return self._state + + @property + def pool(self) -> Any: + return self._pool + + @property + def priority_weight(self) -> Optional[int]: + return self._priority_weight + + @property + def queue(self) -> Column: + return self._queue + + @property + def key(self) -> TaskInstanceKey: + return self._key + + @property + def executor_config(self): + return self._executor_config + + @provide_session + def construct_task_instance(self, session=None, lock_for_update=False): + """ + Construct a TaskInstance from the database based on the primary key + + :param session: DB session. + :param lock_for_update: if True, indicates that the database should + lock the TaskInstance (issuing a FOR UPDATE clause) until the + session is committed. 
+ """ Review comment: `:returns: the task instance constructed` and the return type are both missing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
