ferruzzi commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2991669552
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -99,73 +110,35 @@ def _run_worker(
with unread_messages:
unread_messages.value -= 1
- # Handle different workload types
- if isinstance(workload, workloads.ExecuteTask):
- try:
- _execute_work(log, workload, team_conf)
- output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
- except Exception as e:
- log.exception("Task execution failed.")
- output.put((workload.ti.key, TaskInstanceState.FAILED, e))
-
- elif isinstance(workload, workloads.ExecuteCallback):
- output.put((workload.callback.id, CallbackState.RUNNING, None))
- try:
- _execute_callback(log, workload, team_conf)
- output.put((workload.callback.id, CallbackState.SUCCESS, None))
- except Exception as e:
- log.exception("Callback execution failed")
- output.put((workload.callback.id, CallbackState.FAILED, e))
-
- else:
- raise ValueError(f"LocalExecutor does not know how to handle
{type(workload)}")
-
-
-def _execute_work(log: Logger, workload: workloads.ExecuteTask, team_conf) ->
None:
- """
- Execute command received and stores result state in queue.
-
- :param log: Logger instance
- :param workload: The workload to execute
- :param team_conf: Team-specific executor configuration
- """
- from airflow.sdk.execution_time.supervisor import supervise
-
- setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)}
{workload.ti.id}", log)
-
- base_url = team_conf.get("api", "base_url", fallback="/")
- # If it's a relative URL, use localhost:8080 as the default
- if base_url.startswith("/"):
- base_url = f"http://localhost:8080{base_url}"
- default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
+ if workload.running_state is not None:
+ output.put((workload.key, workload.running_state, None))
- # This will return the exit code of the task process, but we don't care
about that, just if the
- # _supervisor_ had an error reporting the state back (which will result in
an exception.)
- supervise(
- # This is the "wrong" ti type, but it duck types the same. TODO:
Create a protocol for this.
- ti=workload.ti, # type: ignore[arg-type]
- dag_rel_path=workload.dag_rel_path,
- bundle_info=workload.bundle_info,
- token=workload.token,
- server=team_conf.get("core", "execution_api_server_url",
fallback=default_execution_api_server),
- log_path=workload.log_path,
- )
+ try:
+ _execute_workload(log, workload, team_conf)
+ output.put((workload.key, workload.success_state, None))
+ except Exception as e:
+ log.exception("Workload execution failed.",
workload_type=type(workload).__name__)
+ output.put((workload.key, workload.failure_state, e))
Review Comment:
Hm. I can see where you are going. I don't think it's quite that simple
though. With the two pops as I have them, one of them will pop and the other is
expected to be a no-op: a task will pop from `queued_tasks` and the
`queued_callbacks` pop will simply do nothing, and vice versa. A `del` would
have a problem with the no-op side of that.
But you are right that I missed the case where the key is in neither dict. How
do you feel about this:
```python
for workload in workload_list:
    self.activity_queue.put(workload)
    # A valid workload will exist in exactly one of these dicts.
    # One pop will succeed; the other will fail gracefully and return None.
    removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop(workload.key, None)
    if not removed:
        raise KeyError(f"Workload {workload.key} was not found in any queue.")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]