agavra commented on issue #9615:
URL: https://github.com/apache/pinot/issues/9615#issuecomment-1282911839

   Had a quick discussion with @walterddr and wanted to jot down those thoughts 
here so I don't forget:
   
   - there are two orthogonal design considerations: parallelism within a 
pipe/stage, and scheduling across different pipes/stages (potentially across 
queries as well). The comment I posted is mostly about the latter; I think 
@siddharthteotia's comment is mostly about the former (though it has good 
thoughts on both).
   - implementing backpressure via gRPC at the per-mailbox level is possible, 
but we need to take care that the backpressure is piped all the way back to 
the leaf server execution itself. If you look at `QueryRunner`, there's 
currently no way to apply backpressure because it executes the entire leaf 
node request before creating/sending data in the `MailboxSendOperator`. This 
is possible, it just requires some work (see the flow-control sketch after 
this list).
   - "Some operators need to consume everything before produce / output data. 
Example GROUP BY": for now I'll refer to these operators as stateful, as 
opposed to stateless, operators (SORT and the broadcast part of HASH_JOIN 
fall into this category as well).
   - I think it's important that all pipes/stages support partial execution: 
you can schedule a pipeline and it will do all the work it can and then 
terminate, even if it doesn't see an EOS block or produce any data (which can 
happen with stateful operators; stateless ones will always produce data). The 
partial state is maintained so that when the pipeline is rescheduled (when 
another block is available) it can continue where it left off (see the 
pipeline sketch after this list).
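
   To make the backpressure point a bit more concrete, here's a minimal sketch 
of what a flow-control-aware mailbox sender could look like. Everything except 
the gRPC `CallStreamObserver#isReady()` / `#setOnReadyHandler()` APIs is a 
made-up name; this is an illustration, not a proposal for the actual 
`MailboxSendOperator` wiring:

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   import io.grpc.stub.CallStreamObserver;

   // Sketch only: a mailbox sender that respects gRPC flow control instead of
   // buffering unboundedly. "DataBlock" and the class itself are made-up names;
   // only CallStreamObserver#isReady()/#setOnReadyHandler are real gRPC APIs.
   public class FlowControlledMailboxSender<DataBlock> {
     private final CallStreamObserver<DataBlock> _observer;
     // Bounded buffer: once it is full, the producing operator blocks in send().
     private final BlockingQueue<DataBlock> _pending = new LinkedBlockingQueue<>(16);

     public FlowControlledMailboxSender(CallStreamObserver<DataBlock> observer) {
       _observer = observer;
       // gRPC invokes this whenever the transport can accept more messages.
       _observer.setOnReadyHandler(this::drain);
     }

     // Called by the operator producing blocks; blocking here is what pushes the
     // backpressure all the way back into the leaf-stage execution.
     public void send(DataBlock block) throws InterruptedException {
       _pending.put(block);
       drain();
     }

     private synchronized void drain() {
       DataBlock block;
       while (_observer.isReady() && (block = _pending.poll()) != null) {
         _observer.onNext(block);
       }
     }
   }
   ```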
   
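   And a sketch of the partial-execution contract at the pipeline level. The 
interface and method names are hypothetical; the point is just that the run 
method never blocks on upstream data, and a stateful pipeline keeps its partial 
state (e.g. a GROUP BY's partial aggregation map) as fields between invocations:

   ```java
   // Hypothetical contract for partial execution; none of these names come from
   // the Pinot codebase. A stateful pipeline (e.g. a GROUP BY) would keep its
   // partial aggregation state as fields that survive across runOnce() calls.
   public interface SchedulablePipeline {

     enum RunResult {
       // Made whatever progress the currently buffered input allowed; the
       // scheduler should re-invoke runOnce() when another block arrives.
       YIELDED,
       // Saw the EOS block from every upstream mailbox and flushed all output.
       FINISHED
     }

     // Must not block waiting on upstream data: consume what is buffered,
     // emit whatever can be emitted, and return.
     RunResult runOnce();
   }
   ```
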
   Some thoughts on @siddharthteotia's comments specifically:
   
   > Yes, I think we need executors at the inner stage level to execute the 
pipelines inside the Stage.
   
   I'm a bit confused by the terminology here. Perhaps we can standardize on 
"operator" as a single unit of work, "pipeline" as the smallest schedulable 
set of operators, and "stage" as a complete remote-receive-to-remote-send set 
of pipelines. Today a "pipeline" is always equivalent to a stage. I _think_ 
that is how you were using the terminology.
   
   With these definitions, I think we want the executors to be independent of 
the number of stages/pipelines currently running on a multistage intermediate 
server. That might make QoS (quality of service) and thread pool management a 
little difficult, but IMO one fixed thread pool and a priority-aware scheduler 
can get us pretty far, so long as each pipeline can be scheduled independently 
and we have a mechanism to split pipelines (see below).
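
   As a rough illustration of the fixed-pool + priority-aware idea (the class 
names and the priority function, older queries first, are placeholders rather 
than a proposal for the actual policy):

   ```java
   import java.util.concurrent.PriorityBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   // Sketch of "one fixed thread pool + priority-aware scheduling". The priority
   // here (older queries first) is only a placeholder; the point is that the pool
   // size stays fixed no matter how many stages/pipelines are queued.
   public class PipelineScheduler {

     // A schedulable unit of work; the ordering drives the priority queue.
     static final class PipelineTask implements Runnable, Comparable<PipelineTask> {
       final long _queryArrivalTimeMs;
       final Runnable _work;

       PipelineTask(long queryArrivalTimeMs, Runnable work) {
         _queryArrivalTimeMs = queryArrivalTimeMs;
         _work = work;
       }

       @Override
       public void run() {
         _work.run();
       }

       @Override
       public int compareTo(PipelineTask other) {
         return Long.compare(_queryArrivalTimeMs, other._queryArrivalTimeMs);
       }
     }

     // Fixed number of workers, independent of the number of queued pipelines.
     private final ThreadPoolExecutor _executor =
         new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());

     public void schedule(long queryArrivalTimeMs, Runnable pipelineRun) {
       // execute() (not submit()) so the PipelineTask itself sits in the priority queue.
       _executor.execute(new PipelineTask(queryArrivalTimeMs, pipelineRun));
     }
   }
   ```

   The point is only that adding pipelines never adds threads; the priority 
policy (per-query fairness, deadlines, etc.) lives entirely in the comparator.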
   
   > Operators have a state machine something along the lines
   
   I really like this idea; it will also help us in the case where we want to 
split stages into additional local stages, and we can leverage the work 
@ankitsultana has been doing in #9484 to increase parallelism without needing 
to introduce a parallel processing framework _within_ a single stage/task.
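
   For reference, a hypothetical sketch of such an operator state machine; the 
actual states @siddharthteotia has in mind may differ, this is only to make the 
idea of explicit, scheduler-visible transitions concrete:

   ```java
   // Hypothetical operator lifecycle, just to make the state-machine idea concrete;
   // the actual states proposed may differ.
   public enum OperatorState {
     CREATED,   // planned but not yet scheduled
     RUNNING,   // making progress on buffered input
     BLOCKED,   // waiting for an upstream block (or for downstream backpressure to clear)
     FINISHED,  // saw EOS from all inputs and emitted all output
     FAILED;    // terminated with an error

     // Legal transitions; anything else would indicate a scheduler bug.
     public boolean canTransitionTo(OperatorState next) {
       switch (this) {
         case CREATED:
           return next == RUNNING || next == FAILED;
         case RUNNING:
           return next == BLOCKED || next == FINISHED || next == FAILED;
         case BLOCKED:
           return next == RUNNING || next == FAILED;
         default:
           return false;  // FINISHED and FAILED are terminal
       }
     }
   }
   ```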
   