michalursa commented on pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#issuecomment-791153213


   > Before digging into the details too much, my main issue with what I see is 
that I don't agree with making hash aggregation a callable function through 
`CallFunction`.
   > 
   > In the context of a query engine, the interface for this operator looks 
something like:
   > 
   > ```
   > class ExecNode {
   >  public:
   >   virtual void Push(int node_index, const ExecBatch& batch) = 0;
   >   virtual void PushDone(int node_index) = 0;
   > };
   > 
   > class HashAggregationNode : public ExecNode {
   >   ...
   > }; 
   > ```
   > 
   > (some query engines use a "pull"-based model, in which the data flow is 
inverted — there are pros and cons to both approaches, see 
https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf)
   > 
   > If you want a simple one-shot version of the algorithm (rather than a 
general "streaming" one like the above), then you can break the input data into 
record batches of the desired size (e.g. 4K - 64K rows, depending on 
heuristics) and then push the chunks into the node that you create (note that 
the HashAggregationNode should push its result into a terminal "OutputNode" 
when you invoke `hash_agg_node->PushDone(0)`).
   > 
   > The API can be completely crude / preliminary, but would it be possible to 
use the query-engine-type approach for this? I think it would be best to start 
taking some strides in this direction rather than bolting this onto the array 
function execution machinery which doesn't make sense in a query processing 
context (because aggregation is fundamentally a streaming algorithm)
   > 
   > On the hash aggregation functions themselves, perhaps it makes sense to 
add a `HASH_AGGREGATE` function type and define the kernel interface for these 
functions, then look up these functions using the general dispatch machinery?
   
   I like these points. Let me break down into subtopics what I think we are facing 
now and what we should think about for the future. I read the comment above as 
advocating support for two concepts: a) relational operators (the building 
blocks of a query execution pipeline, or of a more general query execution 
plan) and b) pipelines / streaming processing. 
   
   **1. How we got here**
   
   Part of the problem is that we don't have the problem just yet. We haven't 
bitten off a big enough chunk of work to force us to do the appropriate 
refactoring. The problem I am referring to is pipelining, which we do not 
support yet. From what I understand, we currently cannot bind multiple 
operations together into a single processing pipeline executed with a single 
Arrow function call, computing expressions on the fly without persisting their 
results for the entire set of rows. 
   
   Related to pipelining is the fact that in this PR we do not stream the 
group by output. At some level of abstraction (squinting a little) we can 
therefore treat it as a scalar aggregate: we output a single item which happens 
to be an entire collection of arrays (variants make this possible).
   
   But to me the bigger problem here is that we are mixing two separate 
worlds, scalar operators and relational operators, and that muddies the 
general picture. The mixing happens because the code treats group by as a 
modified scalar aggregate.
   
   **2. Scalar operators vs relational operators**
   
   One way to think about it is that scalar expressions (compositions of scalar 
operators) are like the C++ lambdas (e.g. a comparison of two values) passed to 
C++ STL algorithms / containers (e.g. `std::sort`), while relational operators 
correspond to those algorithms / containers (except that they additionally 
support composability: building execution pipelines / trees / DAGs). 
   
   The point of confusion may come from the fact that once you vectorize scalar 
expressions (which you probably want to do, for performance reasons), their 
interfaces start to look very similar, if not identical, to those of certain 
special relational operators, namely filter, project and scalar aggregate. I 
would claim that the current kernel executor classes are relational operators 
in disguise: project (aka compute scalar) and reduce (aka scalar aggregate). 

   **Relational operators inside the current group by**

   Interestingly, the group by present in this PR can itself be treated as a 
DAG of multiple relational operators connected by some kind of pipeline. The 
input batch is first processed by the code that assigns an integer group id to 
every row based on the key columns. The output of that is then processed by 
zero, one or more group by aggregate operators that update aggregate-related 
accumulators in their internal arrays. At the end of the input, the output 
array of the group id mapping component is concatenated with the output arrays 
of the individual group by aggregate operators to produce the output collection 
of arrays. We can treat the group id mapping and the group by aggregates as 
separate relational operators, or we can treat them as internals of a single 
hash group by operator. Even the hash computation for the hash table lookup 
(part of group id mapping) can be treated as a separate processing step 
implemented with a projection operator. 
   
   When we talk about relational operators and their connections, we can talk 
at different levels of granularity. Sometimes it’s simpler to treat building 
blocks made of multiple operators, with fixed, hard-coded connections between 
them, as a single operator; sometimes treating the smaller blocks separately 
brings more opportunities for code reuse. 
   
   **3. Push and pull**
   
   My personal feeling is that the pull model was a good fit for early query 
execution engines, which processed a single row at a time and used virtual 
function calls to switch between relational operators within the query. In my 
experience, the push model is easier to work with in both modern styles of 
query execution: JIT-compiled and vectorized query processing. 
   
   It may seem abstract right now which model is better, push or pull, but I 
believe that once you actually have to solve a concrete problem it becomes 
more obvious which one you prefer.
   
   From my experience, in a vectorized execution model it is easy to adapt an 
existing push-based implementation to support a pull-based interface. I am 
guessing that the same is true the other way around.
   
   Also, intuitively, when we think about consuming input we think pull (e.g. 
scanning in-memory data or a file), and when we think about producing output 
we think push (e.g. sending the results of the computation over the network). 
   
   I would probably recommend: at the lower level, use whichever model results 
in more readable, simpler, more natural code; at the higher level, adapt all 
interfaces to the push model.
   
   **4. When streaming output is not desired**
   
   It may be a bit forward-looking, but in some cases it is beneficial to give 
one relational operator direct access to the internal row storage of another, 
instead of always assuming that operators stream their output. Streaming 
output should always be supported, but the direct access option may be useful 
in addition.
   
   It’s not unusual to request that the output of a group by be sorted on the 
group by key columns. In that case the sort operator, which needs to 
accumulate all of its input before sorting, could work directly on the group 
by buffers, without converting the group by's internal row storage to batches 
and then batches back into the sort's internal row storage. 
   
   Similarly, window functions and quantiles may require random access to the 
entire set of input rows (within a group / partition of the data). In that 
case, again, they may want to work on the sort operator's internal row storage 
rather than streaming it out and accumulating the stream in their own buffers.
   
   **5. Summary**
   
   I think that what we are talking about here has a goal that overlaps with 
group by but is wider in scope and somewhat separate: a) refactoring the code 
to introduce the concepts of relational and scalar operators, and b) laying 
the groundwork for pipelining support. 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

