This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b4a6b122a GH-37170: [C++] Support schema rewriting of RecordBatch.
(#37171)
3b4a6b122a is described below
commit 3b4a6b122a29c1d40623ba22e0c231c5540d84b7
Author: Francis <[email protected]>
AuthorDate: Fri Sep 8 00:10:24 2023 +0800
GH-37170: [C++] Support schema rewriting of RecordBatch. (#37171)
### Rationale for this change
We have a scene. There is a plan in pg that looks like the following. For
the Append node, there are two scans in parallel, and then there is a column of
data, but the column names are different. If it is mapped to the arrow schema
It is a different field. For the append node, we will get two batches. The
first batch comes from the first scan, and the second batch comes from the
second scan, but because the two columns are constructed based on the scan The
schema is different, so the [...]
Therefore, the problem is simplified to: For a node, If there are n child
nodes, the schema of the following child nodes must be consistent. If not, the
schema of n-1 child nodes must be the same as the first schema, so there is
logic to rewrite the schema of the batch data.
```
-> Vec Append
-> Vec Seq Scan on public. tenk1
Output: tenk1.unique1
-> Vec Seq Scan on public.tenk1 tenk1_1
Output: tenk1_1.fivethous
```
However, when reading the batch code, there is only the read-only interface
schema(), so here we submit a pr to add and rewrite the schema interface, and
only modify the columns with the same type. If they are not the same, an
invalid modification will be returned.
backgroud: https://github.com/apache/arrow/issues/37170
### What changes are included in this PR?
- record_batch.h
- record_batch.cc
- record_batch_test.cc
### Are these changes tested?
yes, see record_batch_test.cc.
gtest filter is:
```
TestRecordBatch.RewriteSchema
```
### Are there any user-facing changes?
yes: see background in issue.
* Closes: #37170
Authored-by: light-city <[email protected]>
Signed-off-by: David Li <[email protected]>
---
cpp/src/arrow/record_batch.cc | 19 +++++++++++++++++++
cpp/src/arrow/record_batch.h | 5 +++++
cpp/src/arrow/record_batch_test.cc | 34 ++++++++++++++++++++++++++++++++++
3 files changed, 58 insertions(+)
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 1c5c8912e5..f0ee295c63 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -283,6 +283,25 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other,
const EqualOptions& opt
return true;
}
+Result<std::shared_ptr<RecordBatch>> RecordBatch::ReplaceSchema(
+ std::shared_ptr<Schema> schema) const {
+ if (schema_->num_fields() != schema->num_fields())
+ return Status::Invalid("RecordBatch schema fields", schema_->num_fields(),
+ ", did not match new schema fields: ",
schema->num_fields());
+ auto fields = schema_->fields();
+ int n_fields = static_cast<int>(fields.size());
+ for (int i = 0; i < n_fields; i++) {
+ auto old_type = fields[i]->type();
+ auto replace_type = schema->field(i)->type();
+ if (!old_type->Equals(replace_type)) {
+ return Status::Invalid(
+ "RecordBatch schema field index ", i, " type is ",
old_type->ToString(),
+ ", did not match new schema field type: ", replace_type->ToString());
+ }
+ }
+ return RecordBatch::Make(std::move(schema), num_rows(), columns());
+}
+
Result<std::shared_ptr<RecordBatch>> RecordBatch::SelectColumns(
const std::vector<int>& indices) const {
int n = static_cast<int>(indices.size());
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index d728d5eb0d..cb1f6d54f7 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -114,6 +114,11 @@ class ARROW_EXPORT RecordBatch {
/// \return the record batch's schema
const std::shared_ptr<Schema>& schema() const { return schema_; }
+ /// \brief Replace the schema with another schema with the same types, but
potentially
+ /// different field names and/or metadata.
+ Result<std::shared_ptr<RecordBatch>> ReplaceSchema(
+ std::shared_ptr<Schema> schema) const;
+
/// \brief Retrieve all columns at once
virtual const std::vector<std::shared_ptr<Array>>& columns() const = 0;
diff --git a/cpp/src/arrow/record_batch_test.cc
b/cpp/src/arrow/record_batch_test.cc
index e8180c6740..bc923a1444 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -521,4 +521,38 @@ TEST_F(TestRecordBatchReader, ToTable) {
ASSERT_EQ(table->column(0)->chunks().size(), 0);
}
+TEST_F(TestRecordBatch, ReplaceSchema) {
+ const int length = 10;
+
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", uint8());
+ auto f2 = field("f2", int16());
+ auto f3 = field("f3", int8());
+
+ auto schema = ::arrow::schema({f0, f1, f2});
+
+ random::RandomArrayGenerator gen(42);
+
+ auto a0 = gen.ArrayOf(int32(), length);
+ auto a1 = gen.ArrayOf(uint8(), length);
+ auto a2 = gen.ArrayOf(int16(), length);
+
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+
+ f0 = field("fd0", int32());
+ f1 = field("fd1", uint8());
+ f2 = field("fd2", int16());
+
+ schema = ::arrow::schema({f0, f1, f2});
+ ASSERT_OK_AND_ASSIGN(auto mutated, b1->ReplaceSchema(schema));
+ auto expected = RecordBatch::Make(schema, length, b1->columns());
+ ASSERT_TRUE(mutated->Equals(*expected));
+
+ schema = ::arrow::schema({f0, f1, f3});
+ ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema));
+
+ schema = ::arrow::schema({f0, f1});
+ ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema));
+}
+
} // namespace arrow