[ 
https://issues.apache.org/jira/browse/ARROW-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16800648#comment-16800648
 ] 

Francois Saint-Jacques commented on ARROW-5002:
-----------------------------------------------

The Take kernel should stay out of the solution: as you mentioned, we want to 
minimize memory copies. My intent is to extend the `AggregateFunction` class with 
`ConsumeWithFilter` and `ConsumeWithGroups` as follows:

{code:C++}
class AggregateFunction {
 public:
  /// \brief Consume an array into a state.
  /// `SELECT AGG(x) FROM T;`
  virtual Status Consume(const Array& input, void* state) const = 0;

  /// \brief Consume an array with a mask into a state.
  /// `SELECT AGG(x) FROM T WHERE pred;`
  virtual Status ConsumeWithFilter(const Array& input, const Array& mask,
                                   void* state) const = 0;

  /// \brief Consume an array with a scatter group into states.
  /// `SELECT k, AGG(x) FROM T GROUP BY k;`
  /// `SELECT k, AGG(x) FROM T WHERE pred GROUP BY k;`
  virtual Status ConsumeWithGroups(const Array& input, const Array& groups,
                                   IndexableState* states) const = 0;
};
{code}
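
To make the three entry points concrete, here is a minimal sketch of a SUM aggregate. It is not Arrow code: plain `std::vector`s stand in for `Array`, and the hypothetical `SumState` replaces both the opaque `void* state` and `IndexableState`. It only covers the unfiltered `GROUP BY` case. The point is that the filter and group variants read the input in place, so no Take-style copy is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the aggregate state (not an Arrow type).
struct SumState {
  int64_t sum = 0;
};

// SELECT SUM(x) FROM T;
void Consume(const std::vector<int64_t>& input, SumState* state) {
  for (int64_t v : input) state->sum += v;
}

// SELECT SUM(x) FROM T WHERE pred;
// Rows whose mask entry is false are skipped in place; nothing is copied.
void ConsumeWithFilter(const std::vector<int64_t>& input,
                       const std::vector<bool>& mask, SumState* state) {
  assert(input.size() == mask.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    if (mask[i]) state->sum += input[i];
  }
}

// SELECT k, SUM(x) FROM T GROUP BY k;
// groups[i] is the dense group index of row i, as a GroupBy kernel might
// emit it; each row is scattered directly into the state of its group.
void ConsumeWithGroups(const std::vector<int64_t>& input,
                       const std::vector<uint32_t>& groups,
                       std::vector<SumState>* states) {
  assert(input.size() == groups.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    assert(groups[i] < states->size());
    (*states)[groups[i]].sum += input[i];
  }
}
```

With input `{1, 2, 3, 4}` and groups `{0, 1, 0, 1}`, the two group states end up holding the sums of rows 0 and 2 and of rows 1 and 3 respectively.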

The GroupBy kernel would emit an array of group indices (it also needs to provide the 
hash table of original keys). One desirable property of the GroupBy kernel is 
that it is not a full barrier for the aggregates: you can 
run GroupBy and the aggregates in parallel, provided there is a final consolidating 
phase (which is the barrier). See section 4.4 of [Morsel-driven parallelism: 
a NUMA-aware query evaluation framework for the many-core 
age|https://www.semanticscholar.org/paper/Morsel-driven-parallelism%3A-a-NUMA-aware-query-for-Leis-Boncz/463bec3d0298e96e3702e071e241e3898f76eff2].
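
The "partial results plus a final consolidating phase" idea can be sketched as follows. This is an illustration under stated assumptions, not the algorithm from the paper and not Arrow code: keys and values are plain `int64_t` vectors, the aggregate is a sum, and the names `AggregateMorsel`, `MergePartials` and `GroupBySum` are hypothetical. Each call to `AggregateMorsel` touches only its own row range and its own table, so the calls could run on separate threads; the sketch runs them sequentially to stay minimal.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Partial result of one morsel: key -> partial sum.
using Partial = std::unordered_map<int64_t, int64_t>;

// Phase 1: group and aggregate one morsel (rows [begin, end)) into a private
// table. No state is shared between morsels, so no synchronization is needed.
Partial AggregateMorsel(const std::vector<int64_t>& keys,
                        const std::vector<int64_t>& values,
                        std::size_t begin, std::size_t end) {
  assert(keys.size() == values.size());
  assert(begin <= end && end <= keys.size());
  Partial partial;
  for (std::size_t i = begin; i < end; ++i) partial[keys[i]] += values[i];
  return partial;
}

// Phase 2: the consolidating phase, which is the only barrier. It needs all
// partial tables and combines them key by key.
Partial MergePartials(const std::vector<Partial>& partials) {
  Partial result;
  for (const Partial& partial : partials) {
    for (const auto& entry : partial) result[entry.first] += entry.second;
  }
  return result;
}

// Driver: split the input into morsels, aggregate each, then merge.
Partial GroupBySum(const std::vector<int64_t>& keys,
                   const std::vector<int64_t>& values,
                   std::size_t morsel_size) {
  assert(morsel_size > 0);
  std::vector<Partial> partials;
  for (std::size_t begin = 0; begin < keys.size(); begin += morsel_size) {
    const std::size_t end = std::min(begin + morsel_size, keys.size());
    partials.push_back(AggregateMorsel(keys, values, begin, end));
  }
  return MergePartials(partials);
}
```

The merge step works here because sums can be combined from partial sums; an aggregate without that property would need a different consolidating phase.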

As a side note, we should specialize the GroupBy on the group key types, e.g. for 
any primitive type of width <=16 (assuming a single-column expression), we can 
use a fixed-size array, with no hashing required.
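
A minimal sketch of that specialization, assuming "width <=16" means a key of at most 16 bits: with `uint16_t` keys there are only 65536 possible groups, so the key itself can index a preallocated table. The `DirectGroupSum` type is hypothetical and uses plain vectors rather than Arrow arrays.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One slot for every possible uint16_t key.
constexpr std::size_t kNumSlots = std::size_t{1} << 16;

// Hypothetical direct-indexed group-by sum: the key is the slot index, so
// there is no hash computation and no collision handling.
struct DirectGroupSum {
  std::vector<int64_t> sums = std::vector<int64_t>(kNumSlots, 0);
  std::vector<bool> seen = std::vector<bool>(kNumSlots, false);

  void Consume(const std::vector<uint16_t>& keys,
               const std::vector<int64_t>& values) {
    assert(keys.size() == values.size());
    for (std::size_t i = 0; i < keys.size(); ++i) {
      sums[keys[i]] += values[i];
      seen[keys[i]] = true;  // distinguishes "sum is 0" from "key absent"
    }
  }
};
```

The trade-off is a fixed memory cost per aggregate regardless of how many distinct keys actually occur, which is why this only makes sense for narrow key types.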


> [C++] Implement GroupBy
> -----------------------
>
>                 Key: ARROW-5002
>                 URL: https://issues.apache.org/jira/browse/ARROW-5002
>             Project: Apache Arrow
>          Issue Type: Improvement
>            Reporter: Philipp Moritz
>            Priority: Major
>
> Dear all,
> I wonder what the best way forward is for implementing GroupBy kernels. 
> Initially this was part of
> https://issues.apache.org/jira/browse/ARROW-4124
> but is not contained in the current implementation as far as I can tell.
> It seems that the part of group by that just returns indices could be 
> conveniently implemented with the HashKernel. That seems useful in any case. 
> Is that indeed the best way forward/should this be done?
> GroupBy + Aggregate could then either be implemented with that + the Take 
> kernel + aggregation involving more memory copies than necessary though or as 
> part of the aggregate kernel. Probably the latter is preferred, any thoughts 
> on that?
> Am I missing any other JIRAs related to this?
> Best, Philipp.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
