This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d84c1e05b GH-42126: [C++] Move TakeXXX free functions into TakeMetaFunction and make them private (#42127)
7d84c1e05b is described below

commit 7d84c1e05bc358441da8bc8b214777e41868a101
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Thu Jun 13 18:52:21 2024 -0300

    GH-42126: [C++] Move TakeXXX free functions into TakeMetaFunction and make them private (#42127)
    
    ### Rationale for this change
    
    Move TakeXXX free functions into `TakeMetaFunction` and make them private.
    
    ### What changes are included in this PR?
    
    Code move and some small refactorings in preparation for #41700.
    
    ### Are these changes tested?
    
    By existing tests.
    * GitHub Issue: #42126
    
    Authored-by: Felipe Oliveira Carvalho <[email protected]>
    Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
 .../kernels/vector_selection_take_internal.cc      | 252 +++++++++++----------
 1 file changed, 128 insertions(+), 124 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index dee80e9d25..8b3f0431e6 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -31,6 +31,7 @@
 #include "arrow/compute/kernels/gather_internal.h"
 #include "arrow/compute/kernels/vector_selection_internal.h"
 #include "arrow/compute/kernels/vector_selection_take_internal.h"
+#include "arrow/compute/registry.h"
 #include "arrow/memory_pool.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
@@ -536,142 +537,144 @@ Status ExtensionTake(KernelContext* ctx, const ExecSpan& batch, ExecResult* out)
 // R -> RecordBatch
 // T -> Table
 
-Result<std::shared_ptr<ArrayData>> TakeAAA(const std::shared_ptr<ArrayData>& 
values,
-                                           const std::shared_ptr<ArrayData>& 
indices,
-                                           const TakeOptions& options, 
ExecContext* ctx) {
-  ARROW_ASSIGN_OR_RAISE(Datum result,
-                        CallFunction("array_take", {values, indices}, 
&options, ctx));
-  return result.array();
-}
+const FunctionDoc take_doc(
+    "Select values from an input based on indices from another array",
+    ("The output is populated with values from the input at positions\n"
+     "given by `indices`.  Nulls in `indices` emit null in the output."),
+    {"input", "indices"}, "TakeOptions");
 
-Result<std::shared_ptr<ChunkedArray>> TakeCAC(const ChunkedArray& values,
-                                              const Array& indices,
-                                              const TakeOptions& options,
-                                              ExecContext* ctx) {
-  std::shared_ptr<Array> values_array;
-  if (values.num_chunks() == 1) {
-    // Case 1: `values` has a single chunk, so just use it
-    values_array = values.chunk(0);
-  } else {
-    // TODO Case 2: See if all `indices` fall in the same chunk and call Array 
Take on it
-    // See
-    // 
https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151
-    // TODO Case 3: If indices are sorted, can slice them and call Array Take
-    // (these are relevant to TakeCCC as well)
-
-    // Case 4: Else, concatenate chunks and call Array Take
-    if (values.chunks().empty()) {
-      ARROW_ASSIGN_OR_RAISE(
-          values_array, MakeArrayOfNull(values.type(), /*length=*/0, 
ctx->memory_pool()));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(values_array,
-                            Concatenate(values.chunks(), ctx->memory_pool()));
-    }
+// Metafunction for dispatching to different Take implementations other than
+// Array-Array.
+class TakeMetaFunction : public MetaFunction {
+ public:
+  TakeMetaFunction()
+      : MetaFunction("take", Arity::Binary(), take_doc, 
GetDefaultTakeOptions()) {}
+
+  static Result<Datum> CallArrayTake(const std::vector<Datum>& args,
+                                     const TakeOptions& options, ExecContext* 
ctx) {
+    ARROW_ASSIGN_OR_RAISE(auto array_take_func,
+                          ctx->func_registry()->GetFunction("array_take"));
+    return array_take_func->Execute(args, &options, ctx);
   }
-  // Call Array Take on our single chunk
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> new_chunk,
-                        TakeAAA(values_array->data(), indices.data(), options, 
ctx));
-  std::vector<std::shared_ptr<Array>> chunks = {MakeArray(new_chunk)};
-  return std::make_shared<ChunkedArray>(std::move(chunks));
-}
 
-Result<std::shared_ptr<ChunkedArray>> TakeCCC(const ChunkedArray& values,
-                                              const ChunkedArray& indices,
-                                              const TakeOptions& options,
-                                              ExecContext* ctx) {
-  // XXX: for every chunk in indices, values are gathered from all chunks in 
values to
-  // form a new chunk in the result. Performing this concatenation is not 
ideal, but
-  // greatly simplifies the implementation before something more efficient is
-  // implemented.
-  std::shared_ptr<Array> values_array;
-  if (values.num_chunks() == 1) {
-    values_array = values.chunk(0);
-  } else {
-    if (values.chunks().empty()) {
-      ARROW_ASSIGN_OR_RAISE(
-          values_array, MakeArrayOfNull(values.type(), /*length=*/0, 
ctx->memory_pool()));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(values_array,
-                            Concatenate(values.chunks(), ctx->memory_pool()));
+  static Result<std::shared_ptr<Array>> ChunkedArrayAsArray(
+      const std::shared_ptr<ChunkedArray>& values, MemoryPool* pool) {
+    switch (values->num_chunks()) {
+      case 0:
+        return MakeArrayOfNull(values->type(), /*length=*/0, pool);
+      case 1:
+        return values->chunk(0);
+      default:
+        return Concatenate(values->chunks(), pool);
     }
   }
-  std::vector<std::shared_ptr<Array>> new_chunks;
-  new_chunks.resize(indices.num_chunks());
-  for (int i = 0; i < indices.num_chunks(); i++) {
-    ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(),
-                                              indices.chunk(i)->data(), 
options, ctx));
-    new_chunks[i] = MakeArray(chunk);
+
+ private:
+  static Result<std::shared_ptr<ArrayData>> TakeAAA(const std::vector<Datum>& 
args,
+                                                    const TakeOptions& options,
+                                                    ExecContext* ctx) {
+    DCHECK_EQ(args[0].kind(), Datum::ARRAY);
+    DCHECK_EQ(args[1].kind(), Datum::ARRAY);
+    ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx));
+    return result.array();
   }
-  return std::make_shared<ChunkedArray>(std::move(new_chunks), values.type());
-}
 
-Result<std::shared_ptr<ChunkedArray>> TakeACC(const Array& values,
-                                              const ChunkedArray& indices,
-                                              const TakeOptions& options,
-                                              ExecContext* ctx) {
-  auto num_chunks = indices.num_chunks();
-  std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
-  for (int i = 0; i < num_chunks; i++) {
-    // Take with that indices chunk
-    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> chunk,
-                          TakeAAA(values.data(), indices.chunk(i)->data(), 
options, ctx));
-    new_chunks[i] = MakeArray(chunk);
+  static Result<std::shared_ptr<ArrayData>> TakeCAA(
+      const std::shared_ptr<ChunkedArray>& values, const Array& indices,
+      const TakeOptions& options, ExecContext* ctx) {
+    ARROW_ASSIGN_OR_RAISE(auto values_array,
+                          ChunkedArrayAsArray(values, ctx->memory_pool()));
+    std::vector<Datum> args = {std::move(values_array), indices};
+    return TakeAAA(args, options, ctx);
   }
-  return std::make_shared<ChunkedArray>(std::move(new_chunks), values.type());
-}
 
-Result<std::shared_ptr<RecordBatch>> TakeRAR(const RecordBatch& batch,
-                                             const Array& indices,
-                                             const TakeOptions& options,
-                                             ExecContext* ctx) {
-  auto ncols = batch.num_columns();
-  auto nrows = indices.length();
-  std::vector<std::shared_ptr<Array>> columns(ncols);
-  for (int j = 0; j < ncols; j++) {
-    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> col_data,
-                          TakeAAA(batch.column(j)->data(), indices.data(), 
options, ctx));
-    columns[j] = MakeArray(col_data);
+  static Result<std::shared_ptr<ChunkedArray>> TakeCAC(
+      const std::shared_ptr<ChunkedArray>& values, const Array& indices,
+      const TakeOptions& options, ExecContext* ctx) {
+    ARROW_ASSIGN_OR_RAISE(auto new_chunk, TakeCAA(values, indices, options, 
ctx));
+    return std::make_shared<ChunkedArray>(MakeArray(std::move(new_chunk)));
   }
-  return RecordBatch::Make(batch.schema(), nrows, std::move(columns));
-}
 
-Result<std::shared_ptr<Table>> TakeTAT(const Table& table, const Array& 
indices,
-                                       const TakeOptions& options, 
ExecContext* ctx) {
-  auto ncols = table.num_columns();
-  std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);
+  static Result<std::shared_ptr<ChunkedArray>> TakeCCC(
+      const std::shared_ptr<ChunkedArray>& values,
+      const std::shared_ptr<ChunkedArray>& indices, const TakeOptions& options,
+      ExecContext* ctx) {
+    // XXX: for every chunk in indices, values are gathered from all chunks in 
values to
+    // form a new chunk in the result. Performing this concatenation is not 
ideal, but
+    // greatly simplifies the implementation before something more efficient is
+    // implemented.
+    ARROW_ASSIGN_OR_RAISE(auto values_array,
+                          ChunkedArrayAsArray(values, ctx->memory_pool()));
+    std::vector<Datum> args = {std::move(values_array), {}};
+    std::vector<std::shared_ptr<Array>> new_chunks;
+    new_chunks.resize(indices->num_chunks());
+    for (int i = 0; i < indices->num_chunks(); i++) {
+      args[1] = indices->chunk(i);
+      // XXX: this loop can use TakeCAA once it can handle ChunkedArray
+      // without concatenating first
+      ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx));
+      new_chunks[i] = MakeArray(chunk);
+    }
+    return std::make_shared<ChunkedArray>(std::move(new_chunks), 
values->type());
+  }
 
-  for (int j = 0; j < ncols; j++) {
-    ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(*table.column(j), indices, 
options, ctx));
+  static Result<std::shared_ptr<ChunkedArray>> TakeACC(const Array& values,
+                                                       const ChunkedArray& 
indices,
+                                                       const TakeOptions& 
options,
+                                                       ExecContext* ctx) {
+    auto num_chunks = indices.num_chunks();
+    std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+    std::vector<Datum> args = {values, {}};
+    for (int i = 0; i < num_chunks; i++) {
+      // Take with that indices chunk
+      args[1] = indices.chunk(i);
+      ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx));
+      new_chunks[i] = MakeArray(chunk);
+    }
+    return std::make_shared<ChunkedArray>(std::move(new_chunks), 
values.type());
   }
-  return Table::Make(table.schema(), std::move(columns));
-}
 
-Result<std::shared_ptr<Table>> TakeTCT(const Table& table, const ChunkedArray& 
indices,
-                                       const TakeOptions& options, 
ExecContext* ctx) {
-  auto ncols = table.num_columns();
-  std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);
-  for (int j = 0; j < ncols; j++) {
-    ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(*table.column(j), indices, 
options, ctx));
+  static Result<std::shared_ptr<RecordBatch>> TakeRAR(const RecordBatch& batch,
+                                                      const Array& indices,
+                                                      const TakeOptions& 
options,
+                                                      ExecContext* ctx) {
+    auto ncols = batch.num_columns();
+    auto nrows = indices.length();
+    std::vector<std::shared_ptr<Array>> columns(ncols);
+    std::vector<Datum> args = {{}, indices};
+    for (int j = 0; j < ncols; j++) {
+      args[0] = batch.column(j);
+      ARROW_ASSIGN_OR_RAISE(auto col_data, TakeAAA(args, options, ctx));
+      columns[j] = MakeArray(col_data);
+    }
+    return RecordBatch::Make(batch.schema(), nrows, std::move(columns));
   }
-  return Table::Make(table.schema(), std::move(columns));
-}
 
-const FunctionDoc take_doc(
-    "Select values from an input based on indices from another array",
-    ("The output is populated with values from the input at positions\n"
-     "given by `indices`.  Nulls in `indices` emit null in the output."),
-    {"input", "indices"}, "TakeOptions");
+  static Result<std::shared_ptr<Table>> TakeTAT(const std::shared_ptr<Table>& 
table,
+                                                const Array& indices,
+                                                const TakeOptions& options,
+                                                ExecContext* ctx) {
+    auto ncols = table->num_columns();
+    std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);
 
-// Metafunction for dispatching to different Take implementations other than
-// Array-Array.
-//
-// TODO: Revamp approach to executing Take operations. In addition to being
-// overly complex dispatching, there is no parallelization.
-class TakeMetaFunction : public MetaFunction {
- public:
-  TakeMetaFunction()
-      : MetaFunction("take", Arity::Binary(), take_doc, 
GetDefaultTakeOptions()) {}
+    for (int j = 0; j < ncols; j++) {
+      ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, 
options, ctx));
+    }
+    return Table::Make(table->schema(), std::move(columns));
+  }
+
+  static Result<std::shared_ptr<Table>> TakeTCT(
+      const std::shared_ptr<Table>& table, const 
std::shared_ptr<ChunkedArray>& indices,
+      const TakeOptions& options, ExecContext* ctx) {
+    auto ncols = table->num_columns();
+    std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);
+    for (int j = 0; j < ncols; j++) {
+      ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, 
options, ctx));
+    }
+    return Table::Make(table->schema(), std::move(columns));
+  }
 
+ public:
   Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
                             const FunctionOptions* options,
                             ExecContext* ctx) const override {
@@ -680,16 +683,16 @@ class TakeMetaFunction : public MetaFunction {
     switch (args[0].kind()) {
       case Datum::ARRAY:
         if (index_kind == Datum::ARRAY) {
-          return TakeAAA(args[0].array(), args[1].array(), take_opts, ctx);
+          return TakeAAA(args, take_opts, ctx);
         } else if (index_kind == Datum::CHUNKED_ARRAY) {
           return TakeACC(*args[0].make_array(), *args[1].chunked_array(), 
take_opts, ctx);
         }
         break;
       case Datum::CHUNKED_ARRAY:
         if (index_kind == Datum::ARRAY) {
-          return TakeCAC(*args[0].chunked_array(), *args[1].make_array(), 
take_opts, ctx);
+          return TakeCAC(args[0].chunked_array(), *args[1].make_array(), 
take_opts, ctx);
         } else if (index_kind == Datum::CHUNKED_ARRAY) {
-          return TakeCCC(*args[0].chunked_array(), *args[1].chunked_array(), 
take_opts,
+          return TakeCCC(args[0].chunked_array(), args[1].chunked_array(), 
take_opts,
                          ctx);
         }
         break;
@@ -700,12 +703,13 @@ class TakeMetaFunction : public MetaFunction {
         break;
       case Datum::TABLE:
         if (index_kind == Datum::ARRAY) {
-          return TakeTAT(*args[0].table(), *args[1].make_array(), take_opts, 
ctx);
+          return TakeTAT(args[0].table(), *args[1].make_array(), take_opts, 
ctx);
         } else if (index_kind == Datum::CHUNKED_ARRAY) {
-          return TakeTCT(*args[0].table(), *args[1].chunked_array(), 
take_opts, ctx);
+          return TakeTCT(args[0].table(), args[1].chunked_array(), take_opts, 
ctx);
         }
         break;
-      default:
+      case Datum::NONE:
+      case Datum::SCALAR:
         break;
     }
     return Status::NotImplemented(

Reply via email to