I think we all very much agree  that first we need to define
responsibilities for those :). How about someone starts a nice design
doc with graphs etc. we can discuss :) ?

On Thu, Mar 31, 2022 at 11:11 PM Ping Zhang <[email protected]> wrote:
>>
>> Airflow would benefit overall from a clearer distinction of what part 
>> (scheduler, executor, task handler, local task job, triggerer, etc.) is 
>> responsible for updates to each state so we start getting a clearer picture
>
>
> Andrew, this is exactly the point. Currently, there is not a clear interface 
> for state change handling in each component. The main reason that I was 
> proposing using the scheduler to deal with the stuck queued state is the 
> executor currently does not handle ti state changes and the executor events 
> are handled by the scheduler. I think we should have a clear interface and 
> responsibility for each component (scheduler, executor, airflow run --local 
> process, airflow run --raw process, trigger) first before we decide the best 
> approach.
>
> Thanks,
>
> Ping
>
>
> On Fri, Mar 25, 2022 at 9:19 AM Andrew Godwin 
> <[email protected]> wrote:
>>
>> Yes, that was roughly the idea Jarek - if the executor is only running 
>> inside the scheduler and has no external process component, it'd be nice to 
>> have a part of the "executor interface" that got called periodically for 
>> cleanup (QUEUED or otherwise). In my internal executor experiments, we've 
>> had to use a separate process for this, though that has its own advantages.
>>
>> I think one good thing to establish, though, would be that only executor 
>> code touches task instances in that state (as part of a general overall rule 
>> that only one component is responsible for each state) - I think Airflow 
>> would benefit overall from a clearer distinction of what part (scheduler, 
>> executor, task handler, local task job, triggerer, etc.) is responsible for 
>> updates to each state so we start getting a clearer picture of where any 
>> bugs could be in distributed state machine terms.
>>
>> Andrew
>>
>> On Thu, Mar 24, 2022 at 7:12 AM Jarek Potiuk <[email protected]> wrote:
>>>
>>> > 2. scheduler invokes that method periodically.
>>>
>>> I think this is not the right approach. I think I see what Andrew
>>> means here, but I think we should not assume that the scheduler will
>>> periodically call some method. Depending on the executor
>>> implementation (say for example future Fargate Executor or Cloud Run
>>> executor). Cleaning queued tasks might actually be done differently
>>> (there might be notification in the executor itself for the tasks that
>>> are queued and stuck and Scheduler might not need to periodically
>>> query it.
>>>
>>> I'd say a better approach (and possibly Andrew that's what you had in
>>> mind) is to have a separate method in the "executor" protocol -
>>> "start_cleanup_of_queued_tasks()". And one implementation of it (The
>>> one in BaseExecutor now) could do periodic cleanup. But the future
>>> Fargate Executor could have it implemented differently.
>>>
>>> I think we already have a few methods like that in BaseExecutor that
>>> also have some implementation that will not really be useful in other
>>> executors, so deriving an executor from BaseExecutor which has some
>>> implementation that will likely need to be overridden in other
>>> executors. I think we should start with what Andrew proposed (I
>>> think). Take the existing executors, extract really an
>>> "ExecutorProtocol", possibly add ExecutorMixin (or even few) to add
>>> some common behaviour for executors and make sure we got it right -
>>> probably at the time we (or someone else) writes a new executor. Just
>>> to make sure we are not trying to make "common" code for something
>>> that is not really "common".
>>>
>>> But maybe I am misinterpreting the intentions :)
>>>
>>> J.

Reply via email to