o-nikolas commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r3053811174


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -99,74 +108,20 @@ def _run_worker(
         with unread_messages:
             unread_messages.value -= 1
 
-        # Handle different workload types
-        if isinstance(workload, workloads.ExecuteTask):
-            try:
-                _execute_work(log, workload, team_conf)
-                output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
-            except Exception as e:
-                log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
-
-        elif isinstance(workload, workloads.ExecuteCallback):
-            output.put((workload.callback.id, CallbackState.RUNNING, None))
-            try:
-                _execute_callback(log, workload, team_conf)
-                output.put((workload.callback.id, CallbackState.SUCCESS, None))
-            except Exception as e:
-                log.exception("Callback execution failed")
-                output.put((workload.callback.id, CallbackState.FAILED, e))
-
-        else:
-            raise ValueError(f"LocalExecutor does not know how to handle 
{type(workload)}")
-
-
-def _execute_work(log: Logger, workload: workloads.ExecuteTask, team_conf) -> 
None:
-    """
-    Execute command received and stores result state in queue.
-
-    :param log: Logger instance
-    :param workload: The workload to execute
-    :param team_conf: Team-specific executor configuration
-    """
-    from airflow.sdk.execution_time.supervisor import supervise
-
-    setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} 
{workload.ti.id}", log)
+        if workload.running_state is not None:
+            output.put((workload.key, workload.running_state, None))
 
-    base_url = team_conf.get("api", "base_url", fallback="/")
-    # If it's a relative URL, use localhost:8080 as the default
-    if base_url.startswith("/"):
-        base_url = f"http://localhost:8080{base_url}";
-    default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
-
-    # This will return the exit code of the task process, but we don't care 
about that, just if the
-    # _supervisor_ had an error reporting the state back (which will result in 
an exception.)
-    supervise(
-        # This is the "wrong" ti type, but it duck types the same. TODO: 
Create a protocol for this.
-        ti=workload.ti,  # type: ignore[arg-type]
-        dag_rel_path=workload.dag_rel_path,
-        bundle_info=workload.bundle_info,
-        token=workload.token,
-        server=team_conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server),
-        log_path=workload.log_path,
-        subprocess_logs_to_stdout=True,
-    )
-
-
-def _execute_callback(log: Logger, workload: workloads.ExecuteCallback, 
team_conf) -> None:
-    """
-    Execute a callback workload.
-
-    :param log: Logger instance
-    :param workload: The ExecuteCallback workload to execute
-    :param team_conf: Team-specific executor configuration
-    """
-    setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} 
{workload.callback.id}", log)
-
-    success, error_msg = execute_callback_workload(workload.callback, log)
-
-    if not success:
-        raise RuntimeError(error_msg or "Callback execution failed")
+        try:
+            BaseExecutor.run_workload(

Review Comment:
   Fair enough, it doesn't quite seem like the right responsibility of that 
class. It's meant to be a baseclass for executors themselves, all usages of it, 
so far, are really through the inheritance of the subclasses. Basically calling 
a method on a "base" class directly like this looks like a code smell to me 
`BaseExecutor.run_workload(...)`
   
   But if we're okay with that, and @ashb prefers this for some greater good, 
then I suppose I'm okay to sign off on it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to