This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new f57b12d7a feat(python): pure python row-columar convert (#2919)
f57b12d7a is described below
commit f57b12d7a407997373ff2d18ff56da5f741d70b1
Author: Shawn Yang <[email protected]>
AuthorDate: Sat Nov 22 10:17:21 2025 +0800
feat(python): pure python row-columar convert (#2919)
## Why?
<!-- Describe the purpose of this PR. -->
## What does this PR do?
This is the first step to remove Apache Arrow from Fory C++. I created a pure
Python implementation to convert Fory row format to Arrow format.
## Related issues
#2906
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
Delete section if not applicable.
-->
---
cpp/fory/BUILD | 1 -
cpp/fory/columnar/BUILD | 34 ---
cpp/fory/columnar/arrow_writer.cc | 235 ----------------
cpp/fory/columnar/arrow_writer.h | 310 ---------------------
cpp/fory/columnar/arrow_writer_test.cc | 195 --------------
cpp/fory/columnar/convert_test.cc | 192 -------------
python/pyfory/format/__init__.py | 4 +-
python/pyfory/format/_format.pyx | 3 -
python/pyfory/format/columnar.py | 475 +++++++++++++++++++++++++++++++++
python/pyfory/format/vectorized.pxi | 49 ----
python/pyfory/includes/libformat.pxd | 15 --
11 files changed, 478 insertions(+), 1035 deletions(-)
diff --git a/cpp/fory/BUILD b/cpp/fory/BUILD
index 8e5690b59..ae443419e 100644
--- a/cpp/fory/BUILD
+++ b/cpp/fory/BUILD
@@ -5,7 +5,6 @@ cc_library(
deps = [
"@local_config_pyarrow//:arrow",
"//cpp/fory/row:fory_row_format",
- "//cpp/fory/columnar:fory_columnar_format",
],
visibility = ["//visibility:public"],
)
diff --git a/cpp/fory/columnar/BUILD b/cpp/fory/columnar/BUILD
deleted file mode 100644
index 0cd7b5c9c..000000000
--- a/cpp/fory/columnar/BUILD
+++ /dev/null
@@ -1,34 +0,0 @@
-load("@rules_cc//cc:defs.bzl", "cc_library", "cc_test")
-
-cc_library(
- name = "fory_columnar_format",
- srcs = ["arrow_writer.cc"],
- hdrs = ["arrow_writer.h"],
- strip_include_prefix = "/cpp",
- deps = [
- "@local_config_pyarrow//:arrow", "//cpp/fory/util:fory_util",
"//cpp/fory/row:fory_row_format"
- ],
- visibility = ["//visibility:public"],
-)
-
-cc_test(
- name = "arrow_writer_test",
- srcs = [
- "arrow_writer_test.cc",
- ],
- deps = [
- ":fory_columnar_format",
- "@com_google_googletest//:gtest",
- ],
-)
-
-cc_test(
- name = "convert_test",
- srcs = [
- "convert_test.cc",
- ],
- deps = [
- ":fory_columnar_format",
- "@com_google_googletest//:gtest",
- ],
-)
diff --git a/cpp/fory/columnar/arrow_writer.cc
b/cpp/fory/columnar/arrow_writer.cc
deleted file mode 100644
index db771ea57..000000000
--- a/cpp/fory/columnar/arrow_writer.cc
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "fory/columnar/arrow_writer.h"
-
-namespace fory {
-namespace columnar {
-
-::arrow::Status
-ArrowWriter::Make(const std::shared_ptr<::arrow::Schema> &arrow_schema,
- ::arrow::MemoryPool *pool,
- std::shared_ptr<ArrowWriter> *writer) {
- auto out = std::shared_ptr<ArrowWriter>(new ArrowWriter(arrow_schema, pool));
- for (auto &field : arrow_schema->fields()) {
- std::unique_ptr<ArrowArrayWriter> array_writer;
- RETURN_NOT_OK(createArrayWriter(field->type(), pool, &array_writer));
- out->column_writers_.push_back(std::move(array_writer));
- }
- *writer = out;
- return arrow::Status::OK();
-}
-
-::arrow::Status ArrowWriter::createArrayWriter(
- const std::shared_ptr<::arrow::DataType> &type, ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> *arrow_array_writer) {
- ArrowArrayWriter *writer;
- switch (type->id()) {
- case ::arrow::Type::BOOL:
- writer = new BooleanWriter(pool);
- break;
- case ::arrow::Type::INT8:
- writer = new Int8Writer(pool);
- break;
- case ::arrow::Type::INT16:
- writer = new Int16Writer(pool);
- break;
- case ::arrow::Type::INT32:
- writer = new Int32Writer(pool);
- break;
- case ::arrow::Type::INT64:
- writer = new Int64Writer(pool);
- break;
- case ::arrow::Type::FLOAT:
- writer = new FloatWriter(pool);
- break;
- case ::arrow::Type::DOUBLE:
- writer = new DoubleWriter(pool);
- break;
- case ::arrow::Type::DECIMAL:
- return ::arrow::Status::NotImplemented("Unsupported type",
- type->ToString());
- case ::arrow::Type::DATE32:
- writer = new DateWriter(pool);
- break;
- case ::arrow::Type::TIME32:
- writer = new Time32Writer(type, pool);
- break;
- case ::arrow::Type::TIME64:
- writer = new Time64Writer(type, pool);
- break;
- case ::arrow::Type::TIMESTAMP:
- writer = new TimestampWriter(type, pool);
- break;
- case ::arrow::Type::BINARY:
- writer = new BinaryWriter(pool);
- break;
- case ::arrow::Type::STRING:
- writer = new StringWriter(pool);
- break;
- case ::arrow::Type::LIST: {
- std::unique_ptr<ArrowArrayWriter> elem_writer;
- RETURN_NOT_OK(createArrayWriter(
- std::dynamic_pointer_cast<::arrow::ListType>(type)->value_type(), pool,
- &elem_writer));
- writer = new ListWriter(type, pool, std::move(elem_writer));
- break;
- }
- case ::arrow::Type::MAP: {
- std::unique_ptr<ArrowArrayWriter> key_writer;
- RETURN_NOT_OK(createArrayWriter(
- std::dynamic_pointer_cast<::arrow::MapType>(type)->key_type(), pool,
- &key_writer));
- std::unique_ptr<ArrowArrayWriter> value_writer;
- RETURN_NOT_OK(createArrayWriter(
- std::dynamic_pointer_cast<::arrow::MapType>(type)->item_type(), pool,
- &value_writer));
- writer = new MapWriter(type, pool, std::move(key_writer),
- std::move(value_writer));
- break;
- }
- case ::arrow::Type::STRUCT: {
- std::vector<std::unique_ptr<ArrowArrayWriter>> field_writers;
- auto struct_type = std::dynamic_pointer_cast<::arrow::StructType>(type);
- for (auto &field : struct_type->fields()) {
- std::unique_ptr<ArrowArrayWriter> field_writer;
- RETURN_NOT_OK(createArrayWriter(field->type(), pool, &field_writer));
- field_writers.push_back(std::move(field_writer));
- }
- writer = new StructWriter(type, pool, std::move(field_writers));
- break;
- }
- default:
- return ::arrow::Status::NotImplemented("Unsupported type",
- type->ToString());
- }
- *arrow_array_writer = std::unique_ptr<ArrowArrayWriter>(writer);
- return ::arrow::Status::OK();
-}
-
-::arrow::Status ArrowWriter::Write(const std::shared_ptr<Row> &row) {
- int num_fields = row->num_fields();
- for (int i = 0; i < num_fields; ++i) {
- auto &field_writer = column_writers_[i];
- RETURN_NOT_OK(field_writer->Write(row, i));
- }
- num_rows_++;
- return ::arrow::Status::OK();
-}
-
-::arrow::Status
-ArrowWriter::Finish(std::shared_ptr<::arrow::RecordBatch> *record_batch) {
- std::vector<std::shared_ptr<::arrow::Array>> columns;
- for (auto &array_writer : column_writers_) {
- std::shared_ptr<::arrow::Array> array;
- RETURN_NOT_OK(array_writer->Finish(&array));
- columns.push_back(array);
- }
- *record_batch = ::arrow::RecordBatch::Make(arrow_schema_, num_rows_,
columns);
- return ::arrow::Status::OK();
-}
-
-void ArrowWriter::Reset() {
- num_rows_ = 0;
- for (auto &array_writer : column_writers_) {
- array_writer->Reset();
- }
-}
-
-::arrow::Status
-ArrowArrayWriter::Write(const std::shared_ptr<fory::Getter> &getter, int i) {
- ::arrow::Status status;
- if (getter->IsNullAt(i)) {
- status = AppendNull();
- } else {
- status = AppendValue(getter, i);
- }
- return status;
-}
-
-ListWriter::ListWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> elem_writer) {
- builder_ = std::make_shared<::arrow::ListBuilder>(
- pool, elem_writer->builder(), type);
- elem_writer_ = std::move(elem_writer);
-}
-
-::arrow::Status ListWriter::AppendValue(std::shared_ptr<fory::Getter> getter,
- int i) {
- auto array = getter->GetArray(i);
- RETURN_NOT_OK(builder_->Append());
- auto num_elements = array->num_elements();
- for (int x = 0; x < num_elements; ++x) {
- RETURN_NOT_OK(elem_writer_->Write(array, x));
- }
- return ::arrow::Status::OK();
-}
-
-::arrow::Status StructWriter::AppendValue(std::shared_ptr<fory::Getter> getter,
- int i) {
- auto struct_data = getter->GetStruct(i);
- auto num_fields = struct_data->num_fields();
- RETURN_NOT_OK(builder_->Append());
- for (int x = 0; x < num_fields; ++x) {
- RETURN_NOT_OK(field_writers_[x]->Write(struct_data, x));
- }
- return ::arrow::Status::OK();
-}
-
-StructWriter::StructWriter(
- const std::shared_ptr<::arrow::DataType> &type, ::arrow::MemoryPool *pool,
- std::vector<std::unique_ptr<ArrowArrayWriter>> &&field_writers) {
- std::vector<std::shared_ptr<::arrow::ArrayBuilder>> field_builders;
- field_builders.reserve(field_writers.size());
- for (auto &field_writer : field_writers) {
- field_builders.push_back(field_writer->builder());
- }
- builder_ = std::make_shared<::arrow::StructBuilder>(
- type, pool, std::move(field_builders));
- field_writers_ = std::move(field_writers);
-}
-
-MapWriter::MapWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> key_writer,
- std::unique_ptr<ArrowArrayWriter> item_writer) {
- builder_ = std::make_shared<::arrow::MapBuilder>(
- pool, key_writer->builder(), item_writer->builder(), type);
- key_writer_ = std::move(key_writer);
- item_writer_ = std::move(item_writer);
-}
-
-::arrow::Status MapWriter::AppendValue(std::shared_ptr<fory::Getter> getter,
- int i) {
- auto map = getter->GetMap(i);
- auto key_array = map->keys_array();
- auto value_array = map->values_array();
- RETURN_NOT_OK(builder_->Append());
- auto num_elements = map->num_elements();
- for (int i = 0; i < num_elements; ++i) {
- RETURN_NOT_OK(key_writer_->Write(key_array, i));
- RETURN_NOT_OK(item_writer_->Write(value_array, i));
- }
-
- return ::arrow::Status::OK();
-}
-
-} // namespace columnar
-} // namespace fory
diff --git a/cpp/fory/columnar/arrow_writer.h b/cpp/fory/columnar/arrow_writer.h
deleted file mode 100644
index dc6579689..000000000
--- a/cpp/fory/columnar/arrow_writer.h
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "arrow/api.h"
-#include "fory/row/row.h"
-#include "fory/util/logging.h"
-#include <utility>
-
-namespace fory {
-namespace columnar {
-
-class ArrowArrayWriter;
-
-class ArrowWriter {
-public:
- static ::arrow::Status
- Make(const std::shared_ptr<arrow::Schema> &arrow_schema,
- ::arrow::MemoryPool *pool, std::shared_ptr<ArrowWriter> *writer);
-
- ::arrow::Status Write(const std::shared_ptr<Row> &row);
-
- ::arrow::Status Finish(std::shared_ptr<::arrow::RecordBatch> *record_batch);
-
- void Reset();
-
-private:
- explicit ArrowWriter(std::shared_ptr<::arrow::Schema> arrow_schema,
- ::arrow::MemoryPool *pool)
- : pool_(pool), arrow_schema_(std::move(arrow_schema)) {
- FORY_CHECK(pool_ != nullptr);
- }
-
- static ::arrow::Status
- createArrayWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> *arrow_array_writer);
-
- ::arrow::MemoryPool *pool_;
- const std::shared_ptr<::arrow::Schema> arrow_schema_;
- std::vector<std::unique_ptr<ArrowArrayWriter>> column_writers_;
- int num_rows_ = 0;
-};
-
-class ArrowArrayWriter {
-public:
- virtual ~ArrowArrayWriter() = default;
-
- ::arrow::Status Write(const std::shared_ptr<fory::Getter> &getter, int i);
-
- virtual ::arrow::Status AppendNull() { return builder()->AppendNull(); }
-
- virtual ::arrow::Status AppendValue(std::shared_ptr<Getter> getter,
- int i) = 0;
-
- virtual ::arrow::Status Finish(std::shared_ptr<::arrow::Array> *array) {
- return builder()->Finish(array);
- }
-
- virtual void Reset() { builder()->Reset(); }
-
- virtual std::shared_ptr<::arrow::ArrayBuilder> builder() = 0;
-};
-
-#define _NUMERIC_TYPE_WRITER_DECL(KLASS)
\
- class KLASS##Writer : public ArrowArrayWriter {
\
- public:
\
- explicit KLASS##Writer(::arrow::MemoryPool *pool)
\
- : builder_(std::make_shared<::arrow::KLASS##Builder>(pool)){};
\
- ::arrow::Status AppendNull() override { return builder_->AppendNull(); }
\
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter,
\
- int i) override {
\
- return builder_->Append(getter->Get##KLASS(i));
\
- }
\
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
\
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
\
- }
\
-
\
- private:
\
- std::shared_ptr<::arrow::KLASS##Builder> builder_;
\
- };
-
-_NUMERIC_TYPE_WRITER_DECL(Int8)
-
-_NUMERIC_TYPE_WRITER_DECL(Int16)
-
-_NUMERIC_TYPE_WRITER_DECL(Int32)
-
-_NUMERIC_TYPE_WRITER_DECL(Int64)
-
-_NUMERIC_TYPE_WRITER_DECL(Float)
-
-_NUMERIC_TYPE_WRITER_DECL(Double)
-
-class BooleanWriter : public ArrowArrayWriter {
-public:
- explicit BooleanWriter(::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::BooleanBuilder>(pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- return builder_->Append(getter->GetBoolean(i));
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::BooleanBuilder> builder_;
-};
-
-class DateWriter : public ArrowArrayWriter {
-public:
- explicit DateWriter(::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::Date32Builder>(pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- return builder_->Append(getter->GetInt32(i));
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::Date32Builder> builder_;
-};
-
-class Time32Writer : public ArrowArrayWriter {
-public:
- explicit Time32Writer(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::Time32Builder>(type, pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- return builder_->Append(getter->GetInt32(i));
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::Time32Builder> builder_;
-};
-
-class Time64Writer : public ArrowArrayWriter {
-public:
- explicit Time64Writer(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::Time64Builder>(type, pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- return builder_->Append(getter->GetInt64(i));
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::Time64Builder> builder_;
-};
-
-class TimestampWriter : public ArrowArrayWriter {
-public:
- explicit TimestampWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::TimestampBuilder>(type, pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- return builder_->Append(getter->GetInt64(i));
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::TimestampBuilder> builder_;
-};
-
-class BinaryWriter : public ArrowArrayWriter {
-public:
- explicit BinaryWriter(::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::BinaryBuilder>(pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- uint8_t *bytes;
- int size = getter->GetBinary(i, &bytes);
- return builder_->Append(bytes, size);
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::BinaryBuilder> builder_;
-};
-
-class StringWriter : public ArrowArrayWriter {
-public:
- explicit StringWriter(::arrow::MemoryPool *pool)
- : builder_(std::make_shared<::arrow::StringBuilder>(pool)) {}
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override {
- uint8_t *bytes;
- int size = getter->GetBinary(i, &bytes);
- return builder_->Append(bytes, size);
- }
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
-private:
- std::shared_ptr<::arrow::StringBuilder> builder_;
-};
-
-class ListWriter : public ArrowArrayWriter {
-public:
- ListWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> elem_writer);
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override;
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
- void Reset() override {
- builder_->Reset();
- elem_writer_->Reset();
- }
-
-protected:
- std::shared_ptr<::arrow::ListBuilder> builder_;
- std::unique_ptr<ArrowArrayWriter> elem_writer_;
-};
-
-class StructWriter : public ArrowArrayWriter {
-public:
- StructWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::vector<std::unique_ptr<ArrowArrayWriter>> &&field_writers);
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override;
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
- void Reset() override {
- builder_->Reset();
- for (auto &array_writer : field_writers_) {
- array_writer->Reset();
- }
- }
-
-private:
- std::shared_ptr<::arrow::StructBuilder> builder_;
- std::vector<std::unique_ptr<ArrowArrayWriter>> field_writers_;
-};
-
-class MapWriter : public ArrowArrayWriter {
-public:
- MapWriter(const std::shared_ptr<::arrow::DataType> &type,
- ::arrow::MemoryPool *pool,
- std::unique_ptr<ArrowArrayWriter> key_writer,
- std::unique_ptr<ArrowArrayWriter> item_writer);
-
- ::arrow::Status AppendValue(std::shared_ptr<Getter> getter, int i) override;
-
- std::shared_ptr<::arrow::ArrayBuilder> builder() override {
- return std::static_pointer_cast<::arrow::ArrayBuilder>(builder_);
- }
-
- void Reset() override {
- builder_->Reset();
- key_writer_->Reset();
- item_writer_->Reset();
- }
-
-private:
- std::shared_ptr<::arrow::MapBuilder> builder_;
- std::unique_ptr<ArrowArrayWriter> key_writer_;
- std::unique_ptr<ArrowArrayWriter> item_writer_;
-};
-
-} // namespace columnar
-} // namespace fory
diff --git a/cpp/fory/columnar/arrow_writer_test.cc
b/cpp/fory/columnar/arrow_writer_test.cc
deleted file mode 100644
index a9bd22866..000000000
--- a/cpp/fory/columnar/arrow_writer_test.cc
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "gtest/gtest.h"
-
-#include "fory/columnar/arrow_writer.h"
-#include "fory/row/writer.h"
-#include <limits>
-#include <memory>
-#include <vector>
-
-namespace fory {
-namespace columnar {
-
-TEST(ARROW_WRITER, BASE_TYPE_TO_RECORD_BATCH) {
- auto f1 = arrow::field("f0", arrow::int32());
- auto f2 = arrow::field("f1", arrow::int64());
- auto f3 = arrow::field("f2", arrow::float64());
- std::vector<std::shared_ptr<arrow::Field>> fields = {f1, f2, f3};
- auto schema = arrow::schema(fields);
- std::vector<std::shared_ptr<Row>> rows;
- int row_nums = 100;
- RowWriter row_writer(schema);
- for (int i = 0; i < row_nums; ++i) {
- std::shared_ptr<Buffer> buffer;
- AllocateBuffer(16, &buffer);
- row_writer.SetBuffer(buffer);
- row_writer.Reset();
- row_writer.Write(0, std::numeric_limits<int>::max());
- row_writer.Write(1, std::numeric_limits<int64_t>::max());
- row_writer.Write(2, std::numeric_limits<double>::max());
- rows.push_back(row_writer.ToRow());
- }
-
- std::shared_ptr<ArrowWriter> arrow_writer;
- EXPECT_TRUE(
- ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)
- .ok());
- for (auto &row : rows) {
- EXPECT_TRUE(arrow_writer->Write(row).ok());
- }
- std::shared_ptr<::arrow::RecordBatch> record_batch;
- EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());
- EXPECT_TRUE(record_batch->Validate().ok());
- EXPECT_EQ(record_batch->num_columns(), schema->num_fields());
- EXPECT_EQ(record_batch->num_rows(), row_nums);
- // std::cout << record_batch->column(0)->ToString() << std::endl;
-}
-
-TEST(ARROW_WRITER, ARRAY_TYPE_TO_RECORD_BATCH) {
- auto f0 = arrow::field("f0", arrow::int32());
- auto f1 = arrow::field("f1", arrow::list(arrow::int32()));
- auto schema = arrow::schema({f0, f1});
- std::vector<std::shared_ptr<Row>> rows;
- int row_nums = 100;
- RowWriter row_writer(schema);
- ArrayWriter array_writer(
- std::dynamic_pointer_cast<arrow::ListType>(schema->field(1)->type()),
- &row_writer);
- for (int i = 0; i < row_nums; ++i) {
- std::shared_ptr<Buffer> buffer;
- AllocateBuffer(16, &buffer);
- row_writer.SetBuffer(buffer);
- row_writer.Reset();
- row_writer.Write(0, std::numeric_limits<int>::max());
-
- int start = row_writer.cursor();
- int array_elements = 50;
- array_writer.Reset(array_elements);
- for (int j = 0; j < array_elements; ++j) {
- array_writer.Write(j, std::numeric_limits<int>::min());
- }
- row_writer.SetOffsetAndSize(1, start, row_writer.cursor() - start);
- rows.push_back(row_writer.ToRow());
- }
- // std::cout << rows[0]->ToString() << std::endl;
-
- std::shared_ptr<ArrowWriter> arrow_writer;
- EXPECT_TRUE(
- ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)
- .ok());
- for (auto &row : rows) {
- EXPECT_TRUE(arrow_writer->Write(row).ok());
- }
- std::shared_ptr<::arrow::RecordBatch> record_batch;
- EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());
- EXPECT_TRUE(record_batch->Validate().ok());
- EXPECT_EQ(record_batch->num_columns(), schema->num_fields());
- EXPECT_EQ(record_batch->num_rows(), row_nums);
- // std::cout << record_batch->column(1)->ToString() << std::endl;
-}
-
-typedef struct {
- std::vector<std::vector<int>> indexes;
- std::vector<int> values;
- std::vector<int> shape;
-} SparseTensor;
-
-TEST(ARROW_WRITER, SPARSE_TENSOR_TO_RECORD_BATCH) {
- auto indexes =
- arrow::field("indexes", arrow::list(arrow::list(arrow::int32())));
- auto values = arrow::field("values", arrow::list(arrow::int32()));
- auto shape = arrow::field("shape", arrow::list(arrow::int32()));
- auto sparse_tensor_type = arrow::struct_({indexes, values, shape});
- auto sparse_tensor_field = arrow::field("sparse_tensor", sparse_tensor_type);
- auto schema = arrow::schema({sparse_tensor_field});
-
- std::vector<std::shared_ptr<Row>> rows;
- RowWriter row_writer(schema);
- auto struct_type =
- std::dynamic_pointer_cast<arrow::StructType>(schema->field(0)->type());
- RowWriter tensor_struct_writer(arrow::schema(struct_type->fields()),
- &row_writer);
- ArrayWriter indexes_writer(
-
std::dynamic_pointer_cast<arrow::ListType>(struct_type->field(0)->type()),
- &tensor_struct_writer);
- ArrayWriter values_writer(
-
std::dynamic_pointer_cast<arrow::ListType>(struct_type->field(1)->type()),
- &tensor_struct_writer);
- int row_nums = 100;
- int tensor_values_num = 3;
- SparseTensor sparse_tensor = {{{0, 0}, {1, 1}, {1, 2}}, {1, 2, 3}, {3, 4}};
- for (int i = 0; i < row_nums; ++i) {
- std::shared_ptr<Buffer> buffer;
- AllocateBuffer(16, &buffer);
- row_writer.SetBuffer(buffer);
- row_writer.Reset();
-
- auto tensor_struct_start = row_writer.cursor();
- tensor_struct_writer.Reset();
-
- auto indexes_start = indexes_writer.cursor();
- indexes_writer.Reset(tensor_values_num);
- for (int j = 0; j < tensor_values_num; ++j) {
- indexes_writer.WriteArray(j, ArrayData::From(sparse_tensor.indexes[j]));
- }
- // std::cout << indexes_writer.CopyToArrayData()->ToString() << std::endl;
- tensor_struct_writer.SetOffsetAndSize(
- 0, indexes_start, tensor_struct_writer.cursor() - tensor_struct_start);
-
- auto tensor_values_start = values_writer.cursor();
- values_writer.Reset(tensor_values_num);
- for (int j = 0; j < tensor_values_num; ++j) {
- values_writer.Write(j, sparse_tensor.values[j]);
- }
- // std::cout << values_writer.CopyToArrayData()->ToString() << std::endl;
- tensor_struct_writer.SetOffsetAndSize(1, tensor_values_start,
- tensor_struct_writer.cursor() -
- tensor_struct_start);
-
- tensor_struct_writer.WriteArray(2, ArrayData::From(sparse_tensor.shape));
-
- row_writer.SetOffsetAndSize(0, tensor_struct_start,
- row_writer.cursor() - tensor_struct_start);
- rows.push_back(row_writer.ToRow());
- }
- // std::cout << rows[0]->ToString() << std::endl;
- std::shared_ptr<ArrowWriter> arrow_writer;
- EXPECT_TRUE(
- ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)
- .ok());
- for (auto &row : rows) {
- EXPECT_TRUE(arrow_writer->Write(row).ok());
- }
- std::shared_ptr<::arrow::RecordBatch> record_batch;
- EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());
- EXPECT_TRUE(record_batch->Validate().ok());
- EXPECT_EQ(record_batch->num_columns(), schema->num_fields());
- EXPECT_EQ(record_batch->num_rows(), row_nums);
- // std::cout << record_batch->column(0)->ToString() << std::endl;
-}
-
-} // namespace columnar
-} // namespace fory
-
-int main(int argc, char **argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
diff --git a/cpp/fory/columnar/convert_test.cc
b/cpp/fory/columnar/convert_test.cc
deleted file mode 100644
index 98fcf234c..000000000
--- a/cpp/fory/columnar/convert_test.cc
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <cstdint>
-#include <iostream>
-#include <vector>
-
-#include "gtest/gtest.h"
-#include <arrow/api.h>
-
-using arrow::DoubleBuilder;
-using arrow::Int64Builder;
-using arrow::ListBuilder;
-
-// While we want to use columnar data structures to build efficient operations,
-// we often receive data in a row-wise fashion from other systems. In the
-// following, we want give a brief introduction into the classes provided by
-// Apache Arrow by showing how to transform row-wise data into a columnar
table.
-//
-// The data in this example is stored in the following struct:
-struct data_row {
- int64_t id;
- double cost;
- std::vector<double> cost_components;
-};
-
-// Transforming a vector of structs into a columnar Table.
-//
-// The final representation should be an `arrow::Table` which in turn
-// is made up of an `arrow::Schema` and a list of
-// `arrow::ChunkedArray` instances. As the first step, we will iterate
-// over the data and build up the arrays incrementally. For this
-// task, we provide `arrow::ArrayBuilder` classes that help in the
-// construction of the final `arrow::Array` instances.
-//
-// For each type, Arrow has a specially typed builder class. For the primitive
-// values `id` and `cost` we can use the respective `arrow::Int64Builder` and
-// `arrow::DoubleBuilder`. For the `cost_components` vector, we need to have
two
-// builders, a top-level `arrow::ListBuilder` that builds the array of offsets
-// and a nested `arrow::DoubleBuilder` that constructs the underlying values
-// array that is referenced by the offsets in the former array.
-arrow::Status VectorToColumnarTable(const std::vector<struct data_row> &rows,
- std::shared_ptr<arrow::Table> *table) {
- // The builders are more efficient using
- // arrow::jemalloc::MemoryPool::default_pool() as this can increase the size
- // of the underlying memory regions in-place. At the moment, arrow::jemalloc
- // is only supported on Unix systems, not Windows.
- arrow::MemoryPool *pool = arrow::default_memory_pool();
-
- Int64Builder id_builder(pool);
- DoubleBuilder cost_builder(pool);
- ListBuilder components_builder(pool, std::make_shared<DoubleBuilder>(pool));
- // The following builder is owned by components_builder.
- DoubleBuilder &cost_components_builder =
- *(static_cast<DoubleBuilder *>(components_builder.value_builder()));
-
- // Now we can loop over our existing data and insert it into the builders.
The
- // `Append` calls here may fail (e.g. we cannot allocate enough additional
- // memory). Thus we need to check their return values. For more information
on
- // these values, check the documentation about `arrow::Status`.
- for (const data_row &row : rows) {
- ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
- ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));
-
- // Indicate the start of a new list row. This will memorise the current
- // offset in the values builder.
- ARROW_RETURN_NOT_OK(components_builder.Append());
- // Store the actual values. The final nullptr argument tells the underlying
- // builder that all added values are valid, i.e. non-null.
- ARROW_RETURN_NOT_OK(cost_components_builder.AppendValues(
- row.cost_components.data(), row.cost_components.size()));
- }
-
- // At the end, we finalise the arrays, declare the (type) schema and combine
- // them into a single `arrow::Table`:
- std::shared_ptr<arrow::Array> id_array;
- ARROW_RETURN_NOT_OK(id_builder.Finish(&id_array));
- std::shared_ptr<arrow::Array> cost_array;
- ARROW_RETURN_NOT_OK(cost_builder.Finish(&cost_array));
- // No need to invoke cost_components_builder.Finish because it is implied by
- // the parent builder's Finish invocation.
- std::shared_ptr<arrow::Array> cost_components_array;
- ARROW_RETURN_NOT_OK(components_builder.Finish(&cost_components_array));
-
- std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
- arrow::field("id", arrow::int64()),
- arrow::field("cost", arrow::float64()),
- arrow::field("cost_components", arrow::list(arrow::float64()))};
-
- auto schema = std::make_shared<arrow::Schema>(schema_vector);
-
- // The final `table` variable is the one we then can pass on to other
- // functions that can consume Apache Arrow memory structures. This object has
- // ownership of all referenced data, thus we don't have to care about
- // undefined references once we leave the scope of the function building the
- // table and its underlying arrays.
- *table =
- arrow::Table::Make(schema, {id_array, cost_array,
cost_components_array});
-
- return arrow::Status::OK();
-}
-
-arrow::Status ColumnarTableToVector(const std::shared_ptr<arrow::Table> &table,
- std::vector<struct data_row> *rows) {
- // To convert an Arrow table back into the same row-wise representation as in
- // the above section, we first will check that the table conforms to our
- // expected schema and then will build up the vector of rows incrementally.
- //
- // For the check if the table is as expected, we can utilise solely its
- // schema.
- std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
- arrow::field("id", arrow::int64()),
- arrow::field("cost", arrow::float64()),
- arrow::field("cost_components", arrow::list(arrow::float64()))};
- auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);
-
- if (!expected_schema->Equals(*table->schema())) {
- // The table doesn't have the expected schema thus we cannot directly
- // convert it to our target representation.
- return arrow::Status::Invalid("Schemas are not matching!");
- }
-
- // As we have ensured that the table has the expected structure, we can
unpack
- // the underlying arrays. For the primitive columns `id` and `cost` we can
use
- // the high level functions to get the values whereas for the nested column
- // `cost_components` we need to access the C-pointer to the data to copy its
- // contents into the resulting `std::vector<double>`. Here we need to be care
- // to also add the offset to the pointer. This offset is needed to enable
- // zero-copy slicing operations. While this could be adjusted automatically
- // for double arrays, this cannot be done for the accompanying bitmap as
often
- // the slicing border would be inside a byte.
-
- auto ids =
- std::static_pointer_cast<arrow::Int64Array>(table->column(0)->chunk(0));
- auto costs =
- std::static_pointer_cast<arrow::DoubleArray>(table->column(1)->chunk(0));
- auto cost_components =
- std::static_pointer_cast<arrow::ListArray>(table->column(2)->chunk(0));
- auto cost_components_values =
- std::static_pointer_cast<arrow::DoubleArray>(cost_components->values());
- // To enable zero-copy slices, the native values pointer might need to
account
- // for this slicing offset. This is not needed for the higher level functions
- // like Value(…) that already account for this offset internally.
- const double *ccv_ptr = cost_components_values->data()->GetValues<double>(1);
-
- for (int64_t i = 0; i < table->num_rows(); i++) {
- // Another simplification in this example is that we assume that there are
- // no null entries, e.g. each row is fill with valid values.
- int64_t id = ids->Value(i);
- double cost = costs->Value(i);
- const double *first = ccv_ptr + cost_components->value_offset(i);
- const double *last = ccv_ptr + cost_components->value_offset(i + 1);
- std::vector<double> components_vec(first, last);
- rows->push_back({id, cost, components_vec});
- }
-
- return arrow::Status::OK();
-}
-
-TEST(ConvertTest, convert) {
- std::vector<data_row> rows = {
- {1, 1.0, {1.0}}, {2, 2.0, {1.0, 2.0}}, {3, 3.0, {1.0, 2.0, 3.0}}};
-
- std::shared_ptr<arrow::Table> table;
- ASSERT_TRUE(VectorToColumnarTable(rows, &table).ok());
-
- std::vector<data_row> expected_rows;
- ASSERT_TRUE(ColumnarTableToVector(table, &expected_rows).ok());
-
- EXPECT_EQ(rows.size(), expected_rows.size());
-}
-
-int main(int argc, char **argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
diff --git a/python/pyfory/format/__init__.py b/python/pyfory/format/__init__.py
index f6fd1d8f5..3b2971020 100644
--- a/python/pyfory/format/__init__.py
+++ b/python/pyfory/format/__init__.py
@@ -28,8 +28,10 @@ try:
from pyfory.format._format import ( # noqa: F401 # pylint: disable=unused-import
create_row_encoder,
RowData,
- ArrowWriter,
) # noqa: E402
+ from pyfory.format.columnar import ( # noqa: F401 # pylint: disable=unused-import
+ ArrowWriter,
+ )
from pyfory.format.infer import ( # noqa: F401 # pylint: disable=unused-import
get_cls_by_schema,
remove_schema,
diff --git a/python/pyfory/format/_format.pyx b/python/pyfory/format/_format.pyx
index e7380b649..7de8bc5c5 100644
--- a/python/pyfory/format/_format.pyx
+++ b/python/pyfory/format/_format.pyx
@@ -33,6 +33,3 @@ include "row.pxi"
# Python encoder for row format
include "encoder.pxi"
-
-# Interoperability between row and columnar format
-include "vectorized.pxi"
diff --git a/python/pyfory/format/columnar.py b/python/pyfory/format/columnar.py
new file mode 100644
index 000000000..3ed811898
--- /dev/null
+++ b/python/pyfory/format/columnar.py
@@ -0,0 +1,475 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Pure Python implementation for converting Fory row format to Apache Arrow
+columnar format. This module provides ArrowWriter class that accumulates
+row data and converts it to Arrow RecordBatch.
+"""
+
+import pyarrow as pa
+from pyarrow import types
+
+
+def _create_array_appender(data_type):
+ """Factory function to create appropriate array appender for a given Arrow type."""
+ if types.is_boolean(data_type):
+ return BooleanArrayAppender()
+ elif types.is_int8(data_type):
+ return Int8ArrayAppender()
+ elif types.is_int16(data_type):
+ return Int16ArrayAppender()
+ elif types.is_int32(data_type):
+ return Int32ArrayAppender()
+ elif types.is_int64(data_type):
+ return Int64ArrayAppender()
+ elif types.is_float32(data_type):
+ return FloatArrayAppender()
+ elif types.is_float64(data_type):
+ return DoubleArrayAppender()
+ elif types.is_date32(data_type):
+ return DateArrayAppender()
+ elif types.is_time32(data_type):
+ return Time32ArrayAppender(data_type)
+ elif types.is_time64(data_type):
+ return Time64ArrayAppender(data_type)
+ elif types.is_timestamp(data_type):
+ return TimestampArrayAppender(data_type)
+ elif types.is_binary(data_type):
+ return BinaryArrayAppender()
+ elif types.is_string(data_type) or types.is_large_string(data_type):
+ return StringArrayAppender()
+ elif types.is_list(data_type):
+ elem_appender = _create_array_appender(data_type.value_type)
+ return ListArrayAppender(data_type, elem_appender)
+ elif types.is_struct(data_type):
+ field_appenders = [_create_array_appender(data_type.field(i).type) for i in range(data_type.num_fields)]
+ return StructArrayAppender(data_type, field_appenders)
+ elif types.is_map(data_type):
+ key_appender = _create_array_appender(data_type.key_type)
+ item_appender = _create_array_appender(data_type.item_type)
+ return MapArrayAppender(data_type, key_appender, item_appender)
+ else:
+ raise NotImplementedError(f"Unsupported type: {data_type}")
+
+
+def _get_value_reader(data_type):
+ """Return the appropriate getter method name for the data type."""
+ if types.is_boolean(data_type):
+ return "get_boolean"
+ elif types.is_int8(data_type):
+ return "get_int8"
+ elif types.is_int16(data_type):
+ return "get_int16"
+ elif types.is_int32(data_type):
+ return "get_int32"
+ elif types.is_int64(data_type):
+ return "get_int64"
+ elif types.is_float32(data_type):
+ return "get_float"
+ elif types.is_float64(data_type):
+ return "get_double"
+ elif types.is_date32(data_type):
+ return "get_date"
+ elif types.is_time32(data_type):
+ return "get_int32"
+ elif types.is_time64(data_type):
+ return "get_int64"
+ elif types.is_timestamp(data_type):
+ return "get_datetime"
+ elif types.is_binary(data_type):
+ return "get_binary"
+ elif types.is_string(data_type) or types.is_large_string(data_type):
+ return "get_str"
+ elif types.is_list(data_type):
+ return "get_array_data"
+ elif types.is_struct(data_type):
+ return "get_struct"
+ elif types.is_map(data_type):
+ return "get_map_data"
+ else:
+ raise NotImplementedError(f"Unsupported type: {data_type}")
+
+
+class ArrayAppender:
+ """Base class for array appenders."""
+
+ def append(self, value):
+ """Append a value (can be None for null)."""
+ raise NotImplementedError
+
+ def finish(self):
+ """Finish building and return the Arrow array."""
+ raise NotImplementedError
+
+ def reset(self):
+ """Reset the builder for reuse."""
+ raise NotImplementedError
+
+
+class BooleanArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.bool_())
+
+ def reset(self):
+ self._values = []
+
+
+class Int8ArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.int8())
+
+ def reset(self):
+ self._values = []
+
+
+class Int16ArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.int16())
+
+ def reset(self):
+ self._values = []
+
+
+class Int32ArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.int32())
+
+ def reset(self):
+ self._values = []
+
+
+class Int64ArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.int64())
+
+ def reset(self):
+ self._values = []
+
+
+class FloatArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.float32())
+
+ def reset(self):
+ self._values = []
+
+
+class DoubleArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.float64())
+
+ def reset(self):
+ self._values = []
+
+
+class DateArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.date32())
+
+ def reset(self):
+ self._values = []
+
+
+class Time32ArrayAppender(ArrayAppender):
+ def __init__(self, data_type):
+ self._type = data_type
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=self._type)
+
+ def reset(self):
+ self._values = []
+
+
+class Time64ArrayAppender(ArrayAppender):
+ def __init__(self, data_type):
+ self._type = data_type
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=self._type)
+
+ def reset(self):
+ self._values = []
+
+
+class TimestampArrayAppender(ArrayAppender):
+ def __init__(self, data_type):
+ self._type = data_type
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=self._type)
+
+ def reset(self):
+ self._values = []
+
+
+class BinaryArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.binary())
+
+ def reset(self):
+ self._values = []
+
+
+class StringArrayAppender(ArrayAppender):
+ def __init__(self):
+ self._values = []
+
+ def append(self, value):
+ self._values.append(value)
+
+ def finish(self):
+ return pa.array(self._values, type=pa.string())
+
+ def reset(self):
+ self._values = []
+
+
+class ListArrayAppender(ArrayAppender):
+ def __init__(self, data_type, elem_appender):
+ self._type = data_type
+ self._elem_appender = elem_appender
+ self._elem_reader = _get_value_reader(data_type.value_type)
+ self._offsets = [0]
+ self._null_bitmap = []
+
+ def append(self, array_data):
+ if array_data is None:
+ self._offsets.append(self._offsets[-1])
+ self._null_bitmap.append(False)
+ else:
+ num_elements = array_data.num_elements
+ reader = getattr(array_data, self._elem_reader)
+ for j in range(num_elements):
+ value = reader(j)
+ self._elem_appender.append(value)
+ self._offsets.append(self._offsets[-1] + num_elements)
+ self._null_bitmap.append(True)
+
+ def finish(self):
+ values = self._elem_appender.finish()
+ offsets = pa.array(self._offsets, type=pa.int32())
+ null_mask = pa.array([not v for v in self._null_bitmap], type=pa.bool_())
+ return pa.ListArray.from_arrays(offsets, values, mask=null_mask)
+
+ def reset(self):
+ self._offsets = [0]
+ self._null_bitmap = []
+ self._elem_appender.reset()
+
+
+class StructArrayAppender(ArrayAppender):
+ def __init__(self, data_type, field_appenders):
+ self._type = data_type
+ self._field_appenders = field_appenders
+ self._field_readers = [_get_value_reader(data_type.field(i).type) for i in range(data_type.num_fields)]
+ self._null_bitmap = []
+
+ def append(self, struct_data):
+ if struct_data is None:
+ self._null_bitmap.append(False)
+ for appender in self._field_appenders:
+ appender.append(None)
+ else:
+ num_fields = struct_data.num_fields
+ for j in range(num_fields):
+ reader = getattr(struct_data, self._field_readers[j])
+ value = reader(j)
+ self._field_appenders[j].append(value)
+ self._null_bitmap.append(True)
+
+ def finish(self):
+ arrays = [appender.finish() for appender in self._field_appenders]
+ names = [self._type.field(i).name for i in range(self._type.num_fields)]
+ null_mask = pa.array([not v for v in self._null_bitmap], type=pa.bool_())
+ return pa.StructArray.from_arrays(arrays, names=names, mask=null_mask)
+
+ def reset(self):
+ self._null_bitmap = []
+ for appender in self._field_appenders:
+ appender.reset()
+
+
+class MapArrayAppender(ArrayAppender):
+ def __init__(self, data_type, key_appender, item_appender):
+ self._type = data_type
+ self._key_appender = key_appender
+ self._item_appender = item_appender
+ self._key_reader = _get_value_reader(data_type.key_type)
+ self._item_reader = _get_value_reader(data_type.item_type)
+ self._offsets = [0]
+ self._null_bitmap = []
+
+ def append(self, map_data):
+ if map_data is None:
+ self._offsets.append(self._offsets[-1])
+ self._null_bitmap.append(False)
+ else:
+ num_elements = map_data.num_elements
+ keys_array = map_data.keys_array()
+ values_array = map_data.values_array()
+ key_reader = getattr(keys_array, self._key_reader)
+ item_reader = getattr(values_array, self._item_reader)
+ for j in range(num_elements):
+ self._key_appender.append(key_reader(j))
+ self._item_appender.append(item_reader(j))
+ self._offsets.append(self._offsets[-1] + num_elements)
+ self._null_bitmap.append(True)
+
+ def finish(self):
+ keys = self._key_appender.finish()
+ items = self._item_appender.finish()
+ offsets = pa.array(self._offsets, type=pa.int32())
+ return pa.MapArray.from_arrays(offsets, keys, items)
+
+ def reset(self):
+ self._offsets = [0]
+ self._null_bitmap = []
+ self._key_appender.reset()
+ self._item_appender.reset()
+
+
+class ArrowWriter:
+ """
+ Converts Fory row format data to Apache Arrow columnar format.
+
+ This class accumulates rows and produces an Arrow RecordBatch.
+
+ Example:
+ >>> schema = pa.schema([("f1", pa.int64()), ("f2", pa.string())])
+ >>> writer = ArrowWriter(schema)
+ >>> encoder = create_row_encoder(schema)
+ >>> for obj in objects:
+ ... row = encoder.to_row(obj)
+ ... writer.write(row)
+ >>> record_batch = writer.finish()
+ """
+
+ def __init__(self, schema, pool=None):
+ """
+ Initialize ArrowWriter with the given schema.
+
+ Args:
+ schema: PyArrow Schema defining the structure of the data.
+ pool: Memory pool (unused, kept for API compatibility).
+ """
+ self._schema = schema
+ self._column_appenders = []
+ self._column_readers = []
+ self._num_rows = 0
+
+ for i in range(len(schema)):
+ field_type = schema.field(i).type
+ self._column_appenders.append(_create_array_appender(field_type))
+ self._column_readers.append(_get_value_reader(field_type))
+
+ def write(self, row):
+ """
+ Write a row to the writer.
+
+ Args:
+ row: A RowData instance containing the row data.
+ """
+ num_fields = row.num_fields
+ for i in range(num_fields):
+ reader_method = getattr(row, self._column_readers[i])
+ value = reader_method(i)
+ self._column_appenders[i].append(value)
+ self._num_rows += 1
+
+ def finish(self):
+ """
+ Finish writing and return the RecordBatch.
+
+ Returns:
+ An Arrow RecordBatch containing all written rows.
+ """
+ columns = [appender.finish() for appender in self._column_appenders]
+ return pa.RecordBatch.from_arrays(columns, schema=self._schema)
+
+ def reset(self):
+ """Reset the writer for reuse."""
+ self._num_rows = 0
+ for appender in self._column_appenders:
+ appender.reset()
diff --git a/python/pyfory/format/vectorized.pxi b/python/pyfory/format/vectorized.pxi
deleted file mode 100644
index e9492d0a5..000000000
--- a/python/pyfory/format/vectorized.pxi
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from libcpp.memory cimport shared_ptr
-from libc.stdint cimport *
-from pyfory.includes.libformat cimport CArrowWriter
-from pyarrow.lib cimport CMemoryPool, CRecordBatch
-from pyarrow.lib cimport Schema, MemoryPool, check_status
-import pyarrow as pa
-cimport pyarrow.lib as libpa
-
-cdef class ArrowWriter:
- cdef:
- shared_ptr[CSchema] c_schema
- CMemoryPool *c_pool
- shared_ptr[CArrowWriter] c_arrow_writer
-
- def __init__(self, Schema schema, MemoryPool pool=None):
- self.c_schema = schema.sp_schema
- if pool is None:
- pool = pa.default_memory_pool()
- self.c_pool = pool.pool
- check_status(CArrowWriter.Make(
- self.c_schema, self.c_pool, &self.c_arrow_writer))
-
- def write(self, RowData row):
- check_status(self.c_arrow_writer.get().Write(row.data))
-
- def finish(self):
- cdef shared_ptr[CRecordBatch] batch
- check_status(self.c_arrow_writer.get().Finish(&batch))
- return libpa.pyarrow_wrap_batch(batch)
-
- def reset(self):
- self.c_arrow_writer.get().Reset()
diff --git a/python/pyfory/includes/libformat.pxd b/python/pyfory/includes/libformat.pxd
index 1e8065454..63bb45efe 100755
--- a/python/pyfory/includes/libformat.pxd
+++ b/python/pyfory/includes/libformat.pxd
@@ -183,18 +183,3 @@ cdef extern from "fory/row/writer.h" namespace "fory" nogil:
int size()
shared_ptr[CArrayData] CopyToArrayData()
-
-
-cdef extern from "fory/columnar/arrow_writer.h" namespace\
- "fory::columnar" nogil:
- cdef cppclass CArrowWriter" fory::columnar::ArrowWriter":
- @staticmethod
- CStatus Make(shared_ptr[CSchema] schema,
- CMemoryPool *pool,
- shared_ptr[CArrowWriter] *writer)
-
- CStatus Write(const shared_ptr[CRow] &row)
-
- CStatus Finish(shared_ptr[CRecordBatch] *record_batch)
-
- void Reset()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]