ic4y commented on issue #956:
URL: https://github.com/apache/arrow-datafusion/issues/956#issuecomment-993785271


   @Dandandan Thank you very much for your suggestions.
   1. Dropping the `make slice` step can indeed bring a certain performance improvement, but I tested on arrow2 and found that the performance of `array.slice` has been greatly improved there. So `slice` performance is no longer a problem once arrow2 is used (see the sketch just below for why).
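   For context, arrow2's `slice` is cheap because it only adjusts an offset/length over a reference-counted buffer. Here is a minimal plain-Rust sketch of that idea (illustrative types, not arrow2's actual API):
    ```rust
    use std::sync::Arc;

    /// Illustrative stand-in for an arrow2-style array: a shared buffer
    /// plus an (offset, len) window into it.
    struct CheapArray {
        buffer: Arc<Vec<i32>>,
        offset: usize,
        len: usize,
    }

    impl CheapArray {
        /// O(1): clone the buffer handle and adjust the window; no values
        /// are copied.
        fn slice(&self, offset: usize, len: usize) -> CheapArray {
            assert!(offset + len <= self.len);
            CheapArray {
                buffer: Arc::clone(&self.buffer),
                offset: self.offset + offset,
                len,
            }
        }

        fn values(&self) -> &[i32] {
            &self.buffer[self.offset..self.offset + self.len]
        }
    }

    fn main() {
        let array = CheapArray {
            buffer: Arc::new((0..1_000_000).collect()),
            offset: 0,
            len: 1_000_000,
        };
        // Slicing a million-element array touches no data.
        let s = array.slice(10, 5);
        assert_eq!(s.values(), &[10, 11, 12, 13, 14]);
    }
    ```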
   2. For `eq_array` I tried hashbrown's `get_each_mut` method, but performance did not change (profiling with pprof showed that `eq_array` still takes most of the time). The code is shown below:
   ```rust
    create_hashes(&group_values, random_state, &mut batch_hashes)?;
    let mut absent_rows = vec![];
    const CHUNK_SIZE: usize = 16;
    // NOTE: `slice::as_chunks` is nightly-only (`#![feature(slice_as_chunks)]`).
    let (chunks, remainder) = batch_hashes.as_chunks::<CHUNK_SIZE>();
    for (num, chunk) in chunks.iter().enumerate() {
        let Accumulators { map, group_states } = &mut accumulators;
        // Probe CHUNK_SIZE hashes at once: re-check the stored hash first,
        // then compare the actual group-by values.
        let result = map.get_each_mut(*chunk, |index, (hash, group_idx)| {
            if batch_hashes[num * CHUNK_SIZE + index] != *hash {
                false
            } else {
                let group_state = &group_states[*group_idx];
                group_values
                    .iter()
                    .zip(group_state.group_by_values.iter())
                    .all(|(array, scalar)| scalar.eq_array(array, index + num * CHUNK_SIZE))
            }
        });
   
        for (i, re) in result.iter().enumerate() {
            let row = num * CHUNK_SIZE + i;
            match re {
                Ok((_hash, group_idx)) => {
                    let group_state = &mut group_states[*group_idx];
                    // 1.3
                    if group_state.indices.is_empty() {
                        groups_with_rows.push(*group_idx);
                    };
                    group_state.indices.push(row as u32); // remember this row
                }
                Err(UnavailableMutError::Absent) => {
                    // No matching slot was returned; handle in a second pass.
                    absent_rows.push(row);
                }
                Err(UnavailableMutError::Duplicate(index)) => {
                    // The slot was already handed out earlier in this chunk.
                    let (_hash, group_idx) = &result.get(*index).unwrap().as_ref().unwrap();
                    let group_state = &mut group_states[*group_idx];
                    // 1.3
                    if group_state.indices.is_empty() {
                        groups_with_rows.push(*group_idx);
                    };
                    group_state.indices.push(row as u32); // remember this row
                }
            }
        }
    }
       }
   
    // Second pass: rows whose hash had no matching entry in the map yet.
    for row in absent_rows {
        let hash = batch_hashes[row];
        let Accumulators { map, group_states } = &mut accumulators;
        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            if *_hash != hash {
                false
            } else {
                let group_state = &group_states[*group_idx];
                group_values
                    .iter()
                    .zip(group_state.group_by_values.iter())
                    .all(|(array, scalar)| scalar.eq_array(array, row))
            }
        });
   
           match entry {
               // Existing entry for this group value
               Some((_hash, group_idx)) => {
                   let group_state = &mut group_states[*group_idx];
                   // 1.3
                   if group_state.indices.is_empty() {
                       groups_with_rows.push(*group_idx);
                   };
                   group_state.indices.push(row as u32); // remember this row
               }
               //  1.2 Need to create new entry
               None => {
                   let accumulator_set = create_accumulators(aggr_expr)
                       .map_err(DataFusionError::into_arrow_external_error)?;
   
                   // Copy group values out of arrays into `ScalarValue`s
                   let group_by_values = group_values
                       .iter()
                       .map(|col| ScalarValue::try_from_array(col, row))
                       .collect::<Result<Vec<_>>>()?;
   
                   // Add new entry to group_states and save newly created index
                   let group_state = GroupState {
                       group_by_values: group_by_values.into_boxed_slice(),
                       accumulator_set,
                       indices: vec![row as u32], // 1.3
                   };
                   let group_idx = group_states.len();
                   group_states.push(group_state);
                   groups_with_rows.push(group_idx);
   
                   // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
               }
           };
       }
   
    // Remainder rows that did not fill a complete chunk.
    for (i, &hash) in remainder.iter().enumerate() {
        let row = i + CHUNK_SIZE * chunks.len();
           let Accumulators { map, group_states } = &mut accumulators;
   
           let entry = map.get_mut(hash, |(_hash, group_idx)| {
               let group_state = &group_states[*group_idx];
               group_values
                   .iter()
                   .zip(group_state.group_by_values.iter())
                   .all(|(array, scalar)| scalar.eq_array(array, row))
           });
   
           match entry {
               // Existing entry for this group value
               Some((_hash, group_idx)) => {
                   let group_state = &mut group_states[*group_idx];
                   // 1.3
                   if group_state.indices.is_empty() {
                       groups_with_rows.push(*group_idx);
                   };
                   group_state.indices.push(row as u32); // remember this row
               }
               //  1.2 Need to create new entry
               None => {
                   let accumulator_set = create_accumulators(aggr_expr)
                       .map_err(DataFusionError::into_arrow_external_error)?;
   
                   // Copy group values out of arrays into `ScalarValue`s
                   let group_by_values = group_values
                       .iter()
                       .map(|col| ScalarValue::try_from_array(col, row))
                       .collect::<Result<Vec<_>>>()?;
   
                   // Add new entry to group_states and save newly created index
                   let group_state = GroupState {
                       group_by_values: group_by_values.into_boxed_slice(),
                       accumulator_set,
                       indices: vec![row as u32], // 1.3
                   };
                   let group_idx = group_states.len();
                   group_states.push(group_state);
                   groups_with_rows.push(group_idx);
   
                   // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
               }
           };
       }
   
   ```
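   One caveat on the snippet above: `slice::as_chunks` is currently a nightly-only API. If that is a problem, the same chunk/remainder split can be done on stable Rust with `chunks_exact`; a minimal sketch (only the splitting logic, with a hypothetical 35-element input):
    ```rust
    use std::convert::TryInto;

    fn main() {
        const CHUNK_SIZE: usize = 16;
        let batch_hashes: Vec<u64> = (0..35).collect();

        // `chunks_exact` yields only full chunks; the tail is exposed
        // separately via `remainder()`, mirroring nightly `as_chunks`.
        let chunks = batch_hashes.chunks_exact(CHUNK_SIZE);
        let remainder = chunks.remainder();

        for (num, chunk) in chunks.enumerate() {
            // `chunk` is a `&[u64]` of length CHUNK_SIZE; convert with
            // `try_into` when an API needs a fixed-size `[u64; CHUNK_SIZE]`.
            let _array: [u64; CHUNK_SIZE] = chunk.try_into().unwrap();
            let _first_row = num * CHUNK_SIZE;
        }
        assert_eq!(remainder.len(), 35 % CHUNK_SIZE);
    }
    ```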
   3. I separated the `get_mut` and `eq_array` steps by using hashbrown's `iter_hash` to process them in two loops, so that `eq_array` can be executed in a vectorized way (I have not implemented the vectorized execution yet; I am not very familiar with vectorized execution in Rust). The following code separates most of the `get_mut` work from `eq_array`, but how can we vectorize the `eq_array` calls in a loop? (A sketch of one possible direction follows the code.)
   ```rust
       create_hashes(&group_values, random_state, &mut batch_hashes)?;
   
    let map = &mut accumulators.map;
    // First pass: for every row, collect the group indices of all existing
    // entries that share the row's hash.
    let mut all_ids = Vec::with_capacity(batch_hashes.len());
    unsafe {
        for &hash in batch_hashes.iter() {
            let buckets = map.iter_hash(hash);
            all_ids.push(
                buckets
                    .map(|bucket| {
                        let (_hash, group_idx) = bucket.as_ref();
                        *group_idx
                    })
                    .collect::<Vec<_>>(),
            );
        }
    }
   
    // Second pass: resolve each row's candidates with `eq_array`. This is
    // the loop we would like to vectorize.
    let mut gids = Vec::with_capacity(batch_hashes.len());
    for (row, candidates) in all_ids.iter().enumerate() {
        let group_states = &accumulators.group_states;
        let mut gid = None;
        for &group_idx in candidates {
            let group_state = &group_states[group_idx];
            if group_values
                .iter()
                .zip(group_state.group_by_values.iter())
                .all(|(array, scalar)| scalar.eq_array(array, row))
            {
                gid = Some(group_idx);
            }
        }
        gids.push(gid);
    }
   
   
    // Third pass: apply the matches; rows without a candidate fall back to
    // a regular probe-or-insert.
    for (row, op_group_idx) in gids.into_iter().enumerate() {
           let Accumulators { map, group_states } = &mut accumulators;
           let hash = batch_hashes[row];
           match op_group_idx {
               None => {
                   let entry = map.get_mut(hash, |(_hash, group_idx)| {
                       let group_state = &group_states[*group_idx];
                       group_values
                           .iter()
                           .zip(group_state.group_by_values.iter())
                           .all(|(array, scalar)| scalar.eq_array(array, row))
                   });
   
                   match entry {
                       // Existing entry for this group value
                       Some((_hash, group_idx)) => {
                           let group_state = &mut group_states[*group_idx];
                           // 1.3
                           if group_state.indices.is_empty() {
                               groups_with_rows.push(*group_idx);
                           };
                            group_state.indices.push(row as u32); // remember this row
                        }
                        //  1.2 Need to create new entry
                        None => {
                            let accumulator_set = create_accumulators(aggr_expr)
                                .map_err(DataFusionError::into_arrow_external_error)?;

                            // Copy group values out of arrays into `ScalarValue`s
                            let group_by_values = group_values
                                .iter()
                                .map(|col| ScalarValue::try_from_array(col, row))
                                .collect::<Result<Vec<_>>>()?;

                            // Add new entry to group_states and save newly created index
                            let group_state = GroupState {
                                group_by_values: group_by_values.into_boxed_slice(),
                                accumulator_set,
                                indices: vec![row as u32], // 1.3
                            };
                            let group_idx = group_states.len();
                            group_states.push(group_state);
                            groups_with_rows.push(group_idx);

                            // for hasher function, use precomputed hash value
                            map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
                       }
                   };
   
               }
               Some(group_idx) => {
                   let group_state = &mut group_states[group_idx];
                   // 1.3
                   if group_state.indices.is_empty() {
                       groups_with_rows.push(group_idx);
                   };
                   group_state.indices.push(row as u32); // remember this row
               }
           }
       }
   
   ```
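   One direction I have been considering for the vectorization itself: do the comparison per column over primitive values instead of per row through `ScalarValue::eq_array`, so the hot loop is flat and free of dynamic dispatch. A minimal sketch of the idea, in plain Rust over a single hypothetical `u64` group-by column (not the real DataFusion types):
    ```rust
    /// Sketch: resolve all candidate (row, group) pairs for one u64
    /// group-by column in a single flat loop. `col[row]` is the incoming
    /// value for that row and `keys[cand]` the stored key of the candidate
    /// group. A branch-free loop over primitives like this is at least
    /// amenable to auto-vectorization, unlike a per-row `eq_array` call
    /// that goes through dynamic dispatch.
    fn batch_eq_u64(col: &[u64], keys: &[u64], pairs: &[(usize, usize)], out: &mut Vec<bool>) {
        out.clear();
        out.extend(pairs.iter().map(|&(row, cand)| col[row] == keys[cand]));
    }

    fn main() {
        let col = vec![1u64, 2, 3, 4];            // incoming column values per row
        let keys = vec![3u64, 1];                 // stored group keys
        let pairs = vec![(0, 1), (2, 0), (3, 0)]; // (row, candidate group)
        let mut out = Vec::new();
        batch_eq_u64(&col, &keys, &pairs, &mut out);
        assert_eq!(out, vec![true, true, false]);
    }
    ```
   With multiple group-by columns, each column would produce such a boolean vector and the vectors would be AND-ed together; only rows that match on every column keep their candidate group index.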
   I really want to solve the performance problem of high-cardinality aggregation, which has troubled me for a long time. Is there any problem with the code above, and how can we optimize it further? Thanks!
   

