dazza-codes commented on a change in pull request #7085: [AIRFLOW-6334] Use classes instead list of string in executors
URL: https://github.com/apache/airflow/pull/7085#discussion_r363896237
##########
File path: airflow/executors/base_executor.py
##########
@@ -94,22 +96,43 @@ def queue_task_instance(
# cfg_path is needed to propagate the config values if using impersonation
# (run_as_user), given that there are different code paths running tasks.
# For a long term solution we need to address AIRFLOW-1986
- command_list_to_run = task_instance.command_as_list(
- local=True,
+ deferred_run = task_instance.get_local_task_job_deferred_run(
mark_success=mark_success,
ignore_all_deps=ignore_all_deps,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
pool=pool,
pickle_id=pickle_id,
- cfg_path=cfg_path)
- self.queue_command(
+ cfg_path=cfg_path,
+ )
+ self._queue_deferred_run(
SimpleTaskInstance(task_instance),
- command_list_to_run,
+ deferred_run,
priority=task_instance.task.priority_weight_total,
queue=task_instance.task.queue)
+ def queue_simple_task_instance(self, simple_task_instance: SimpleTaskInstance, simple_dag: SimpleDag):
+ """Queues simple task instance."""
+ priority = simple_task_instance.priority_weight
+ queue = simple_task_instance.queue
+
+ queue_task_run = LocalTaskJobDeferredRun(
+ dag_id=simple_task_instance.dag_id,
+ task_id=simple_task_instance.task_id,
+ execution_date=simple_task_instance.execution_date,
+ pool=simple_task_instance.pool,
+ subdir=simple_dag.full_filepath,
+ pickle_id=simple_dag.pickle_id
+ )
+
+ self._queue_deferred_run(
+ simple_task_instance,
+ queue_task_run,
+ priority=priority,
+ queue=queue
+ )
+
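The hunk above replaces the list-of-strings command built by `command_as_list` with an object. A minimal sketch of why that helps, assuming a simplified, hypothetical `DeferredRun` dataclass whose fields mirror the `LocalTaskJobDeferredRun(...)` call in the diff (it is not the class from the PR), and an assumed argument layout for the legacy list (`example-cli` is a made-up program name):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class DeferredRun:
    """Hypothetical stand-in for LocalTaskJobDeferredRun; field names follow the diff."""

    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str] = None
    subdir: Optional[str] = None
    pickle_id: Optional[int] = None


def task_key_from_list(command: List[str]) -> Tuple[str, str]:
    # A list of strings forces every consumer to know each argument's position.
    # The layout [program, subcommand, dag_id, task_id, ...] is an assumption.
    return command[2], command[3]


def task_key_from_run(run: DeferredRun) -> Tuple[str, str]:
    # A typed object exposes the same information as named attributes.
    return run.dag_id, run.task_id


run = DeferredRun("example_dag", "extract", datetime(2020, 1, 1), pool="default_pool")
legacy = ["example-cli", "run", "example_dag", "extract", "2020-01-01T00:00:00"]
print(task_key_from_list(legacy) == task_key_from_run(run))  # True
```

With the list, reordering one argument silently hands consumers the wrong value; with the dataclass, renaming a field surfaces as an `AttributeError` or a type-checker complaint.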
Review comment:
Looking at this method, it starts to look like a feature-envy code smell: it
mostly reads attributes of `SimpleTaskInstance` and `SimpleDag`. Does the
refactor add or move a task queue onto `SimpleTaskInstance`? Might the former
`command` attributes belong on that class instead of on a new
`LocalTaskJobDeferredRun` class? Or is `LocalTaskJobDeferredRun` perhaps a
subclass of `SimpleTaskInstance`?
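One way to read the feature-envy concern: `queue_simple_task_instance` pulls six attributes out of `simple_task_instance` and two out of `simple_dag`. The sketch below moves construction onto the class that owns the data. All names here (`SimpleTaskInstanceSketch`, `DagInfo`, `DeferredRun`, `ExecutorSketch`, `to_deferred_run`) are hypothetical simplifications for illustration, not Airflow's API.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class DeferredRun:
    # Hypothetical stand-in for LocalTaskJobDeferredRun (fields from the diff).
    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str]
    subdir: Optional[str]
    pickle_id: Optional[int]


@dataclass(frozen=True)
class DagInfo:
    # Hypothetical stand-in for SimpleDag: only the two attributes the diff reads.
    full_filepath: str
    pickle_id: Optional[int] = None


@dataclass(frozen=True)
class SimpleTaskInstanceSketch:
    # Hypothetical, simplified SimpleTaskInstance; not Airflow's real class.
    dag_id: str
    task_id: str
    execution_date: datetime
    pool: Optional[str]
    priority_weight: int
    queue: str

    def to_deferred_run(self, dag: DagInfo) -> DeferredRun:
        # The class that owns the fields builds the run, so callers no
        # longer reach into each attribute (the usual feature-envy fix).
        return DeferredRun(
            dag_id=self.dag_id,
            task_id=self.task_id,
            execution_date=self.execution_date,
            pool=self.pool,
            subdir=dag.full_filepath,
            pickle_id=dag.pickle_id,
        )


class ExecutorSketch:
    # Hypothetical executor that only records what it was asked to queue.
    def __init__(self) -> None:
        self.queued: List[Tuple[SimpleTaskInstanceSketch, DeferredRun, int, str]] = []

    def queue_simple_task_instance(self, sti: SimpleTaskInstanceSketch, dag: DagInfo) -> None:
        # The executor delegates construction and only decides priority and queue.
        self.queued.append((sti, sti.to_deferred_run(dag), sti.priority_weight, sti.queue))
```

Whether this beats a standalone `LocalTaskJobDeferredRun` depends on whether other callers need to build a run without a task instance; if they do, a separate class (perhaps with a hypothetical `from_task_instance` classmethod) may still be the better home.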
With regard to naming, why isn't `SimpleTaskInstance` named simply
`SimpleTask`? It seems odd for a class name to refer to an "instance" or
"object" of the class (sorry, I don't know enough about this code to understand
why it needs `Instance` in the name). Introducing a new class raises a few
questions: how are these classes related in terms of generic versus specific,
and do the terms `Simple` and `Local` versus `Distributed` and `Delayed` need
clarification? What is the distinction between a `Task` and a `Job`? (Pardon my
ignorance; I could take this offline, of course.) If a `DelayedTask` could use
a mixin of both a `SimpleTask` and a `QueuedTask`, might that be clearer? The
terms `Deferred`, `Queued`, and `Async` could also be clarified, especially in
the context of `asyncio`, where `Async` could imply an event loop and
cooperative concurrency.
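To make the mixin question concrete, a minimal sketch. `SimpleTask`, `QueuedTask`, and `DelayedTask` are the hypothetical names floated in the comment, not existing Airflow classes; their fields and the `task_key`/`sort_key` methods are assumptions for illustration. Each mixin holds only behaviour and declares the attributes it expects, while the concrete dataclass owns the state.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Tuple


class SimpleTask:
    """Mixin: behaviour that only needs a task run's identity."""

    dag_id: str
    task_id: str
    execution_date: datetime

    def task_key(self) -> Tuple[str, str, datetime]:
        return (self.dag_id, self.task_id, self.execution_date)


class QueuedTask:
    """Mixin: behaviour that only needs scheduling hints."""

    queue: str
    priority_weight: int

    def sort_key(self) -> Tuple[str, int]:
        # Group by queue, then put higher priority weights first.
        return (self.queue, -self.priority_weight)


@dataclass(frozen=True)
class DelayedTask(SimpleTask, QueuedTask):
    """Concrete class: the dataclass supplies the state both mixins read."""

    dag_id: str
    task_id: str
    execution_date: datetime
    queue: str = "default"
    priority_weight: int = 1


tasks = [
    DelayedTask("example_dag", "load", datetime(2020, 1, 1), priority_weight=1),
    DelayedTask("example_dag", "extract", datetime(2020, 1, 1), priority_weight=5),
]
print([t.task_id for t in sorted(tasks, key=lambda t: t.sort_key())])  # ['extract', 'load']
```

Keeping the fields on the concrete class sidesteps a dataclass pitfall: if both mixins were dataclasses, their fields would be collected in reverse MRO order, and a defaulted `QueuedTask` field could end up ahead of non-default `SimpleTask` fields. Nothing in this sketch uses `asyncio`; "delayed" only means the object waits in an executor queue.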
----------------------------------------------------------------
This is an automated message from the Apache Git Service.