pitrou commented on code in PR #13630:
URL: https://github.com/apache/arrow/pull/13630#discussion_r924240870


##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -193,8 +193,11 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, 
const std::vector<Dat
 
   // Merge if necessary
   for (size_t thread_index = 1; thread_index < thread_ids.size(); 
++thread_index) {
+    // TODO: Return ExecSpan from GetUniques; but need to figure out memory
+    // management strategy

Review Comment:
   Same here?



##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -174,15 +171,18 @@ Result<Datum> GroupBy(const std::vector<Datum>& 
arguments, const std::vector<Dat
       auto grouper = groupers[thread_index].get();
 
       // compute a batch of group ids
+      // TODO(wesm): refactor Grouper::Consume to write into preallocated
+      // memory

Review Comment:
   Rather than adding TODOs in the code, this would better be tracked and 
remembered using a JIRA, IMHO.



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -84,7 +51,10 @@ class ARROW_EXPORT ExecSpanIterator {
   /// \param[in] batch the input ExecBatch
   /// \param[in] max_chunksize the maximum length of each ExecSpan. Depending
   /// on the chunk layout of ChunkedArray.
-  Status Init(const ExecBatch& batch, int64_t max_chunksize = 
kDefaultMaxChunksize);
+  /// \param[in] promote_if_all_scalars if all of the values are scalars,
+  /// return them in each ExecSpan as ArraySpan of length 1

Review Comment:
   What happens otherwise? (i.e. if all values are scalars but 
`promote_if_all_scalars` is false)



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -294,20 +282,21 @@ struct GrouperFastImpl : Grouper {
                                     ctx_->memory_pool()));
           }
         }
-        return ConsumeImpl(expanded);
+        return ConsumeImpl(ExecSpan(expanded));
       }
     }
     return ConsumeImpl(batch);
   }
 
-  Result<Datum> ConsumeImpl(const ExecBatch& batch) {
+  Result<Datum> ConsumeImpl(const ExecSpan& batch) {
     int64_t num_rows = batch.length;
     int num_columns = batch.num_values();
     // Process dictionaries
     for (int icol = 0; icol < num_columns; ++icol) {
       if (key_types_[icol].id() == Type::DICTIONARY) {
-        auto data = batch[icol].array();
-        auto dict = MakeArray(data->dictionary);
+        const ArraySpan& data = batch[icol].array;
+        // TODO(wesm): do not require ToArrayData here

Review Comment:
   Add a JIRA?



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -110,6 +80,7 @@ class ARROW_EXPORT ExecSpanIterator {
   bool initialized_ = false;
   bool have_chunked_arrays_ = false;
   bool have_all_scalars_ = false;
+  bool promote_if_all_scalars_ = false;

Review Comment:
   For the sake of avoiding confusion, can this have the same default value 
`true` as in the constructor?



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -609,20 +609,17 @@ struct VectorKernel : public Kernel {
 // ----------------------------------------------------------------------
 // ScalarAggregateKernel (for ScalarAggregateFunction)
 
-using ScalarAggregateConsume = std::function<Status(KernelContext*, const 
ExecBatch&)>;
-
-using ScalarAggregateMerge =
-    std::function<Status(KernelContext*, KernelState&&, KernelState*)>;
-
+typedef Status (*ScalarAggregateConsume)(KernelContext*, const ExecSpan&);
+typedef Status (*ScalarAggregateMerge)(KernelContext*, KernelState&&, 
KernelState*);
 // Finalize returns Datum to permit multiple return values
-using ScalarAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+typedef Status (*ScalarAggregateFinalize)(KernelContext*, Datum*);

Review Comment:
   The style guide discourages `typedef` and recommends `using` instead



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -448,9 +449,12 @@ struct MinMaxImpl : public ScalarAggregator {
     return Status::OK();
   }
 
-  Status ConsumeArray(const ArrayType& arr) {
+  Status ConsumeArray(const ArraySpan& arr_span) {
     StateType local;
 
+    // TODO(wesm): do not use ToArrayData
+    ArrayType arr(arr_span.ToArrayData());

Review Comment:
   Is this something that you plan to do in a later PR? Or can it just be done 
here?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -202,14 +202,16 @@ Result<Datum> GroupByUsingExecPlan(const 
std::vector<Datum>& arguments,
   inputs.reserve(inputs.size() + keys.size());
   inputs.insert(inputs.end(), keys.begin(), keys.end());
 
-  ARROW_ASSIGN_OR_RAISE(auto batch_iterator,
-                        ExecBatchIterator::Make(inputs, 
ctx->exec_chunksize()));
+  ExecSpanIterator span_iterator;
+  ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make(inputs));
+  RETURN_NOT_OK(span_iterator.Init(batch, ctx->exec_chunksize()));
   BatchesWithSchema input;
   input.schema = schema(std::move(scan_fields));
-  ExecBatch batch;
-  while (batch_iterator->Next(&batch)) {
-    if (batch.length == 0) continue;
-    input.batches.push_back(std::move(batch));
+  ExecSpan span;
+  while (span_iterator.Next(&span)) {
+    if (span.length == 0) continue;
+    // TODO(wesm): investigate possibility of ExecPlans using ExecSpan

Review Comment:
   Should this be a JIRA instead? TODOs sprinkled in code are often ignored.



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -659,23 +656,20 @@ struct ScalarAggregateKernel : public Kernel {
 // ----------------------------------------------------------------------
 // HashAggregateKernel (for HashAggregateFunction)
 
-using HashAggregateResize = std::function<Status(KernelContext*, int64_t)>;
-
-using HashAggregateConsume = std::function<Status(KernelContext*, const 
ExecBatch&)>;
-
-using HashAggregateMerge =
-    std::function<Status(KernelContext*, KernelState&&, const ArrayData&)>;
+typedef Status (*HashAggregateResize)(KernelContext*, int64_t);
+typedef Status (*HashAggregateConsume)(KernelContext*, const ExecSpan&);
+typedef Status (*HashAggregateMerge)(KernelContext*, KernelState&&, const 
ArrayData&);
 
 // Finalize returns Datum to permit multiple return values
-using HashAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+typedef Status (*HashAggregateFinalize)(KernelContext*, Datum*);

Review Comment:
   Same here.



##########
cpp/src/arrow/compute/kernels/aggregate_quantile.cc:
##########
@@ -471,45 +471,6 @@ struct ExactQuantiler<InType, 
enable_if_t<is_decimal_type<InType>::value>> {
   SortQuantiler<InType> impl;
 };
 
-template <typename T>
-Status ScalarQuantile(KernelContext* ctx, const Scalar& scalar, ExecResult* 
out) {
-  const QuantileOptions& options = QuantileState::Get(ctx);
-  using CType = typename TypeTraits<T>::CType;
-  ArrayData* output = out->array_data().get();
-  output->length = options.q.size();
-  auto out_type = IsDataPoint(options) ? scalar.type : float64();
-  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
-                        ctx->Allocate(output->length * 
out_type->byte_width()));
-
-  if (!scalar.is_valid || options.min_count > 1) {
-    output->null_count = output->length;
-    ARROW_ASSIGN_OR_RAISE(output->buffers[0], 
ctx->AllocateBitmap(output->length));
-    bit_util::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, 
output->length,
-                        false);
-    if (IsDataPoint(options)) {
-      CType* out_buffer = output->template GetMutableValues<CType>(1);
-      std::fill(out_buffer, out_buffer + output->length, CType(0));
-    } else {
-      double* out_buffer = output->template GetMutableValues<double>(1);
-      std::fill(out_buffer, out_buffer + output->length, 0.0);
-    }
-    return Status::OK();
-  }
-  output->null_count = 0;
-  if (IsDataPoint(options)) {
-    CType* out_buffer = output->template GetMutableValues<CType>(1);
-    for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = UnboxScalar<T>::Unbox(scalar);
-    }
-  } else {
-    double* out_buffer = output->template GetMutableValues<double>(1);
-    for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), 
*scalar.type);
-    }
-  }
-  return Status::OK();
-}
-

Review Comment:
   Neat removal :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to