This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new c5bce96ba6 GH-37895: [C++] Feature: support concatenate recordbatches. 
(#37896)
c5bce96ba6 is described below

commit c5bce96ba626ad2558e10acddf8ef19ea06940e8
Author: Francis <[email protected]>
AuthorDate: Sat Oct 14 01:57:49 2023 +0800

    GH-37895: [C++] Feature: support concatenate recordbatches. (#37896)
    
    ### Rationale for this change
    
    User scenario: When we use acero plan, many smaller batches may be 
generated through agg and hashjoin. In addition, due to the mpp database, there 
is data distribution. When there are many segments, each segment data is 
compared at this time. Small, in order to improve performance, we hope to merge 
multiple fragmented small batches into one large batch for calculation together.
    
    ### What changes are included in this PR?
    
    record_batch.cc
    record_batch.h
    record_batch_test.cc
    
    ### Are these changes tested?
    
    yes, see record_batch_test.cc
    
    ### Are there any user-facing changes?
    
    yes
    
    * Closes: #37895
    
    Authored-by: light-city <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/record_batch.cc      | 33 +++++++++++++++++++++++++++++++++
 cpp/src/arrow/record_batch.h       | 14 ++++++++++++++
 cpp/src/arrow/record_batch_test.cc | 37 +++++++++++++++++++++++++++++++++++++
 3 files changed, 84 insertions(+)

diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index f0ee295c63..457135fa40 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -25,6 +25,7 @@
 #include <utility>
 
 #include "arrow/array.h"
+#include "arrow/array/concatenate.h"
 #include "arrow/array/validate.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
@@ -432,4 +433,36 @@ RecordBatchReader::~RecordBatchReader() {
   ARROW_WARN_NOT_OK(this->Close(), "Implicitly called RecordBatchReader::Close 
failed");
 }
 
+Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
+    const RecordBatchVector& batches, MemoryPool* pool) {
+  int64_t length = 0;
+  size_t n = batches.size();
+  if (n == 0) {
+    return Status::Invalid("Must pass at least one recordbatch");
+  }
+  int cols = batches[0]->num_columns();
+  auto schema = batches[0]->schema();
+  for (size_t i = 0; i < batches.size(); ++i) {
+    length += batches[i]->num_rows();
+    if (!schema->Equals(batches[i]->schema())) {
+      return Status::Invalid(
+          "Schema of RecordBatch index ", i, " is ", 
batches[i]->schema()->ToString(),
+          ", which does not match index 0 recordbatch schema: ", 
schema->ToString());
+    }
+  }
+
+  std::vector<std::shared_ptr<Array>> concatenated_columns;
+  concatenated_columns.reserve(cols);
+  for (int col = 0; col < cols; ++col) {
+    ArrayVector column_arrays;
+    column_arrays.reserve(batches.size());
+    for (const auto& batch : batches) {
+      column_arrays.emplace_back(batch->column(col));
+    }
+    ARROW_ASSIGN_OR_RAISE(auto concatenated_column, Concatenate(column_arrays, 
pool))
+    concatenated_columns.emplace_back(std::move(concatenated_column));
+  }
+  return RecordBatch::Make(std::move(schema), length, 
std::move(concatenated_columns));
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index cb1f6d54f7..1a66fc3fb5 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -350,4 +350,18 @@ class ARROW_EXPORT RecordBatchReader {
       Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> 
schema);
 };
 
+/// \brief Concatenate record batches
+///
+/// The columns of the new batch are formed by concatenate the same columns of 
each input
+/// batch. Concatenate multiple batches into a new batch requires that the 
schema must be
+/// consistent. It supports merging batches without columns (only length, 
scenarios such
+/// as count(*)).
+///
+/// \param[in] batches a vector of record batches to be concatenated
+/// \param[in] pool memory to store the result will be allocated from this 
memory pool
+/// \return the concatenated record batch
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
+    const RecordBatchVector& batches, MemoryPool* pool = 
default_memory_pool());
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/record_batch_test.cc 
b/cpp/src/arrow/record_batch_test.cc
index bc923a1444..db3a2d3def 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -555,4 +555,41 @@ TEST_F(TestRecordBatch, ReplaceSchema) {
   ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema));
 }
 
+TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
+  int length = 10;
+
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8());
+
+  auto schema = ::arrow::schema({f0, f1});
+
+  random::RandomArrayGenerator gen(42);
+
+  auto b1 = gen.BatchOf(schema->fields(), length);
+
+  length = 5;
+
+  auto b2 = gen.BatchOf(schema->fields(), length);
+
+  ASSERT_OK_AND_ASSIGN(auto batch, ConcatenateRecordBatches({b1, b2}));
+  ASSERT_EQ(batch->num_rows(), b1->num_rows() + b2->num_rows());
+  ASSERT_BATCHES_EQUAL(*batch->Slice(0, b1->num_rows()), *b1);
+  ASSERT_BATCHES_EQUAL(*batch->Slice(b1->num_rows()), *b2);
+
+  f0 = field("fd0", int32());
+  f1 = field("fd1", uint8());
+
+  schema = ::arrow::schema({f0, f1});
+
+  auto b3 = gen.BatchOf(schema->fields(), length);
+
+  ASSERT_RAISES(Invalid, ConcatenateRecordBatches({b1, b3}));
+
+  auto null_batch = RecordBatch::Make(::arrow::schema({}), length,
+                                      
std::vector<std::shared_ptr<ArrayData>>{});
+  ASSERT_OK_AND_ASSIGN(batch, ConcatenateRecordBatches({null_batch}));
+  ASSERT_EQ(batch->num_rows(), null_batch->num_rows());
+  ASSERT_BATCHES_EQUAL(*batch, *null_batch);
+}
+
 }  // namespace arrow

Reply via email to