michalursa commented on pull request #9621: URL: https://github.com/apache/arrow/pull/9621#issuecomment-791153213
> Before digging into the details too much, my main issue with what I see is that I don't agree with making hash aggregation a callable function through `CallFunction`.
>
> In the context of a query engine, the interface for this operator looks something like:
>
> ```
> class ExecNode {
>  public:
>   virtual void Push(int node_index, const ExecBatch& batch) = 0;
>   virtual void PushDone(int node_index) = 0;
> };
>
> class HashAggregationNode : public ExecNode {
>   ...
> };
> ```
>
> (some query engines use a "pull"-based model, in which the data flow is inverted — there are pros and cons to both approaches, see https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf)
>
> If you want a simple one-shot version of the algorithm (rather than a general "streaming" one like the above), then you can break the input data into record batches of the desired size (e.g. 4K - 64K rows, depending on heuristics) and then push the chunks into the node that you create (note that the HashAggregationNode should push its result into a terminal "OutputNode" when you invoke `hash_agg_node->PushDone(0)`).
>
> The API can be completely crude / preliminary, but would it be possible to use the query-engine-type approach for this? I think it would be best to start taking some strides in this direction rather than bolting this onto the array function execution machinery, which doesn't make sense in a query processing context (because aggregation is fundamentally a streaming algorithm).
>
> On the hash aggregation functions themselves, perhaps it makes sense to add a `HASH_AGGREGATE` function type and define the kernel interface for these functions, then look up these functions using the general dispatch machinery?

I like these points. Let me break down, into subtopics, what I think we are facing now and what we should think about for the future.
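To make the discussion concrete, here is a minimal, self-contained sketch of the push-based interface quoted above. Everything beyond the two virtual methods is my own simplification: `ExecBatch` here is just a vector of (key, value) rows rather than Arrow's actual `ExecBatch`, and the sum aggregate and `OutputNode` are illustrative stand-ins, not proposed APIs.

```cpp
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Simplified stand-in for Arrow's ExecBatch: a batch of (key, value) rows.
using ExecBatch = std::vector<std::pair<int64_t, int64_t>>;

class ExecNode {
 public:
  virtual ~ExecNode() = default;
  virtual void Push(int node_index, const ExecBatch& batch) = 0;
  virtual void PushDone(int node_index) = 0;
};

// Terminal node: remembers whatever is pushed into it.
class OutputNode : public ExecNode {
 public:
  void Push(int, const ExecBatch& batch) override {
    result.insert(result.end(), batch.begin(), batch.end());
  }
  void PushDone(int) override { done = true; }
  ExecBatch result;
  bool done = false;
};

// Hash aggregation: accumulates a per-key sum while batches are pushed in,
// then streams the final (key, sum) rows into its output node on PushDone.
class HashAggregationNode : public ExecNode {
 public:
  explicit HashAggregationNode(ExecNode* out) : out_(out) {}
  void Push(int, const ExecBatch& batch) override {
    for (const auto& [key, value] : batch) sums_[key] += value;
  }
  void PushDone(int) override {
    ExecBatch result(sums_.begin(), sums_.end());
    out_->Push(0, result);
    out_->PushDone(0);
  }

 private:
  ExecNode* out_;
  std::map<int64_t, int64_t> sums_;  // std::map keeps the output ordered by key
};
```

The one-shot usage described in the quote then falls out naturally: split the input into batches, call `Push` once per batch, and call `PushDone` to flush the result into the terminal node.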
I read the comment above as advocating support for two concepts: a) relational operators (the building blocks of a query execution pipeline, or of a more general query execution plan), and b) pipelines / streaming processing.

**1. How we got here**

Part of the problem is that we don't have a problem just yet. We haven't bitten off a big enough chunk of work to force us to do the appropriate refactoring. The problem I am referring to is pipelining, which we do not support yet. From what I understand, we currently cannot bind multiple operations together into a single processing pipeline executed with a single Arrow function call, computing expressions on the fly without persisting their results for an entire set of rows.

Related to pipelining is the fact that in this PR we do not stream the group by output. That way, at some level of abstraction (squinting), we can treat it as a scalar aggregate: we output a single item that happens to be an entire collection of arrays (variants make that possible). But to me the bigger problem here is that we are mixing together two separate worlds, scalar operators and relational operators, and that muddies the general picture. That mixing happens as a consequence of treating group by as a modified scalar aggregate in the code.

**2. Scalar operators vs relational operators**

One way to think about it is that scalar expressions (compositions of scalar operators) are like the C++ lambdas (e.g. a comparison of two values) passed to C++ STL algorithms / containers (e.g. `std::sort`), while relational operators correspond to those algorithms / containers (except that they additionally support composability: creating execution pipelines / trees / DAGs).
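The analogy can be stated in plain STL terms. In this sketch (function names are mine, purely illustrative), the lambdas play the role of scalar expressions, while the algorithms that consume them play the role of relational operators: `std::sort` as a sort operator, `std::transform` as a project ("compute scalar") operator.

```cpp
#include <algorithm>
#include <vector>

// "Sort" as an algorithm parameterized by a scalar comparison expression.
std::vector<int> SortDescending(std::vector<int> v) {
  std::sort(v.begin(), v.end(), [](int a, int b) { return a > b; });
  return v;
}

// "Project" as an algorithm parameterized by a scalar compute expression.
std::vector<int> ProjectDouble(const std::vector<int>& v) {
  std::vector<int> out(v.size());
  std::transform(v.begin(), v.end(), out.begin(), [](int x) { return 2 * x; });
  return out;
}
```

The algorithms own the iteration and data movement; the lambdas only compute per-element values, which is exactly the division of labor between relational operators and scalar expressions.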
The point of confusion may come from the fact that once you vectorize scalar expressions (which you probably want to do, for performance reasons), their interfaces start looking very similar, if not identical, to those of some special relational operators (namely: filter, project and scalar aggregate). I claim that the current kernel executor classes are relational operators in disguise: project (aka compute scalar) and reduce (aka scalar aggregate).

**Relational operators inside the current group by**

Interestingly, the group by present in this PR can itself be treated as a DAG of multiple relational operators with some kind of pipelines connecting them. The input batch is first processed by the code that assigns an integer group id to every row based on the key columns. The output of that is then processed by zero, one or more group by aggregate operators, which update aggregate-related accumulators in their internal arrays. At the end of the input, the output array of the group id mapping component is concatenated with the output arrays of the individual group by aggregate operators to produce the output collection of arrays.

We can treat group id mapping and group by aggregates as separate relational operators, or we can choose to treat them as internals of a single hash group by operator. Even the hash computation for the hash table lookup (part of group id mapping) can be treated as a separate processing step that uses a projection operator. When we talk about relational operators and their connections, we can talk at different levels of granularity. Sometimes it is simpler to treat building blocks made of multiple operators with fixed, hard-coded connections between them as a single operator; sometimes treating smaller blocks separately brings more opportunities for code reuse.

**3. Push and pull**

My personal feeling is that the pull model was a good fit for early query execution engines, which processed a single row at a time and used virtual function calls to switch between relational operators within the query. In my experience, the push model is easier to work with in both modern worlds of query execution: JIT-compiled query processing and vectorized query processing. Which model is better may seem abstract right now, but I believe that once you have to actually solve an existing problem it becomes more obvious which one you prefer. From my experience, in a vectorized execution model it is easy to adapt an existing push-based implementation to support a pull-based interface, and I am guessing the same would be true the other way around. Also, intuitively, when we think about consuming input we think pull (e.g. scanning in-memory data or a file), and when we think about producing output we think push (e.g. sending results of the computation over the network). I would probably recommend: at a lower level, use whatever model results in more readable, simpler, more natural code; at a higher level, adapt all interfaces to the push model.

**4. When streaming output is not desired**

It may be a bit forward looking, but in some cases it is beneficial to give one relational operator direct access to the internal row storage of another relational operator, instead of always assuming that operators stream their output. Streaming output should always be supported, but the direct-access option may be useful in addition to it. It is not unusual to request that the output of group by be sorted on the group by key columns. In that case the sort operator, which needs to accumulate all of its input in a buffer before sorting, could work directly on the group by buffers without converting the internal row storage of group by to batches and then converting those batches back into the internal row storage of sort.
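As a rough illustration of the direct-access idea (all names here are hypothetical, invented for this sketch): instead of streaming batches out of group by and re-accumulating them inside sort, the downstream operator gets a mutable view of the upstream operator's internal rows and orders them in place.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical internal storage of a group by operator: one (key, aggregate)
// row per group.
struct GroupByResultStorage {
  std::vector<std::pair<int64_t, int64_t>> rows;

  // Direct-access hook: expose internal storage to a downstream operator,
  // bypassing the batch-streaming path.
  std::vector<std::pair<int64_t, int64_t>>& mutable_rows() { return rows; }
};

// A sort operator that works directly on the group by buffers, with no
// conversion to batches and back.
void SortOnKeysInPlace(GroupByResultStorage& storage) {
  auto& rows = storage.mutable_rows();
  std::sort(rows.begin(), rows.end());  // orders by key (the pair's first member)
}
```

The streaming path would still exist; this hook is only an extra option for consumers, like sort, that must buffer the entire input anyway.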
Similarly, window functions and quantiles may require random access to the entire set of input rows (within a group / partition of the data). In this case, again, they may want to work on the internal row storage of sort rather than streaming it out and accumulating the output stream in their own internal buffers.

**5. Summary**

I think that what we are talking about here has a goal that overlaps with group by but is wider in scope and somewhat separate: a) refactoring the code to introduce the concepts of relational and scalar operators, and b) laying the groundwork for pipelining support.