This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 80c3563 feat: add binary row serializers and blob implementation (#31)
80c3563 is described below
commit 80c3563461b1a3439abe5c75b88f579a2e2e949f
Author: lszskye <[email protected]>
AuthorDate: Mon Jun 1 14:09:42 2026 +0800
feat: add binary row serializers and blob implementation (#31)
---
src/paimon/common/data/binary_row_writer.cpp | 1 +
src/paimon/common/data/blob.cpp | 118 +++
.../data/serializer/binary_row_serializer.cpp | 56 ++
.../common/data/serializer/binary_row_serializer.h | 69 ++
.../data/serializer/binary_row_serializer_test.cpp | 73 ++
.../data/serializer/binary_serializer_utils.cpp | 218 ++++++
.../data/serializer/binary_serializer_utils.h | 55 ++
.../serializer/binary_serializer_utils_test.cpp | 206 ++++++
.../data/serializer/row_compacted_serializer.cpp | 637 ++++++++++++++++
.../data/serializer/row_compacted_serializer.h | 202 +++++
.../serializer/row_compacted_serializer_test.cpp | 821 +++++++++++++++++++++
11 files changed, 2456 insertions(+)
diff --git a/src/paimon/common/data/binary_row_writer.cpp
b/src/paimon/common/data/binary_row_writer.cpp
index 3c1deb4..c9b5ce0 100644
--- a/src/paimon/common/data/binary_row_writer.cpp
+++ b/src/paimon/common/data/binary_row_writer.cpp
@@ -161,6 +161,7 @@ Result<BinaryRowWriter::FieldSetterFunc>
BinaryRowWriter::CreateFieldSetter(
}
auto decimal_value =
DataDefine::GetVariantValue<Decimal>(field);
assert(decimal_value.Scale() == scale);
+ (void)scale;
return writer->WriteDecimal(field_idx, decimal_value,
precision);
};
return field_setter;
diff --git a/src/paimon/common/data/blob.cpp b/src/paimon/common/data/blob.cpp
new file mode 100644
index 0000000..6695327
--- /dev/null
+++ b/src/paimon/common/data/blob.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/blob.h"
+
+#include <memory>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "paimon/common/data/blob_descriptor.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/io/offset_input_stream.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/stream_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class MemoryPool;
+
+class Blob::Impl {  // Private implementation of Blob: owns a BlobDescriptor and forwards queries to it.
+ public:
+ explicit Impl(std::unique_ptr<BlobDescriptor>&& descriptor)
+ : descriptor_(std::move(descriptor)) {}  // Takes ownership; NOTE(review): a null descriptor is not rejected here.
+
+ PAIMON_UNIQUE_PTR<Bytes> SerializeDescriptor(const
std::shared_ptr<MemoryPool>& pool) const {
+ return descriptor_->Serialize(pool);  // Delegates to the descriptor, passing the pool through unchanged.
+ }
+
+ const BlobDescriptor* GetDescriptor() const {
+ return descriptor_.get();  // Non-owning pointer; ownership stays with this Impl.
+ }
+
+ const std::string& Uri() const {
+ return descriptor_->Uri();  // NOTE(review): presumably BlobDescriptor::Uri() returns a reference; if it returns by value this would dangle -- confirm.
+ }
+
+ private:
+ std::unique_ptr<BlobDescriptor> descriptor_;  // Descriptor owned by this Impl; dereferenced without null checks above.
+};
+
+Result<std::unique_ptr<Blob>> Blob::FromPath(const std::string& path) {  // Convenience overload: forwards to the three-argument FromPath.
+ return FromPath(path, /*offset=*/0, /*length=*/-1);  // NOTE(review): -1 presumably means "no explicit length"; confirm against BlobDescriptor::Create.
+}
+
+Result<std::unique_ptr<Blob>> Blob::FromPath(const std::string& path, int64_t
offset,
+ int64_t length) {  // Builds a Blob from a path plus an offset/length pair.
+ PAIMON_ASSIGN_OR_RAISE(std::string normalized_path,
PathUtil::NormalizePath(path));  // The path is normalized first; the descriptor is created from the normalized form.
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlobDescriptor> descriptor,
+ BlobDescriptor::Create(normalized_path, offset,
length));  // offset and length are passed through unchanged; any validation would happen inside Create.
+ auto impl = std::make_unique<Impl>(std::move(descriptor));
+ return std::unique_ptr<Blob>(new Blob(std::move(impl)));  // Raw new rather than make_unique; NOTE(review): presumably because the Blob constructor is not public -- confirm in blob.h.
+}
+
+Blob::Blob(std::unique_ptr<Impl>&& impl) : impl_(std::move(impl)) {}  // Takes ownership of the implementation object.
+Blob::~Blob() = default;  // Defaulted in this file, after Impl has been fully defined above.
+
+Result<std::unique_ptr<Blob>> Blob::FromDescriptor(const char* buffer,
uint64_t length) {  // Builds a Blob from a serialized descriptor held in [buffer, buffer + length).
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlobDescriptor> descriptor,
+ BlobDescriptor::Deserialize(buffer, length));  // buffer is not null-checked here; NOTE(review): presumably Deserialize validates its input -- confirm.
+
+ auto impl = std::make_unique<Impl>(std::move(descriptor));
+ return std::unique_ptr<Blob>(new Blob(std::move(impl)));  // Same construction path as FromPath.
+}
+
+PAIMON_UNIQUE_PTR<Bytes> Blob::ToDescriptor(const std::shared_ptr<MemoryPool>&
pool) const {  // Serializes this blob's descriptor into bytes.
+ return impl_->SerializeDescriptor(pool);  // Pure delegation to the Impl, which delegates to the descriptor.
+}
+
+const std::string& Blob::Uri() const {  // Returns the URI recorded in the descriptor.
+ return impl_->Uri();  // The reference is forwarded from the Impl; it is only valid while this Blob is alive.
+}
+
+Result<std::unique_ptr<InputStream>> Blob::NewInputStream(
+ const std::shared_ptr<FileSystem>& fs) const {  // Opens the blob's URI on fs and wraps the stream with the descriptor's offset/length.
+ if (fs == nullptr) {
+ return Status::Invalid("file system is nullptr");  // The only argument validation done in this function.
+ }
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> file,
+ fs->Open(impl_->GetDescriptor()->Uri()));
+
+ return OffsetInputStream::Create(std::move(file),
impl_->GetDescriptor()->Length(),
+ impl_->GetDescriptor()->Offset());  // NOTE(review): arguments are passed as (stream, length, offset); confirm this matches OffsetInputStream::Create's parameter order.
+}
+
+Result<PAIMON_UNIQUE_PTR<Bytes>> Blob::ToData(const
std::shared_ptr<FileSystem>& fs,
+ const
std::shared_ptr<MemoryPool>& pool) const {  // Reads the blob's content into a Bytes buffer.
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream,
NewInputStream(fs));  // A null fs is rejected inside NewInputStream.
+ return StreamUtils::ReadAsyncFully(std::move(input_stream), pool);  // Stream ownership is handed to the helper; the read strategy is defined there, not here.
+}
+
+Result<std::unique_ptr<ArrowSchema>> Blob::ArrowField(
+ const std::string& field_name, std::unordered_map<std::string,
std::string> metadata) {  // Builds the Arrow field for a blob column and exports it as a C-data ArrowSchema.
+ auto blob_field = BlobUtils::ToArrowField(field_name, /*nullable=*/false,
metadata);  // nullable is hard-coded to false; NOTE(review): metadata is taken by value but passed on as an lvalue, so it is copied again unless ToArrowField takes a reference.
+ auto field = std::make_unique<::ArrowSchema>();
+ PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportField(*blob_field,
field.get()));  // On success the exported struct is filled in; NOTE(review): the caller presumably must invoke its release callback -- confirm in blob.h docs.
+ return field;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer.cpp
b/src/paimon/common/data/serializer/binary_row_serializer.cpp
new file mode 100644
index 0000000..826f755
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_row_serializer.h"
+
+#include <algorithm>
+#include <cassert>
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+
+Status BinaryRowSerializer::Serialize(const BinaryRow& record,
+ MemorySegmentOutputStream* target) const
{  // Writes the row as an int32 length prefix followed by the row bytes.
+ target->WriteValue<int32_t>(record.GetSizeInBytes());  // Length prefix; Deserialize reads this value back first. target is not null-checked.
+ return SerializeWithoutLength(record, target);
+}
+
+Status BinaryRowSerializer::SerializeWithoutLength(const BinaryRow& record,
+ MemorySegmentOutputStream*
target) const {  // Writes only the row payload, without a length prefix.
+ target->Write(record.GetSegment(), record.GetOffset(),
record.GetSizeInBytes());  // Copies GetSizeInBytes() bytes starting at the row's offset inside its backing segment.
+ return Status::OK();  // Always OK: nothing returned by the write call above is inspected here.
+}
+
+Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source)
const {  // Reads one length-prefixed row written by Serialize.
+ BinaryRow row(num_fields_);
+ PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());  // NOTE(review): read_length is used as-is; a negative or corrupt value is not rejected before allocating.
+ std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(read_length,
pool_.get());  // Buffer comes from the serializer's pool and is held by shared_ptr.
+ PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));  // Fills the buffer; presumably reads exactly bytes->size() bytes -- confirm in DataInputStream.
+ row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);  // The row points at the wrapped buffer (offset 0) instead of copying it.
+ return row;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer.h
b/src/paimon/common/data/serializer/binary_row_serializer.h
new file mode 100644
index 0000000..b1f0a05
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/binary_row.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class DataInputStream;
+class MemorySegmentOutputStream;
+class MemoryPool;
+
+/// Serializer for `BinaryRow`: each row is written as a 4-byte length prefix followed by the row's bytes.
+class BinaryRowSerializer {
+ public:
+ BinaryRowSerializer(int32_t num_fields, const std::shared_ptr<MemoryPool>&
pool)
+ : num_fields_(num_fields),
+
fixed_length_part_size_(BinaryRow::CalculateFixPartSizeInBytes(num_fields)),
+ pool_(pool) {}  // The fixed-part size is computed once here from num_fields and cached.
+
+ int32_t GetArity() const {  // Number of fields in the rows handled by this serializer.
+ return num_fields_;
+ }
+
+ static constexpr int32_t LENGTH_SIZE_IN_BYTES = 4;  // Size of the length prefix; matches the int32_t written by Serialize().
+
+ int32_t GetFixedLengthPartSize() const {  // Cached result of BinaryRow::CalculateFixPartSizeInBytes(num_fields).
+ return fixed_length_part_size_;
+ }
+
+ /// @return Fixed part length to serialize one row: the row's fixed-length part plus the length prefix.
+ int32_t GetSerializedRowFixedPartLength() const {
+ return GetFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
+ }
+
+ Result<BinaryRow> Deserialize(DataInputStream* source) const;  // Reads one length-prefixed row from source.
+
+ Status Serialize(const BinaryRow& row, MemorySegmentOutputStream* target)
const;  // Writes the length prefix, then the row bytes.
+
+ private:
+ Status SerializeWithoutLength(const BinaryRow& record,
MemorySegmentOutputStream* target) const;  // Writes only the row bytes.
+
+ int32_t num_fields_;  // Field count passed to the BinaryRow created in Deserialize().
+ int32_t fixed_length_part_size_;  // See GetFixedLengthPartSize().
+ std::shared_ptr<MemoryPool> pool_;  // Pool used to allocate buffers in Deserialize().
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer_test.cpp
b/src/paimon/common/data/serializer/binary_row_serializer_test.cpp
new file mode 100644
index 0000000..22c2f6b
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer_test.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_row_serializer.h"
+
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BinaryRowSerializerTest : public ::testing::Test {  // Fixture providing a memory pool and a serializer for 3-field rows.
+ protected:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ serializer_ = std::make_unique<BinaryRowSerializer>(3, pool_);  // 3 matches the field count of the rows built in the tests.
+ }
+
+ std::shared_ptr<MemoryPool> pool_;  // Shared by the serializer and the test bodies.
+ std::unique_ptr<BinaryRowSerializer> serializer_;  // Recreated for every test by SetUp().
+};
+
+TEST_F(BinaryRowSerializerTest, SerializeAndDeserialize) {
+ // Create a BinaryRow with some test data: an int, a string and a double
+ BinaryRow row(3);
+ BinaryRowWriter writer(&row, 20, pool_.get());
+ writer.WriteInt(0, 42);
+ writer.WriteString(1, BinaryString::FromString("test", pool_.get()));
+ writer.WriteDouble(2, 3.14);  // NOTE(review): writer.Complete() is never called in this test, unlike BinarySerializerUtils::WriteBinaryRow -- confirm the row is finalized.
+
+ // Serialize the BinaryRow, then flatten the output segments into one byte buffer
+ MemorySegmentOutputStream output_stream(1, pool_);
+ ASSERT_OK(serializer_->Serialize(row, &output_stream));
+ auto segments = output_stream.Segments();
+ auto bytes =
+ MemorySegmentUtils::CopyToBytes(segments, 0,
output_stream.CurrentSize(), pool_.get());
+
+ // Deserialize the BinaryRow from those bytes and check every field round-trips
+ auto input_stream = std::make_shared<ByteArrayInputStream>(bytes->data(),
bytes->size());
+ DataInputStream in(input_stream);
+ ASSERT_OK_AND_ASSIGN(BinaryRow deserialized_row,
serializer_->Deserialize(&in));
+ EXPECT_EQ(deserialized_row.GetInt(0), 42);
+ EXPECT_EQ(deserialized_row.GetString(1).ToString(), "test");
+ EXPECT_EQ(deserialized_row.GetDouble(2), 3.14);  // Exact comparison: the same double literal was written above.
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils.cpp
b/src/paimon/common/data/serializer/binary_serializer_utils.cpp
new file mode 100644
index 0000000..4675e54
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/date_time_utils.h"
+namespace paimon {
+Result<std::shared_ptr<BinaryArray>> BinarySerializerUtils::WriteBinaryArray(
+ const std::shared_ptr<InternalArray>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool) {  // Converts an InternalArray into a BinaryArray; type must be an arrow list type.
+ if (auto binary_array = std::dynamic_pointer_cast<BinaryArray>(value)) {
+ return binary_array;  // Already binary: returned as-is, no copy.
+ }
+ auto binary_array = std::make_shared<BinaryArray>();
+ auto list_type = std::dynamic_pointer_cast<arrow::ListType>(type);
+ assert(list_type);  // NOTE(review): checked only by assert; with NDEBUG a non-list type leads to a null dereference on the next line.
+ auto value_type = list_type->value_type();
+ // TODO(xinyu.lxy): reuse BinaryWriter
+ BinaryArrayWriter binary_writer(binary_array.get(), value->Size(),
+
BinaryArrayWriter::GetElementSize(value_type->id()), pool);
+ for (int32_t i = 0; i < value->Size(); i++) {  // Elements are written one by one through WriteBinaryData.
+ PAIMON_RETURN_NOT_OK(WriteBinaryData(value_type, value.get(), i,
&binary_writer, pool));
+ }
+ binary_writer.Complete();  // Finalizes the writer before the array is returned.
+ return binary_array;
+}
+
+Result<std::shared_ptr<BinaryMap>> BinarySerializerUtils::WriteBinaryMap(
+ const std::shared_ptr<InternalMap>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool) {  // Converts an InternalMap into a BinaryMap; type must be an arrow map type.
+ if (auto binary_map = std::dynamic_pointer_cast<BinaryMap>(value)) {
+ return binary_map;  // Already binary: returned as-is, no copy.
+ }
+ auto map_type = std::dynamic_pointer_cast<arrow::MapType>(type);
+ assert(map_type);  // NOTE(review): assert-only check; not enforced when NDEBUG is defined.
+ auto key_type = map_type->key_type();
+ auto value_type = map_type->item_type();
+ auto key_array = value->KeyArray();
+ auto value_array = value->ValueArray();
+ assert(key_array && value_array);
+ assert(key_array->Size() == value_array->Size());  // The loop below indexes both arrays with the same i.
+ BinaryArray binary_key_array;
+ BinaryArrayWriter binary_key_writer(&binary_key_array, key_array->Size(),
+
BinaryArrayWriter::GetElementSize(key_type->id()), pool);
+ BinaryArray binary_value_array;
+ BinaryArrayWriter binary_value_writer(&binary_value_array,
value_array->Size(),
+
BinaryArrayWriter::GetElementSize(value_type->id()),
+ pool);
+ for (int32_t i = 0; i < key_array->Size(); i++) {  // Writes the i-th key and the i-th value in lock-step.
+ PAIMON_RETURN_NOT_OK(
+ WriteBinaryData(key_type, key_array.get(), i, &binary_key_writer,
pool));
+ PAIMON_RETURN_NOT_OK(
+ WriteBinaryData(value_type, value_array.get(), i,
&binary_value_writer, pool));
+ }
+ binary_key_writer.Complete();
+ binary_value_writer.Complete();
+ return BinaryMap::ValueOf(binary_key_array, binary_value_array, pool);  // The two binary arrays are combined into the resulting map.
+}
+
+Result<std::shared_ptr<BinaryRow>> BinarySerializerUtils::WriteBinaryRow(
+ const std::shared_ptr<InternalRow>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool) {  // Converts an InternalRow into a BinaryRow; type must be an arrow struct type.
+ if (auto binary_row = std::dynamic_pointer_cast<BinaryRow>(value)) {
+ return binary_row;  // Already binary: returned as-is, no copy.
+ }
+
+ auto struct_type = std::dynamic_pointer_cast<arrow::StructType>(type);
+ assert(struct_type);  // NOTE(review): assert-only check; not enforced when NDEBUG is defined.
+ auto field_count = struct_type->num_fields();
+ auto binary_row = std::make_shared<BinaryRow>(field_count);
+ BinaryRowWriter binary_writer(binary_row.get(), /*initial_size=*/1024,
pool);
+ for (int32_t i = 0; i < field_count; i++) {  // Each field is written with its own arrow type from the struct.
+ PAIMON_RETURN_NOT_OK(
+ WriteBinaryData(struct_type->field(i)->type(), value.get(), i,
&binary_writer, pool));
+ }
+ binary_writer.Complete();  // Finalizes the writer before the row is returned.
+ return binary_row;
+}
+
+Status BinarySerializerUtils::WriteBinaryData(const
std::shared_ptr<arrow::DataType>& type,
+ const DataGetters* getter,
int32_t pos,
+ BinaryWriter* writer,
MemoryPool* pool) {  // Reads the value at pos from getter and writes it at pos in writer, dispatching on the arrow type id.
+ assert(getter && writer && pool);  // NOTE(review): <cassert> and fmt/format.h are not among this file's direct includes; they presumably arrive transitively.
+ arrow::Type::type type_id = type->id();
+ auto array_writer = dynamic_cast<BinaryArrayWriter*>(writer);  // Non-null only when the target is an array writer; null handling differs for it.
+ if (getter->IsNullAt(pos)) {
+ // compatible with Java Paimon: array writers get a type-aware null for every type
+ if (array_writer) {
+ array_writer->SetNullAt(pos, type_id);
+ return Status::OK();
+ } else if (type_id != arrow::Type::type::DECIMAL &&
+ type_id != arrow::Type::type::TIMESTAMP) {
+ // if row writer, exclude decimal and timestamp when set null (their null handling is in the switch below)
+ writer->SetNullAt(pos);
+ return Status::OK();
+ }
+ }
+ switch (type_id) {  // From here the value is non-null, or it is a null DECIMAL/TIMESTAMP headed for a non-array writer.
+ case arrow::Type::type::BOOL: {
+ writer->WriteBoolean(pos, getter->GetBoolean(pos));
+ break;
+ }
+ case arrow::Type::type::INT8: {
+ writer->WriteByte(pos, getter->GetByte(pos));
+ break;
+ }
+ case arrow::Type::type::INT16: {
+ writer->WriteShort(pos, getter->GetShort(pos));
+ break;
+ }
+ case arrow::Type::type::DATE32: {
+ writer->WriteInt(pos, getter->GetDate(pos));  // Dates are written through the int writer.
+ break;
+ }
+ case arrow::Type::type::INT32: {
+ writer->WriteInt(pos, getter->GetInt(pos));
+ break;
+ }
+ case arrow::Type::type::INT64: {
+ writer->WriteLong(pos, getter->GetLong(pos));
+ break;
+ }
+ case arrow::Type::type::FLOAT: {
+ writer->WriteFloat(pos, getter->GetFloat(pos));
+ break;
+ }
+ case arrow::Type::type::DOUBLE: {
+ writer->WriteDouble(pos, getter->GetDouble(pos));
+ break;
+ }
+ case arrow::Type::type::STRING: {
+ writer->WriteStringView(pos, getter->GetStringView(pos));
+ break;
+ }
+ case arrow::Type::type::BINARY: {
+ writer->WriteStringView(pos, getter->GetStringView(pos));  // Binary values use the same string-view path as STRING.
+ break;
+ }
+ case arrow::Type::type::TIMESTAMP: {
+ auto timestamp_type =
arrow::internal::checked_pointer_cast<arrow::TimestampType>(type);
+ assert(timestamp_type);
+ int32_t precision =
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+ if (getter->IsNullAt(pos)) {
+ // compatible with Java Paimon: a non-compact null timestamp is written via WriteTimestamp with std::nullopt
+ if (!Timestamp::IsCompact(precision)) {
+ writer->WriteTimestamp(pos, std::nullopt, precision);
+ } else {
+ writer->SetNullAt(pos);
+ }
+ } else {
+ writer->WriteTimestamp(pos, getter->GetTimestamp(pos,
precision), precision);
+ }
+ break;
+ }
+ case arrow::Type::type::DECIMAL: {
+ auto* decimal_type =
arrow::internal::checked_cast<arrow::Decimal128Type*>(type.get());
+ assert(decimal_type);  // NOTE(review): the cast assumes a 128-bit decimal type; other decimal widths are not handled separately here.
+ auto precision = decimal_type->precision();
+ auto scale = decimal_type->scale();
+ if (getter->IsNullAt(pos)) {
+ // compatible with Java Paimon: a non-compact null decimal is written via WriteDecimal with std::nullopt
+ if (!Decimal::IsCompact(precision)) {
+ writer->WriteDecimal(pos, std::nullopt, precision);
+ } else {
+ writer->SetNullAt(pos);
+ }
+ } else {
+ writer->WriteDecimal(pos, getter->GetDecimal(pos, precision,
scale), precision);
+ }
+ break;
+ }
+ case arrow::Type::type::LIST: {
+ auto internal_array = getter->GetArray(pos);
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryArray> binary_array,
+ WriteBinaryArray(internal_array, type,
pool));  // Nested list: converted to a BinaryArray first (this can recurse back into WriteBinaryData).
+ writer->WriteArray(pos, *binary_array);
+ break;
+ }
+ case arrow::Type::type::MAP: {
+ auto internal_map = getter->GetMap(pos);
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryMap> binary_map,
+ WriteBinaryMap(internal_map, type, pool));  // Nested map: converted to a BinaryMap first.
+ writer->WriteMap(pos, *binary_map);
+ break;
+ }
+ case arrow::Type::type::STRUCT: {
+ auto internal_row = getter->GetRow(pos, type->num_fields());
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryRow> binary_row,
+ WriteBinaryRow(internal_row, type, pool));  // Nested struct: converted to a BinaryRow first.
+ writer->WriteRow(pos, *binary_row);
+ break;
+ }
+ default:
+ return Status::Invalid(
+ fmt::format("type {} not support in WriteBinaryData in binary
serializer utils",
+ type->ToString()));
+ }
+ return Status::OK();  // Reached by every case that ends in break.
+}
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils.h
b/src/paimon/common/data/serializer/binary_serializer_utils.h
new file mode 100644
index 0000000..f024b94
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_writer.h"
+
+namespace paimon {
+/// Utilities for generating `BinaryRow`, `BinaryArray`, `BinaryMap` from
`InternalRow`, `InternalArray`,
+/// and `InternalMap`. All members are static and the class cannot be instantiated.
+class BinarySerializerUtils {
+ public:
+ BinarySerializerUtils() = delete;
+ ~BinarySerializerUtils() = delete;
+
+ static Result<std::shared_ptr<BinaryArray>> WriteBinaryArray(
+ const std::shared_ptr<InternalArray>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool);  // type is expected to be an arrow list type.
+
+ static Result<std::shared_ptr<BinaryMap>> WriteBinaryMap(
+ const std::shared_ptr<InternalMap>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool);  // type is expected to be an arrow map type.
+
+ static Result<std::shared_ptr<BinaryRow>> WriteBinaryRow(
+ const std::shared_ptr<InternalRow>& value, const
std::shared_ptr<arrow::DataType>& type,
+ MemoryPool* pool);  // type is expected to be an arrow struct type.
+
+ private:
+ static Status WriteBinaryData(const std::shared_ptr<arrow::DataType>& type,
+ const DataGetters* getter, int32_t pos,
BinaryWriter* writer,
+ MemoryPool* pool);  // Writes the single value at pos; shared by the three public converters.
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp
b/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp
new file mode 100644
index 0000000..f557d46
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+
+#include <string>
+
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(BinarySerializerUtilsTest, TestSimple) {
+ auto pool = GetDefaultPool();
+ // prepare data: a 13-field struct covering the scalar types, with one null int32 (field_null)
+ std::shared_ptr<arrow::DataType> arrow_type =
+ arrow::struct_({arrow::field("f1", arrow::boolean()),
arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()), arrow::field("f4",
arrow::int32()),
+ arrow::field("field_null", arrow::int32()),
+ arrow::field("f5", arrow::int64()), arrow::field("f6",
arrow::float32()),
+ arrow::field("f7", arrow::float64()),
arrow::field("f8", arrow::utf8()),
+ arrow::field("f9", arrow::binary()),
arrow::field("f10", arrow::date32()),
+ arrow::field("f11",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f12", arrow::decimal128(5, 2))});
+
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+ [true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141, "20250327",
"banana", 2026, 5000001, "5.12"]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row = std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
+ struct_array->fields(),
pool, /*row_id=*/0);
+
+ ASSERT_OK_AND_ASSIGN(auto binary_row,
BinarySerializerUtils::WriteBinaryRow(
+ columnar_row, arrow_type,
pool.get()));
+
+ // check result: every field read back from the BinaryRow must match the JSON input above
+ ASSERT_EQ(binary_row->GetFieldCount(), 13);
+ ASSERT_FALSE(binary_row->IsNullAt(0));
+
+ ASSERT_EQ(binary_row->GetBoolean(0), true);
+ ASSERT_EQ(binary_row->GetByte(1), 0);
+ ASSERT_EQ(binary_row->GetShort(2), 32767);
+ ASSERT_EQ(binary_row->GetInt(3), 2147483647);
+ ASSERT_TRUE(binary_row->IsNullAt(4));  // field_null was null in the input.
+ ASSERT_EQ(binary_row->GetLong(5), 4294967295l);
+ ASSERT_EQ(binary_row->GetFloat(6), 0.5);
+ ASSERT_EQ(binary_row->GetDouble(7), 1.141);
+ ASSERT_EQ(binary_row->GetString(8), BinaryString::FromString("20250327",
pool.get()));
+ auto f9_bytes = std::make_shared<Bytes>("banana", pool.get());
+ ASSERT_EQ(*binary_row->GetBinary(9), *f9_bytes);
+ ASSERT_EQ(binary_row->GetDate(10), 2026);
+ ASSERT_EQ(binary_row->GetTimestamp(11, 9),
+ Timestamp(/*millisecond=*/5, /*nano_of_millisecond*/ 1));  // Input 5000001 ns = 5 ms + 1 ns.
+ ASSERT_EQ(binary_row->GetDecimal(12, 5, 2),
+ Decimal(5, 2, DecimalUtils::StrToInt128("512").value()));  // "5.12" at scale 2 has unscaled value 512.
+}
+
+TEST(BinarySerializerUtilsTest, TestNestedType) {
+ auto pool = GetDefaultPool();
+ // prepare data: three map<int32, X> fields whose values are a list, a map and a struct respectively
+ auto key_field = arrow::field("key", arrow::int32());
+ auto value_field = arrow::field("value", arrow::int32());
+
+ auto inner_child1 = arrow::field(
+ "inner1", arrow::map(arrow::int32(), arrow::field("inner_list",
arrow::list(value_field))));
+ auto inner_child2 = arrow::field(
+ "inner2", arrow::map(arrow::int32(),
+ arrow::field("inner_map",
arrow::map(arrow::int32(), value_field))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct", arrow::struct_({key_field,
value_field}))));
+
+ auto arrow_type = arrow::struct_({inner_child1, inner_child2,
inner_child3});
+ // each inner child per row: the single JSON row below holds one value for each of the three fields
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[100, [1, 2, 3, 4]], [101, [5, 6, 7]]],
+[[200, [[500, 1]]], [201, [[501, 2]]]],
+[[600, [100, 200]]]]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ ASSERT_OK_AND_ASSIGN(auto binary_row,
BinarySerializerUtils::WriteBinaryRow(
+ columnar_row, arrow_type,
pool.get()));
+
+ // check result: one top-level field per inner child
+ ASSERT_EQ(binary_row->GetFieldCount(), 3);
+
+ // for inner_child1: map<int32, list<int32>>
+ ASSERT_EQ(binary_row->GetMap(0)->KeyArray()->ToIntArray().value(),
+ std::vector<int32_t>({100, 101}));
+ auto value1 = binary_row->GetMap(0)->ValueArray();
+ ASSERT_EQ(value1->Size(), 2);
+ ASSERT_EQ(value1->GetArray(0)->ToIntArray().value(),
std::vector<int32_t>({1, 2, 3, 4}));
+ ASSERT_EQ(value1->GetArray(1)->ToIntArray().value(),
std::vector<int32_t>({5, 6, 7}));
+
+ // for inner_child2: map<int32, map<int32, int32>>
+ ASSERT_EQ(binary_row->GetMap(1)->KeyArray()->ToIntArray().value(),
+ std::vector<int32_t>({200, 201}));
+ auto value2 = binary_row->GetMap(1)->ValueArray();
+ ASSERT_EQ(value2->Size(), 2);
+ ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({500}));
+ ASSERT_EQ(value2->GetMap(0)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({1}));
+ ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({501}));
+ ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({2}));
+
+ // for inner_child3: map<int32, struct<key, value>>
+ ASSERT_EQ(binary_row->GetMap(2)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({600}));
+ auto value3 = binary_row->GetMap(2)->ValueArray();
+ ASSERT_EQ(value3->Size(), 1);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(1), 200);
+}
+
+TEST(BinarySerializerUtilsTest, TestNestedTypeWithNull) {
+ auto pool = GetDefaultPool();
+ // prepare data: same three map-typed fields as TestNestedType, but with nulls inside the nested values
+ auto key_field = arrow::field("key", arrow::int32());
+ auto value_field = arrow::field("value", arrow::int32());
+
+ auto inner_child1 = arrow::field(
+ "inner1", arrow::map(arrow::int32(), arrow::field("inner_list",
arrow::list(value_field))));
+ auto inner_child2 = arrow::field(
+ "inner2", arrow::map(arrow::int32(),
+ arrow::field("inner_map",
arrow::map(arrow::int32(), value_field))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct", arrow::struct_({key_field,
value_field}))));
+
+ auto arrow_type = arrow::struct_({inner_child1, inner_child2,
inner_child3});
+ // each inner child per row: the JSON below has a null list, a null list element, a null map value and a null struct field
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[100, null], [101, [5, 6, null]]],
+[[200, [[500, null]]], [201, [[501, 2]]]],
+[[600, [100, null]]]]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ ASSERT_OK_AND_ASSIGN(auto binary_row,
BinarySerializerUtils::WriteBinaryRow(
+ columnar_row, arrow_type,
pool.get()));
+
+ // check result: one top-level field per inner child
+ ASSERT_EQ(binary_row->GetFieldCount(), 3);
+
+ // for inner_child1: the list for key 100 is null, and the list for key 101 ends with a null element
+ ASSERT_EQ(binary_row->GetMap(0)->KeyArray()->ToIntArray().value(),
+ std::vector<int32_t>({100, 101}));
+ auto value1 = binary_row->GetMap(0)->ValueArray();
+ ASSERT_EQ(value1->Size(), 2);
+ ASSERT_TRUE(value1->IsNullAt(0));
+ ASSERT_EQ(value1->GetArray(1)->GetInt(0), 5);
+ ASSERT_EQ(value1->GetArray(1)->GetInt(1), 6);
+ ASSERT_TRUE(value1->GetArray(1)->IsNullAt(2));
+
+ // for inner_child2: the inner map under key 200 maps 500 to null
+ ASSERT_EQ(binary_row->GetMap(1)->KeyArray()->ToIntArray().value(),
+ std::vector<int32_t>({200, 201}));
+ auto value2 = binary_row->GetMap(1)->ValueArray();
+ ASSERT_EQ(value2->Size(), 2);
+ ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({500}));
+ ASSERT_TRUE(value2->GetMap(0)->ValueArray()->IsNullAt(0));
+ ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({501}));
+ ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({2}));
+
+ // for inner_child3: the struct under key 600 has a null second field
+ ASSERT_EQ(binary_row->GetMap(2)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({600}));
+ auto value3 = binary_row->GetMap(2)->ValueArray();
+ ASSERT_EQ(value3->Size(), 1);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+ ASSERT_TRUE(value3->GetRow(0, /*num_fields=*/2)->IsNullAt(1));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp
b/src/paimon/common/data/serializer/row_compacted_serializer.cpp
new file mode 100644
index 0000000..4bf247b
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/fields_comparator.h"
+
+namespace paimon {
+/// Builds a serializer for `schema`: for every field it prepares a getter (to pull the value
+/// out of an InternalRow), a writer (to append the value to the compact buffer) and a reader
+/// (to decode the value again). Fails with the first error reported while preparing a field.
+Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
+    const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
+    const int32_t field_count = schema->num_fields();
+    std::vector<InternalRow::FieldGetterFunc> field_getters;
+    std::vector<RowCompactedSerializer::FieldWriter> field_writers;
+    std::vector<RowCompactedSerializer::FieldReader> field_readers;
+    field_getters.reserve(field_count);
+    field_writers.reserve(field_count);
+    field_readers.reserve(field_count);
+
+    for (int32_t idx = 0; idx < field_count; ++idx) {
+        auto type = schema->field(idx)->type();
+        PAIMON_ASSIGN_OR_RAISE(InternalRow::FieldGetterFunc getter,
+                               InternalRow::CreateFieldGetter(idx, type, /*use_view=*/true));
+        PAIMON_ASSIGN_OR_RAISE(RowCompactedSerializer::FieldWriter writer,
+                               CreateFieldWriter(type, pool));
+        PAIMON_ASSIGN_OR_RAISE(RowCompactedSerializer::FieldReader reader,
+                               CreateFieldReader(type, pool));
+        field_getters.push_back(std::move(getter));
+        field_writers.push_back(std::move(writer));
+        field_readers.push_back(std::move(reader));
+    }
+
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<RowCompactedSerializer>(
+        new RowCompactedSerializer(schema, std::move(field_getters), std::move(field_writers),
+                                   std::move(field_readers), pool));
+}
+
+/// Reads the next field from both readers (advancing each reader past the value) and returns
+/// -1, 0 or 1 according to the ordering of the two values.
+///
+/// Callers must have positioned both readers on the same field and must only call this when
+/// the field is non-null on both sides. Types without an ordering defined here (nested types
+/// and anything not listed in the switch) yield Status::NotImplemented.
+Result<int32_t> RowCompactedSerializer::CompareField(const FieldInfo& field_info,
+                                                     RowReader* reader1, RowReader* reader2) {
+    // Collapses an ordered pair into the conventional -1 / 0 / 1 result.
+    const auto three_way = [](const auto& lhs, const auto& rhs) -> int32_t {
+        return lhs == rhs ? 0 : (lhs < rhs ? -1 : 1);
+    };
+
+    switch (field_info.type_id) {
+        case arrow::Type::type::BOOL: {
+            auto val1 = reader1->ReadValue<bool>();
+            auto val2 = reader2->ReadValue<bool>();
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::INT8: {
+            // The value is stored as `char`, whose signedness is implementation-defined.
+            // Compare as int8_t so that negative values order before positive ones on every
+            // platform (plain `char` is unsigned on e.g. ARM Linux).
+            auto val1 = static_cast<int8_t>(reader1->ReadValue<char>());
+            auto val2 = static_cast<int8_t>(reader2->ReadValue<char>());
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::INT16: {
+            auto val1 = reader1->ReadValue<int16_t>();
+            auto val2 = reader2->ReadValue<int16_t>();
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32: {
+            auto val1 = reader1->ReadValue<int32_t>();
+            auto val2 = reader2->ReadValue<int32_t>();
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::INT64: {
+            auto val1 = reader1->ReadValue<int64_t>();
+            auto val2 = reader2->ReadValue<int64_t>();
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::FLOAT: {
+            auto val1 = reader1->ReadValue<float>();
+            auto val2 = reader2->ReadValue<float>();
+            return FieldsComparator::CompareFloatingPoint(val1, val2);
+        }
+        case arrow::Type::type::DOUBLE: {
+            auto val1 = reader1->ReadValue<double>();
+            auto val2 = reader2->ReadValue<double>();
+            return FieldsComparator::CompareFloatingPoint(val1, val2);
+        }
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            PAIMON_ASSIGN_OR_RAISE(std::string_view val1, reader1->ReadStringView());
+            PAIMON_ASSIGN_OR_RAISE(std::string_view val2, reader2->ReadStringView());
+            // compare() only guarantees the sign of its result; normalize it to -1 / 0 / 1.
+            int32_t cmp = val1.compare(val2);
+            return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            PAIMON_ASSIGN_OR_RAISE(Timestamp val1, reader1->ReadTimestamp(field_info.precision));
+            PAIMON_ASSIGN_OR_RAISE(Timestamp val2, reader2->ReadTimestamp(field_info.precision));
+            return three_way(val1, val2);
+        }
+        case arrow::Type::type::DECIMAL: {
+            PAIMON_ASSIGN_OR_RAISE(Decimal val1,
+                                   reader1->ReadDecimal(field_info.precision, field_info.scale));
+            PAIMON_ASSIGN_OR_RAISE(Decimal val2,
+                                   reader2->ReadDecimal(field_info.precision, field_info.scale));
+            int32_t cmp = val1.CompareTo(val2);
+            return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
+        }
+        default:
+            return Status::NotImplemented(
+                fmt::format("Do not support comparing type {} in CompareField",
+                            static_cast<int32_t>(field_info.type_id)));
+    }
+}
+
+/// Creates a comparator that orders two serialized rows (given as memory slices) field by
+/// field without materializing them. A null field sorts before a non-null one; two null fields
+/// are treated as equal and the comparison moves on to the next field.
+///
+/// The returned comparator shares two RowReader instances between all of its copies and
+/// repositions them on every call.
+Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparator(
+    const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
+    const int32_t num_fields = schema->num_fields();
+    const int32_t header_bytes = RowCompactedSerializer::CalculateBitSetInBytes(num_fields);
+    auto left_reader = std::make_shared<RowReader>(header_bytes, pool);
+    auto right_reader = std::make_shared<RowReader>(header_bytes, pool);
+
+    // Pre-compute per-field type information so the comparator does not touch the schema.
+    std::vector<FieldInfo> field_infos(num_fields);
+    for (int32_t idx = 0; idx < num_fields; ++idx) {
+        auto type = schema->field(idx)->type();
+        FieldInfo& info = field_infos[idx];
+        info.type_id = type->id();
+        switch (info.type_id) {
+            case arrow::Type::type::TIMESTAMP: {
+                auto timestamp_type =
+                    arrow::internal::checked_pointer_cast<arrow::TimestampType>(type);
+                assert(timestamp_type);
+                info.precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+                break;
+            }
+            case arrow::Type::type::DECIMAL: {
+                auto decimal_type =
+                    arrow::internal::checked_pointer_cast<arrow::Decimal128Type>(type);
+                assert(decimal_type);
+                info.precision = decimal_type->precision();
+                info.scale = decimal_type->scale();
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    auto comparator = [left_reader, right_reader, field_infos](
+                          const MemorySlice& slice1,
+                          const MemorySlice& slice2) -> Result<int32_t> {
+        left_reader->PointTo(slice1.GetSegment(), slice1.Offset());
+        right_reader->PointTo(slice2.GetSegment(), slice2.Offset());
+        const auto field_count = static_cast<int32_t>(field_infos.size());
+        for (int32_t idx = 0; idx < field_count; ++idx) {
+            const bool left_null = left_reader->IsNullAt(idx);
+            const bool right_null = right_reader->IsNullAt(idx);
+            if (left_null && right_null) {
+                continue;
+            }
+            if (left_null) {
+                return -1;
+            }
+            if (right_null) {
+                return 1;
+            }
+            PAIMON_ASSIGN_OR_RAISE(
+                int32_t order,
+                CompareField(field_infos[idx], left_reader.get(), right_reader.get()));
+            if (order != 0) {
+                return order;
+            }
+        }
+        return 0;
+    };
+    return std::function<Result<int32_t>(const MemorySlice&, const MemorySlice&)>(comparator);
+}
+
+/// Serializes `row` into a freshly allocated byte buffer: the row kind and null bits go into
+/// the header, followed by every non-null field in schema order.
+/// The internal writer is created lazily and reused across calls.
+Result<std::shared_ptr<Bytes>> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) {
+    const size_t field_count = getters_.size();
+    if (row_writer_ == nullptr) {
+        row_writer_ = std::make_unique<RowWriter>(CalculateBitSetInBytes(field_count), pool_);
+    }
+    RowWriter* writer = row_writer_.get();
+    writer->Reset();
+
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* kind, row.GetRowKind());
+    writer->WriteRowKind(*kind);
+
+    for (size_t idx = 0; idx < field_count; ++idx) {
+        VariantType value = getters_[idx](row);
+        PAIMON_RETURN_NOT_OK(writers_[idx](idx, value, writer));
+    }
+    // Hand out a copy so the caller's bytes stay valid when the writer is reused.
+    return writer->CopyBuffer();
+}
+
+/// Decodes a buffer produced by SerializeToBytes back into a GenericRow.
+/// `bytes` is attached to the returned row as a data holder, so values that reference the
+/// buffer (for example string views) stay valid for the lifetime of the row.
+Result<std::unique_ptr<InternalRow>> RowCompactedSerializer::Deserialize(
+    const std::shared_ptr<Bytes>& bytes) {
+    if (row_reader_ == nullptr) {
+        row_reader_ =
+            std::make_unique<RowReader>(CalculateBitSetInBytes(getters_.size()), pool_);
+    }
+    RowReader* reader = row_reader_.get();
+    reader->PointTo(bytes);
+
+    auto result = std::make_unique<GenericRow>(getters_.size());
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* kind, reader->ReadRowKind());
+    result->SetRowKind(kind);
+
+    const size_t field_count = readers_.size();
+    for (size_t idx = 0; idx < field_count; ++idx) {
+        PAIMON_ASSIGN_OR_RAISE(VariantType value, readers_[idx](idx, reader));
+        result->SetField(idx, value);
+    }
+    result->AddDataHolder(bytes);
+    return result;
+}
+
+/// Private constructor used by Create(): takes ownership of the per-field getters, writers
+/// and readers and keeps shared references to the schema and the memory pool.
+/// The row writer and row reader are left unset here.
+RowCompactedSerializer::RowCompactedSerializer(
+    const std::shared_ptr<arrow::Schema>& schema,
+    std::vector<InternalRow::FieldGetterFunc>&& getters,
+    std::vector<RowCompactedSerializer::FieldWriter>&& writers,
+    std::vector<RowCompactedSerializer::FieldReader>&& readers,
+    const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool),
+      schema_(schema),
+      getters_(std::move(getters)),
+      writers_(std::move(writers)),
+      readers_(std::move(readers)) {
+    // Nothing else to initialize.
+}
+
+/// Builds the decoding function for one field of type `field_type`.
+///
+/// The returned reader first consults the null bit of the field; for a null field it yields
+/// a NullType variant without consuming any bytes, otherwise it decodes the value at the
+/// reader's current position. Struct fields get their own nested serializer, created with
+/// `pool`. Unsupported types are rejected with Status::Invalid.
+Result<RowCompactedSerializer::FieldReader> RowCompactedSerializer::CreateFieldReader(
+    const std::shared_ptr<arrow::DataType>& field_type, const std::shared_ptr<MemoryPool>& pool) {
+    // Decorates a type-specific reader with the null check shared by every field type.
+    const auto null_aware = [](FieldReader value_reader) -> FieldReader {
+        return [value_reader = std::move(value_reader)](
+                   int32_t pos, RowReader* reader) -> Result<VariantType> {
+            if (reader->IsNullAt(pos)) {
+                return VariantType(NullType());
+            }
+            return value_reader(pos, reader);
+        };
+    };
+
+    switch (field_type->id()) {
+        case arrow::Type::type::BOOL:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<bool>());
+            });
+        case arrow::Type::type::INT8:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<char>());
+            });
+        case arrow::Type::type::INT16:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<int16_t>());
+            });
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<int32_t>());
+            });
+        case arrow::Type::type::INT64:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<int64_t>());
+            });
+        case arrow::Type::type::FLOAT:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<float>());
+            });
+        case arrow::Type::type::DOUBLE:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                return VariantType(reader->ReadValue<double>());
+            });
+        case arrow::Type::type::STRING:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadStringView());
+                return value;
+            });
+        case arrow::Type::type::BINARY:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadBinary());
+                return value;
+            });
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            return null_aware(
+                [precision](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                    PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadTimestamp(precision));
+                    return value;
+                });
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            return null_aware(
+                [precision, scale](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                    PAIMON_ASSIGN_OR_RAISE(VariantType value,
+                                           reader->ReadDecimal(precision, scale));
+                    return value;
+                });
+        }
+        case arrow::Type::type::LIST:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadArray());
+                return value;
+            });
+        case arrow::Type::type::MAP:
+            return null_aware([](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadMap());
+                return value;
+            });
+        case arrow::Type::type::STRUCT: {
+            auto* struct_type =
+                arrow::internal::checked_cast<arrow::StructType*>(field_type.get());
+            assert(struct_type);
+            // Nested rows are decoded by a dedicated serializer built from the struct's fields.
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<RowCompactedSerializer> serializer,
+                RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
+            return null_aware(
+                [serializer](int32_t /*pos*/, RowReader* reader) -> Result<VariantType> {
+                    PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadRow(serializer));
+                    return value;
+                });
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in FieldReader in row compacted serializer",
+                            field_type->ToString()));
+    }
+}
+
+/// Builds the encoding function for one field of type `field_type`.
+///
+/// The returned writer marks the field's null bit and writes nothing else when the variant is
+/// null; otherwise it appends the value to the row buffer. Struct fields get their own nested
+/// serializer, created with `pool`. Unsupported types are rejected with Status::Invalid.
+Result<RowCompactedSerializer::FieldWriter> RowCompactedSerializer::CreateFieldWriter(
+    const std::shared_ptr<arrow::DataType>& field_type, const std::shared_ptr<MemoryPool>& pool) {
+    // Decorates a type-specific writer with the null handling shared by every field type.
+    const auto null_aware = [](FieldWriter value_writer) -> FieldWriter {
+        return [value_writer = std::move(value_writer)](
+                   int32_t pos, const VariantType& field, RowWriter* writer) -> Status {
+            if (DataDefine::IsVariantNull(field)) {
+                writer->SetNullAt(pos);
+                return Status::OK();
+            }
+            return value_writer(pos, field, writer);
+        };
+    };
+
+    switch (field_type->id()) {
+        case arrow::Type::type::BOOL:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<bool>(DataDefine::GetVariantValue<bool>(field));
+                });
+        case arrow::Type::type::INT8:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<char>(DataDefine::GetVariantValue<char>(field));
+                });
+        case arrow::Type::type::INT16:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<int16_t>(
+                        DataDefine::GetVariantValue<int16_t>(field));
+                });
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<int32_t>(
+                        DataDefine::GetVariantValue<int32_t>(field));
+                });
+        case arrow::Type::type::INT64:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<int64_t>(
+                        DataDefine::GetVariantValue<int64_t>(field));
+                });
+        case arrow::Type::type::FLOAT:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<float>(DataDefine::GetVariantValue<float>(field));
+                });
+        case arrow::Type::type::DOUBLE:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    return writer->WriteValue<double>(
+                        DataDefine::GetVariantValue<double>(field));
+                });
+        case arrow::Type::type::STRING:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    // The variant holds either a string view or a BinaryString.
+                    if (const auto* view = DataDefine::GetVariantPtr<std::string_view>(field)) {
+                        return writer->WriteStringView(*view);
+                    }
+                    return writer->WriteString(DataDefine::GetVariantValue<BinaryString>(field));
+                });
+        case arrow::Type::type::BINARY:
+            return null_aware(
+                [](int32_t /*pos*/, const VariantType& field, RowWriter* writer) -> Status {
+                    // The variant holds either a string view or a shared Bytes buffer.
+                    if (const auto* view = DataDefine::GetVariantPtr<std::string_view>(field)) {
+                        return writer->WriteStringView(*view);
+                    }
+                    return writer->WriteBinary(
+                        DataDefine::GetVariantValue<std::shared_ptr<Bytes>>(field).get());
+                });
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            return null_aware([precision](int32_t /*pos*/, const VariantType& field,
+                                          RowWriter* writer) -> Status {
+                return writer->WriteTimestamp(DataDefine::GetVariantValue<Timestamp>(field),
+                                              precision);
+            });
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            return null_aware([precision](int32_t /*pos*/, const VariantType& field,
+                                          RowWriter* writer) -> Status {
+                return writer->WriteDecimal(DataDefine::GetVariantValue<Decimal>(field),
+                                            precision);
+            });
+        }
+        case arrow::Type::type::LIST:
+            return null_aware([field_type](int32_t /*pos*/, const VariantType& field,
+                                           RowWriter* writer) -> Status {
+                return writer->WriteArray(
+                    DataDefine::GetVariantValue<std::shared_ptr<InternalArray>>(field),
+                    field_type);
+            });
+        case arrow::Type::type::MAP:
+            return null_aware([field_type](int32_t /*pos*/, const VariantType& field,
+                                           RowWriter* writer) -> Status {
+                return writer->WriteMap(
+                    DataDefine::GetVariantValue<std::shared_ptr<InternalMap>>(field), field_type);
+            });
+        case arrow::Type::type::STRUCT: {
+            auto struct_type =
+                arrow::internal::checked_pointer_cast<arrow::StructType>(field_type);
+            assert(struct_type);
+            // Nested rows are encoded by a dedicated serializer built from the struct's fields.
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<RowCompactedSerializer> serializer,
+                RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
+            return null_aware([serializer](int32_t /*pos*/, const VariantType& field,
+                                           RowWriter* writer) -> Status {
+                return writer->WriteRow(
+                    DataDefine::GetVariantValue<std::shared_ptr<InternalRow>>(field), serializer);
+            });
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in FieldWriter in row compacted serializer",
+                            field_type->ToString()));
+    }
+}
+
+/// Allocates the initial buffer (at least 64 bytes, and never smaller than the header) and
+/// places the write cursor right after the header.
+RowCompactedSerializer::RowWriter::RowWriter(int32_t header_size_in_bytes,
+                                             const std::shared_ptr<MemoryPool>& pool)
+    : header_size_in_bytes_(header_size_in_bytes), pool_(pool) {
+    const int32_t min_capacity = 64;
+    const int32_t capacity = std::max(min_capacity, header_size_in_bytes);
+    SetBuffer(std::make_shared<Bytes>(capacity, pool_.get()));
+    position_ = header_size_in_bytes_;
+}
+
+/// Appends a decimal: compact precisions are stored as the unscaled value in an int64,
+/// all others as a length-prefixed unscaled byte sequence.
+Status RowCompactedSerializer::RowWriter::WriteDecimal(const Decimal& value, int32_t precision) {
+    if (Decimal::IsCompact(precision)) {
+        return WriteValue<int64_t>(value.ToUnscaledLong());
+    }
+    auto unscaled = value.ToUnscaledBytes();
+    return WriteBinary(&unscaled);
+}
+
+/// Appends a timestamp: the epoch milliseconds are always written as an int64; for
+/// non-compact precisions the nanos-of-millisecond follow as a variable-length integer.
+Status RowCompactedSerializer::RowWriter::WriteTimestamp(const Timestamp& value,
+                                                         int32_t precision) {
+    PAIMON_RETURN_NOT_OK(WriteValue<int64_t>(value.GetMillisecond()));
+    if (Timestamp::IsCompact(precision)) {
+        return Status::OK();
+    }
+    return WriteUnsignedInt(value.GetNanoOfMillisecond());
+}
+
+/// Appends a nested row: the row is serialized with `serializer` and the resulting bytes are
+/// stored as a length-prefixed binary value.
+Status RowCompactedSerializer::RowWriter::WriteRow(
+    const std::shared_ptr<InternalRow>& value,
+    const std::shared_ptr<RowCompactedSerializer>& serializer) {
+    const InternalRow& nested_row = *value;
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> serialized,
+                           serializer->SerializeToBytes(nested_row));
+    return WriteBinary(serialized.get());
+}
+
+/// Appends an array: the value is first converted to a BinaryArray, whose backing memory is
+/// then copied into the row buffer behind a length prefix.
+Status RowCompactedSerializer::RowWriter::WriteArray(
+    const std::shared_ptr<InternalArray>& value, const std::shared_ptr<arrow::DataType>& type) {
+    MemoryPool* raw_pool = pool_.get();
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryArray> converted,
+                           BinarySerializerUtils::WriteBinaryArray(value, type, raw_pool));
+    return WriteSegment(converted->GetSegment(), converted->GetOffset(),
+                        converted->GetSizeInBytes());
+}
+
+/// Appends a map: the value is first converted to a BinaryMap, whose backing memory is then
+/// copied into the row buffer behind a length prefix.
+Status RowCompactedSerializer::RowWriter::WriteMap(
+    const std::shared_ptr<InternalMap>& value, const std::shared_ptr<arrow::DataType>& type) {
+    MemoryPool* raw_pool = pool_.get();
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryMap> converted,
+                           BinarySerializerUtils::WriteBinaryMap(value, type, raw_pool));
+    return WriteSegment(converted->GetSegment(), converted->GetOffset(),
+                        converted->GetSizeInBytes());
+}
+
+/// Returns a copy of the bytes written so far (header included), allocated from the pool.
+std::shared_ptr<Bytes> RowCompactedSerializer::RowWriter::CopyBuffer() const {
+    auto& written = *buffer_;
+    return Bytes::CopyOf(written, position_, pool_.get());
+}
+
+/// Appends `value` as a variable-length integer and advances the cursor by the number of
+/// bytes the encoding used.
+Status RowCompactedSerializer::RowWriter::WriteUnsignedInt(int32_t value) {
+    // Reserve room for the longest possible encoding before taking the destination pointer.
+    EnsureCapacity(VarLengthIntUtils::kMaxVarIntSize);
+    auto* destination = buffer_->data() + position_;
+    PAIMON_ASSIGN_OR_RAISE(int32_t encoded_size,
+                           VarLengthIntUtils::EncodeInt(value, destination));
+    position_ += encoded_size;
+    return Status::OK();
+}
+
+/// Appends `len` bytes of `segment`, starting at `off`, behind a variable-length size prefix.
+Status RowCompactedSerializer::RowWriter::WriteSegment(const MemorySegment& segment, int32_t off,
+                                                       int32_t len) {
+    // Size prefix first, then the payload itself.
+    PAIMON_RETURN_NOT_OK(WriteUnsignedInt(len));
+
+    EnsureCapacity(len);
+    Bytes* target = buffer_.get();
+    MemorySegmentUtils::CopyToBytes({segment}, off, target, position_, len);
+    position_ += len;
+    return Status::OK();
+}
+
+/// Makes sure at least `size` more bytes can be written at the current cursor, growing the
+/// buffer when the remaining space is too small.
+void RowCompactedSerializer::RowWriter::EnsureCapacity(int32_t size) {
+    const int32_t remaining = static_cast<int32_t>(buffer_->size()) - position_;
+    if (remaining >= size) {
+        return;
+    }
+    Grow(size);
+}
+
+/// Replaces the buffer with a larger one: the capacity at least doubles and grows by at least
+/// `min_capacity_add` bytes. The whole old buffer is copied into the new one.
+void RowCompactedSerializer::RowWriter::Grow(int32_t min_capacity_add) {
+    const auto old_capacity = static_cast<int32_t>(buffer_->size());
+    const int32_t doubled = old_capacity * 2;
+    const int32_t required = old_capacity + min_capacity_add;
+    const int32_t new_capacity = std::max(doubled, required);
+
+    auto replacement = std::make_shared<Bytes>(new_capacity, pool_.get());
+    memcpy(replacement->data(), buffer_->data(), old_capacity);
+    SetBuffer(std::move(replacement));
+}
+
+/// Installs `new_buffer` as the backing storage and re-wraps it in the memory segment used
+/// for typed writes, so both always refer to the same bytes.
+void RowCompactedSerializer::RowWriter::SetBuffer(std::shared_ptr<Bytes> new_buffer) {
+    // After the swap `new_buffer` holds the previous buffer, released when it goes out of scope.
+    buffer_.swap(new_buffer);
+    segment_ = MemorySegment::Wrap(buffer_);
+}
+
+/// Positions the reader at the start of a row stored at offset 0 of `bytes`.
+void RowCompactedSerializer::RowReader::PointTo(const std::shared_ptr<Bytes>& bytes) {
+    PointTo(MemorySegment::Wrap(bytes), /*offset=*/0);
+}
+
+/// Positions the reader on a row that starts at `offset` inside `segment`; the read cursor
+/// is placed on the first byte after the row header.
+void RowCompactedSerializer::RowReader::PointTo(const MemorySegment& segment, int32_t offset) {
+    offset_ = offset;
+    position_ = offset + header_size_in_bytes_;
+    segment_ = segment;
+}
+
+/// Decodes the row kind from the first byte of the row header.
+Result<const RowKind*> RowCompactedSerializer::RowReader::ReadRowKind() const {
+    const auto kind_byte = static_cast<int8_t>(segment_.Get(offset_));
+    return RowKind::FromByteValue(kind_byte);
+}
+
+/// Reads a length-prefixed string without copying: the returned view points into the
+/// segment the reader is positioned on, so it is only valid while that memory is alive.
+Result<std::string_view> RowCompactedSerializer::RowReader::ReadStringView() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    const char* begin = segment_.Data() + position_;
+    position_ += length;
+    return std::string_view(begin, length);
+}
+
+/// Reads a length-prefixed binary value into a newly allocated Bytes buffer.
+Result<std::shared_ptr<Bytes>> RowCompactedSerializer::RowReader::ReadBinary() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    auto copy = std::make_shared<Bytes>(length, pool_.get());
+    const int32_t start = position_;
+    segment_.Get(start, copy.get(), /*offset=*/0, length);
+    position_ = start + length;
+    return copy;
+}
+
+/// Reads a decimal written by RowWriter::WriteDecimal: an int64 unscaled value for compact
+/// precisions, a length-prefixed unscaled byte sequence otherwise.
+Result<Decimal> RowCompactedSerializer::RowReader::ReadDecimal(int32_t precision, int32_t scale) {
+    if (!Decimal::IsCompact(precision)) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> unscaled_bytes, ReadBinary());
+        return Decimal::FromUnscaledBytes(precision, scale, unscaled_bytes.get());
+    }
+    return Decimal::FromUnscaledLong(ReadValue<int64_t>(), precision, scale);
+}
+
+/// Reads a timestamp written by RowWriter::WriteTimestamp: the epoch milliseconds come first
+/// and, for non-compact precisions, are followed by the nanos-of-millisecond.
+Result<Timestamp> RowCompactedSerializer::RowReader::ReadTimestamp(int32_t precision) {
+    const auto epoch_millis = ReadValue<int64_t>();
+    if (Timestamp::IsCompact(precision)) {
+        return Timestamp::FromEpochMillis(epoch_millis);
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t nanos, ReadUnsignedInt());
+    return Timestamp::FromEpochMillis(epoch_millis, nanos);
+}
+
+/// Reads a length-prefixed array. The returned BinaryArray points at the reader's segment
+/// instead of copying the payload.
+Result<std::shared_ptr<InternalArray>> RowCompactedSerializer::RowReader::ReadArray() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t size_in_bytes, ReadUnsignedInt());
+    auto array = std::make_shared<BinaryArray>();
+    array->PointTo(segment_, position_, size_in_bytes);
+    position_ += size_in_bytes;
+    return array;
+}
+
+/// Reads a length-prefixed map. The returned BinaryMap points at the reader's segment
+/// instead of copying the payload.
+Result<std::shared_ptr<InternalMap>> RowCompactedSerializer::RowReader::ReadMap() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t size_in_bytes, ReadUnsignedInt());
+    auto map = std::make_shared<BinaryMap>();
+    map->PointTo(segment_, position_, size_in_bytes);
+    position_ += size_in_bytes;
+    return map;
+}
+
+/// Reads a nested row: the length-prefixed payload is copied out and decoded by `serializer`.
+Result<std::shared_ptr<InternalRow>> RowCompactedSerializer::RowReader::ReadRow(
+    const std::shared_ptr<RowCompactedSerializer>& serializer) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> payload, ReadBinary());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalRow> nested_row,
+                           serializer->Deserialize(payload));
+    return nested_row;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.h
b/src/paimon/common/data/serializer/row_compacted_serializer.h
new file mode 100644
index 0000000..9acda1c
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer.h
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <functional>
+
+#include "arrow/api.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_writer.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+namespace paimon {
+
+/// Serializes InternalRow values into a compact byte layout and back.
+///
+/// A serialized row starts with a header that holds the row kind in its first byte followed by
+/// one null bit per field; the non-null field values are appended after the header in schema
+/// order. Fixed-width values are written as-is, variable-length values carry a
+/// variable-length-integer size prefix.
+///
+/// An instance lazily creates one RowWriter and one RowReader and reuses them across calls.
+class RowCompactedSerializer {
+ public:
+    /// Creates a serializer for rows with the given schema.
+    static Result<std::unique_ptr<RowCompactedSerializer>> Create(
+        const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool);
+
+    /// Number of header bytes needed for a row with `arity` fields: the fixed header bits plus
+    /// one null bit per field, rounded up to whole bytes.
+    static int32_t CalculateBitSetInBytes(int32_t arity) {
+        return (arity + 7 + BinaryRow::HEADER_SIZE_IN_BITS) / 8;
+    }
+
+    /// Serializes `row` into a newly allocated byte buffer.
+    Result<std::shared_ptr<Bytes>> SerializeToBytes(const InternalRow& row);
+
+    /// Decodes a buffer produced by SerializeToBytes.
+    Result<std::unique_ptr<InternalRow>> Deserialize(const std::shared_ptr<Bytes>& bytes);
+
+    /// Creates a comparator that orders serialized rows of the given schema.
+    static Result<MemorySlice::SliceComparator> CreateSliceComparator(
+        const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool);
+
+ private:
+    /// Type information the slice comparator needs per field.
+    /// `precision` and `scale` stay at -1 unless the creator of the FieldInfo fills them in.
+    struct FieldInfo {
+        arrow::Type::type type_id;
+        int32_t precision = -1;
+        int32_t scale = -1;
+    };
+
+    /// Append-only writer for a single row; owns a growable buffer.
+    class RowWriter {
+     public:
+        RowWriter(int32_t header_size_in_bytes, const std::shared_ptr<MemoryPool>& pool);
+
+        /// Starts a new row: clears the header and moves the cursor right behind it.
+        void Reset() {
+            position_ = header_size_in_bytes_;
+            std::memset(buffer_->data(), 0, header_size_in_bytes_);
+        }
+
+        /// Stores the row kind in the first header byte.
+        void WriteRowKind(const RowKind& kind) {
+            (*buffer_)[0] = static_cast<char>(kind.ToByteValue());
+        }
+
+        /// Marks field `pos` as null in the header bit set.
+        void SetNullAt(int32_t pos) {
+            MemorySegmentUtils::BitSet(&segment_, 0, pos + BinaryRow::HEADER_SIZE_IN_BITS);
+        }
+
+        /// Appends a fixed-width value of type T.
+        template <typename T>
+        Status WriteValue(const T& value) {
+            EnsureCapacity(sizeof(T));
+            segment_.PutValue<T>(position_, value);
+            position_ += sizeof(T);
+            return Status::OK();
+        }
+
+        /// Appends the bytes of a BinaryString behind a size prefix.
+        Status WriteString(const BinaryString& value) {
+            return WriteSegment(value.GetSegment(), value.GetOffset(), value.GetSizeInBytes());
+        }
+
+        /// Appends the bytes of a string view behind a size prefix.
+        Status WriteStringView(const std::string_view& view) {
+            return WriteBinary(&view);
+        }
+
+        /// Appends a size prefix followed by the payload of `bytes`.
+        /// T only needs to provide size() and data().
+        /// Returns Status::Invalid when the payload does not fit into the int32 size prefix.
+        template <typename T>
+        Status WriteBinary(const T* bytes) {
+            const size_t size = bytes->size();
+            if (size > static_cast<size_t>(INT32_MAX)) {
+                return Status::Invalid(
+                    "binary value is too large to be written by row compacted serializer");
+            }
+            const auto length = static_cast<int32_t>(size);
+            PAIMON_RETURN_NOT_OK(WriteUnsignedInt(length));
+            // An empty payload may come with a null data pointer (e.g. a default-constructed
+            // string_view); passing that to memcpy is undefined even for a zero count.
+            if (length > 0) {
+                EnsureCapacity(length);
+                memcpy(buffer_->data() + position_, bytes->data(), size);
+                position_ += length;
+            }
+            return Status::OK();
+        }
+
+        Status WriteDecimal(const Decimal& value, int32_t precision);
+        Status WriteTimestamp(const Timestamp& value, int32_t precision);
+
+        Status WriteArray(const std::shared_ptr<InternalArray>& value,
+                          const std::shared_ptr<arrow::DataType>& type);
+        Status WriteMap(const std::shared_ptr<InternalMap>& value,
+                        const std::shared_ptr<arrow::DataType>& type);
+
+        Status WriteRow(const std::shared_ptr<InternalRow>& value,
+                        const std::shared_ptr<RowCompactedSerializer>& serializer);
+
+        /// Returns a copy of everything written so far, header included.
+        std::shared_ptr<Bytes> CopyBuffer() const;
+
+     private:
+        Status WriteUnsignedInt(int32_t value);
+        Status WriteSegment(const MemorySegment& segment, int32_t off, int32_t len);
+        void EnsureCapacity(int32_t size);
+        void Grow(int32_t min_capacity_add);
+        void SetBuffer(std::shared_ptr<Bytes> new_buffer);
+
+     private:
+        const int32_t header_size_in_bytes_;
+        std::shared_ptr<MemoryPool> pool_;
+        std::shared_ptr<Bytes> buffer_;
+        // Wraps buffer_; re-created whenever the buffer is replaced.
+        MemorySegment segment_;
+        // Index of the next byte to write.
+        int32_t position_ = 0;
+    };
+
+    /// Sequential reader over a single serialized row.
+    class RowReader {
+     public:
+        RowReader(int32_t header_size_in_bytes, const std::shared_ptr<MemoryPool>& pool)
+            : header_size_in_bytes_(header_size_in_bytes), pool_(pool) {}
+
+        void PointTo(const std::shared_ptr<Bytes>& bytes);
+
+        void PointTo(const MemorySegment& segment, int32_t offset);
+
+        Result<const RowKind*> ReadRowKind() const;
+
+        /// Tells whether field `pos` is marked null in the header bit set.
+        bool IsNullAt(int32_t pos) const {
+            return MemorySegmentUtils::BitGet(segment_, offset_,
+                                              pos + BinaryRow::HEADER_SIZE_IN_BITS);
+        }
+
+        /// Reads a fixed-width value of type T and advances the cursor past it.
+        template <typename T>
+        T ReadValue() {
+            T value = segment_.GetValue<T>(position_);
+            position_ += sizeof(T);
+            return value;
+        }
+
+        Result<std::string_view> ReadStringView();
+
+        Result<std::shared_ptr<Bytes>> ReadBinary();
+
+        Result<Decimal> ReadDecimal(int32_t precision, int32_t scale);
+
+        Result<Timestamp> ReadTimestamp(int32_t precision);
+
+        Result<std::shared_ptr<InternalArray>> ReadArray();
+        Result<std::shared_ptr<InternalMap>> ReadMap();
+        Result<std::shared_ptr<InternalRow>> ReadRow(
+            const std::shared_ptr<RowCompactedSerializer>& serializer);
+
+     private:
+        /// Decodes a variable-length integer at the cursor; the decoder advances the cursor.
+        Result<int32_t> ReadUnsignedInt() {
+            return VarLengthIntUtils::DecodeInt(segment_.Data(), &position_);
+        }
+
+     private:
+        const int32_t header_size_in_bytes_;
+        std::shared_ptr<MemoryPool> pool_;
+        MemorySegment segment_;
+        // Start of the current row inside segment_.
+        int32_t offset_ = 0;
+        // Index of the next byte to read.
+        int32_t position_ = 0;
+    };
+
+    /// Read and compare a single field from two RowReaders.
+    static Result<int32_t> CompareField(const FieldInfo& field_info, RowReader* reader1,
+                                        RowReader* reader2);
+
+    using FieldWriter = std::function<Status(int32_t, const VariantType&, RowWriter*)>;
+    using FieldReader = std::function<Result<VariantType>(int32_t, RowReader*)>;
+
+ private:
+    RowCompactedSerializer(const std::shared_ptr<arrow::Schema>& schema,
+                           std::vector<InternalRow::FieldGetterFunc>&& getters,
+                           std::vector<FieldWriter>&& writers, std::vector<FieldReader>&& readers,
+                           const std::shared_ptr<MemoryPool>& pool);
+
+    static Result<FieldReader> CreateFieldReader(const std::shared_ptr<arrow::DataType>& field_type,
+                                                 const std::shared_ptr<MemoryPool>& pool);
+    static Result<FieldWriter> CreateFieldWriter(const std::shared_ptr<arrow::DataType>& field_type,
+                                                 const std::shared_ptr<MemoryPool>& pool);
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::vector<InternalRow::FieldGetterFunc> getters_;
+    std::vector<FieldWriter> writers_;
+    std::vector<FieldReader> readers_;
+    // Created on first use by SerializeToBytes / Deserialize.
+    std::unique_ptr<RowWriter> row_writer_;
+    std::unique_ptr<RowReader> row_reader_;
+};
+
+} // namespace paimon
diff --git
a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp
b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp
new file mode 100644
index 0000000..cf11108
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+
+#include <string>
+
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(RowCompactedSerializerTest, TestSimple) {  // Round-trips a 14-field primitive row and an all-null row, then checks the exact serialized bytes.
+ auto pool = GetDefaultPool();
+ // prepare data: one struct field per primitive kind (bool, ints, floats, string, binary, date, timestamps, decimals)
+ std::shared_ptr<arrow::DataType> arrow_type = arrow::struct_({
+ arrow::field("f1", arrow::boolean()),
+ arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()),
+ arrow::field("f4", arrow::int32()),
+ arrow::field("f5", arrow::int64()),
+ arrow::field("f6", arrow::float32()),
+ arrow::field("f7", arrow::float64()),
+ arrow::field("f8", arrow::utf8()),
+ arrow::field("f9", arrow::binary()),
+ arrow::field("f10", arrow::date32()),
+ arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f12", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("f13", arrow::decimal128(5, 2)),
+ arrow::field("f14", arrow::decimal128(30, 5)),
+ });
+
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+ [true, 0, 32767, 2147483647, 4294967295, 0.5, 1.141, "20250327",
"banana", 2026, 1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+ [null, null, null, null, null, null, null, null, null, null, null,
null, null, null]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+ {  // row 0: every field populated
+ auto columnar_row = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, struct_array->fields(), pool,
/*row_id=*/0);
+ columnar_row->SetRowKind(RowKind::UpdateAfter());
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto de_row, serializer->Deserialize(bytes));
+
+ // check result: the row kind and every field value must survive the round trip
+ ASSERT_EQ(*(de_row->GetRowKind().value()), *(RowKind::UpdateAfter()));
+ ASSERT_EQ(de_row->GetFieldCount(), 14);
+ ASSERT_FALSE(de_row->IsNullAt(0));
+
+ ASSERT_EQ(de_row->GetBoolean(0), true);
+ ASSERT_EQ(de_row->GetByte(1), 0);
+ ASSERT_EQ(de_row->GetShort(2), 32767);
+ ASSERT_EQ(de_row->GetInt(3), 2147483647);
+ ASSERT_EQ(de_row->GetLong(4), 4294967295l);
+ ASSERT_EQ(de_row->GetFloat(5), 0.5);
+ ASSERT_EQ(de_row->GetDouble(6), 1.141);
+ ASSERT_EQ(de_row->GetStringView(7), "20250327");
+ auto f9_bytes = std::make_shared<Bytes>("banana", pool.get());
+ ASSERT_EQ(*de_row->GetBinary(8), *f9_bytes);
+ ASSERT_EQ(de_row->GetDate(9), 2026);
+ ASSERT_EQ(de_row->GetTimestamp(10, 9),
+ Timestamp(/*millisecond=*/1732603136054ll,
/*nano_of_millisecond*/ 54));
+ ASSERT_EQ(de_row->GetTimestamp(11, 0),
+ Timestamp(/*millisecond=*/11000ll, /*nano_of_millisecond*/
0));
+ ASSERT_EQ(de_row->GetDecimal(12, 5, 2),
+ Decimal(5, 2, DecimalUtils::StrToInt128("5502").value()));
+ ASSERT_EQ(de_row->GetDecimal(13, 30, 5),
+ Decimal(30, 5,
DecimalUtils::StrToInt128("-12345678998765432145678").value()));
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 2, 0, 0, 1, 0, 255, 127, 255, 255, 255, 127, 255, 255, 255,
255, 0, 0, 0,
+ 0, 0, 0, 0, 63, 168, 198, 75, 55, 137, 65, 242, 63, 8,
50, 48, 50, 53,
+ 48, 51, 50, 55, 6, 98, 97, 110, 97, 110, 97, 234, 7, 0,
0, 54, 200, 49,
+ 103, 147, 1, 0, 0, 54, 248, 42, 0, 0, 0, 0, 0, 0,
126, 21, 0, 0,
+ 0, 0, 0, 0, 10, 253, 98, 189, 73, 88, 213, 98, 56, 248,
242};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+ {
+ // all null: row 1 of the fixture has every field set to null
+ auto columnar_row = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, struct_array->fields(), pool,
/*row_id=*/1);
+ columnar_row->SetRowKind(RowKind::UpdateAfter());
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto de_row, serializer->Deserialize(bytes));
+
+ // check result
+ ASSERT_EQ(*(de_row->GetRowKind().value()), *(RowKind::UpdateAfter()));
+ ASSERT_EQ(de_row->GetFieldCount(), 14);
+ ASSERT_TRUE(de_row->IsNullAt(0));
+
+ // test compatibility: only three bytes expected; NOTE(review): looks like a row-kind byte followed by 14 null bits (255, 63) -- confirm against the format
+ std::vector<uint8_t> java_bytes = {2, 255, 63};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+}
+
+TEST(RowCompactedSerializerTest, TestNestedType) {  // Round-trips one row of three map columns whose values are a list, a map and a struct.
+ auto pool = GetDefaultPool();
+ // prepare data: three map<int32, X> columns with X = list<int32>, map<int32, int32>, struct<key, value>
+ auto key_field = arrow::field("key", arrow::int32());
+ auto value_field = arrow::field("value", arrow::int32());
+
+ auto inner_child1 = arrow::field(
+ "inner1", arrow::map(arrow::int32(), arrow::field("inner_list",
arrow::list(value_field))));
+ auto inner_child2 = arrow::field(
+ "inner2", arrow::map(arrow::int32(),
+ arrow::field("inner_map",
arrow::map(arrow::int32(), value_field))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct", arrow::struct_({key_field,
value_field}))));
+
+ auto arrow_type = arrow::struct_({inner_child1, inner_child2,
inner_child3});
+ // a single row; each JSON line below is the value of one of the three map columns
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[100, [1, 2, 3, 4]], [101, [5, 6, 7]]],
+[[200, [[500, 1]]], [201, [[501, 2]]]],
+[[600, [100, 200]]]]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result
+ ASSERT_EQ(row->GetFieldCount(), 3);
+
+ // for inner_child1: map<int32, list<int32>>
+ ASSERT_EQ(row->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({100, 101}));
+ auto value1 = row->GetMap(0)->ValueArray();
+ ASSERT_EQ(value1->Size(), 2);
+ ASSERT_EQ(value1->GetArray(0)->ToIntArray().value(),
std::vector<int32_t>({1, 2, 3, 4}));
+ ASSERT_EQ(value1->GetArray(1)->ToIntArray().value(),
std::vector<int32_t>({5, 6, 7}));
+
+ // for inner_child2: map<int32, map<int32, int32>>
+ ASSERT_EQ(row->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({200, 201}));
+ auto value2 = row->GetMap(1)->ValueArray();
+ ASSERT_EQ(value2->Size(), 2);
+ ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({500}));
+ ASSERT_EQ(value2->GetMap(0)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({1}));
+ ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({501}));
+ ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({2}));
+
+ // for inner_child3: map<int32, struct<key, value>>
+ ASSERT_EQ(row->GetMap(2)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({600}));
+ auto value3 = row->GetMap(2)->ValueArray();
+ ASSERT_EQ(value3->Size(), 1);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(1), 200);
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 92, 16, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 100, 0,
0, 0, 101, 0, 0,
+ 0, 2, 0, 0, 0, 0, 0, 0, 0, 24, 0, 0, 0, 24, 0, 0, 0,
24, 0, 0, 0, 48,
+ 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0,
0, 0, 3, 0, 0,
+ 0, 4, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0,
6, 0, 0, 0, 7,
+ 0, 0, 0, 0, 0, 0, 0, 124, 16, 0, 0, 0, 2, 0, 0, 0, 0,
0, 0, 0, 200, 0,
+ 0, 0, 201, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 36, 0, 0,
0, 24, 0, 0, 0,
+ 36, 0, 0, 0, 64, 0, 0, 0, 16, 0, 0, 0, 1, 0, 0, 0, 0,
0, 0, 0, 244, 1,
+ 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 245,
1, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 60, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 88, 2, 0, 0,
0, 0, 0, 0, 1,
+ 0, 0, 0, 0, 0, 0, 0, 24, 0, 0, 0, 16, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 100, 0, 0, 0, 0, 0, 0, 0, 200, 0, 0, 0, 0, 0, 0, 0};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+}
+
+TEST(RowCompactedSerializerTest, TestNestedTypeWithNull) {  // Same schema as TestNestedType, but with nulls inside the nested list, map and struct values.
+ auto pool = GetDefaultPool();
+ // prepare data: three map<int32, X> columns with X = list<int32>, map<int32, int32>, struct<key, value>
+ auto key_field = arrow::field("key", arrow::int32());
+ auto value_field = arrow::field("value", arrow::int32());
+
+ auto inner_child1 = arrow::field(
+ "inner1", arrow::map(arrow::int32(), arrow::field("inner_list",
arrow::list(value_field))));
+ auto inner_child2 = arrow::field(
+ "inner2", arrow::map(arrow::int32(),
+ arrow::field("inner_map",
arrow::map(arrow::int32(), value_field))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct", arrow::struct_({key_field,
value_field}))));
+
+ auto arrow_type = arrow::struct_({inner_child1, inner_child2,
inner_child3});
+ // a single row; each JSON line below is one map column: a null list value, a null list element, a null map value, a null struct field
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[100, null], [101, [5, 6, null]]],
+[[200, [[500, null]]], [201, [[501, 2]]]],
+[[600, [100, null]]]]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result
+ ASSERT_EQ(row->GetFieldCount(), 3);
+
+ // for inner_child1: the value for key 100 is null; the list for key 101 ends with a null element
+ ASSERT_EQ(row->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({100, 101}));
+ auto value1 = row->GetMap(0)->ValueArray();
+ ASSERT_EQ(value1->Size(), 2);
+ ASSERT_TRUE(value1->IsNullAt(0));
+ ASSERT_EQ(value1->GetArray(1)->GetInt(0), 5);
+ ASSERT_EQ(value1->GetArray(1)->GetInt(1), 6);
+ ASSERT_TRUE(value1->GetArray(1)->IsNullAt(2));
+
+ // for inner_child2: the inner map under key 200 maps 500 to null
+ ASSERT_EQ(row->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({200, 201}));
+ auto value2 = row->GetMap(1)->ValueArray();
+ ASSERT_EQ(value2->Size(), 2);
+ ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({500}));
+ ASSERT_TRUE(value2->GetMap(0)->ValueArray()->IsNullAt(0));
+ ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({501}));
+ ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(),
std::vector<int32_t>({2}));
+
+ // for inner_child3: the struct under key 600 has a null second field
+ ASSERT_EQ(row->GetMap(2)->KeyArray()->ToIntArray().value(),
std::vector<int32_t>({600}));
+ auto value3 = row->GetMap(2)->ValueArray();
+ ASSERT_EQ(value3->Size(), 1);
+ ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+ ASSERT_TRUE(value3->GetRow(0, /*num_fields=*/2)->IsNullAt(1));
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 68, 16, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0,
0, 101, 0, 0,
+ 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 24,
0, 0, 0, 24,
+ 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0, 6, 0, 0,
0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 124, 16, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0,
200, 0, 0, 0,
+ 201, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 36, 0, 0, 0, 24, 0,
0, 0, 36, 0,
+ 0, 0, 64, 0, 0, 0, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
244, 1, 0, 0,
+ 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0,
+ 0, 0, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 245, 1, 0, 0,
0, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 60, 16,
+ 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 88, 2, 0, 0, 0, 0, 0,
0, 1, 0, 0,
+ 0, 0, 0, 0, 0, 24, 0, 0, 0, 16, 0, 0, 0, 0, 2, 0, 0, 0,
0, 0, 0, 100,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+}
+
+TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal) {  // Checks byte-exact output for timestamps/decimals nested in map values, with and without nulls.
+ // test map/list with nested child types
+ auto pool = GetDefaultPool();
+ // prepare data: map values are list<decimal(5,2)>, list<decimal(30,5)> and a struct of two timestamps plus two decimals
+ auto inner_child1 = arrow::field(
+ "inner1",
+ arrow::map(
+ arrow::int32(),
+ arrow::field("inner_list", arrow::list(arrow::field("f0",
arrow::decimal128(5, 2))))));
+ auto inner_child2 = arrow::field(
+ "inner2",
+ arrow::map(
+ arrow::int32(),
+ arrow::field("inner_list", arrow::list(arrow::field("f1",
arrow::decimal128(30, 5))))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct",
+ arrow::struct_({
+ arrow::field("f0",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f1",
arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("f2", arrow::decimal128(5,
2)),
+ arrow::field("f3", arrow::decimal128(30,
5)),
+ }))));
+
+ auto arrow_type = arrow::struct_({inner_child1, inner_child2,
inner_child3});
+ // two rows of three map columns each; row 0 is fully populated, row 1 has nulls inside the nested values
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[100, ["5.12", "6.12"]]],
+[[200, ["-123456789987654321.45678", "23456789987654321.45678"]]],
+[[300, [1732603136054000054, 11, "7.89", "3456789987654321.45678"]]]],
+[[[102, [null, "6.12"]]],
+[[202, ["-123456789987654321.45678", null]]],
+[[302, [null, null, null, null]]]]
+ ])")
+ .ValueOrDie();
+
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+
+ {  // row 0: no nulls
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result: only the field count is inspected; the values are covered by the byte comparison below
+ ASSERT_EQ(row->GetFieldCount(), 3);
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 60, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
0, 100, 0, 0,
+ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 24,
0, 0, 0, 16,
+ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
0, 0, 0, 0,
+ 0, 100, 2, 0, 0, 0, 0, 0, 0, 92, 16, 0, 0, 0,
1, 0, 0, 0,
+ 0, 0, 0, 0, 200, 0, 0, 0, 0, 0, 0, 0, 1, 0,
0, 0, 0, 0,
+ 0, 0, 56, 0, 0, 0, 16, 0, 0, 0, 2, 0, 0, 0,
0, 0, 0, 0,
+ 10, 0, 0, 0, 24, 0, 0, 0, 9, 0, 0, 0, 40, 0,
0, 0, 253, 98,
+ 189, 73, 88, 213, 98, 56, 248, 242, 0, 0, 0, 0, 0, 0,
127, 40, 213, 221,
+ 111, 235, 135, 7, 14, 0, 0, 0, 0, 0, 0, 0, 100, 16,
0, 0, 0, 1,
+ 0, 0, 0, 0, 0, 0, 0, 44, 1, 0, 0, 0, 0, 0,
0, 1, 0, 0,
+ 0, 0, 0, 0, 0, 64, 0, 0, 0, 16, 0, 0, 0, 0,
0, 0, 0, 0,
+ 0, 0, 0, 54, 0, 0, 0, 40, 0, 0, 0, 248, 42, 0,
0, 0, 0, 0,
+ 0, 21, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 48,
0, 0, 0, 54,
+ 200, 49, 103, 147, 1, 0, 0, 18, 189, 66, 129, 228, 46, 71,
7, 14, 0, 0,
+ 0, 0, 0, 0, 0};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+ {  // row 1: null list elements and an all-null struct
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/1);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result: only the field count is inspected; the values are covered by the byte comparison below
+ ASSERT_EQ(row->GetFieldCount(), 3);
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 60, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
102, 0, 0, 0, 0, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 24, 0, 0, 0, 16, 0,
0, 0, 2, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 2, 0,
0, 0, 0, 0, 0,
+ 76, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 202, 0, 0,
0, 0, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0, 0, 40, 0, 0, 0, 16, 0, 0, 0,
2, 0, 0, 0, 2,
+ 0, 0, 0, 10, 0, 0, 0, 24, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 253, 98,
+ 189, 73, 88, 213, 98, 56, 248, 242, 0, 0, 0, 0, 0, 0, 100,
16, 0, 0, 0, 1, 0,
+ 0, 0, 0, 0, 0, 0, 46, 1, 0, 0, 0, 0, 0, 0, 1, 0,
0, 0, 0, 0, 0,
+ 0, 64, 0, 0, 0, 16, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+}
+TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) {  // Same nested maps as the previous test, wrapped in a struct column next to a string column.
+ // test struct with nested child types
+ auto pool = GetDefaultPool();
+ // prepare data: map values are list<decimal(5,2)>, list<decimal(30,5)> and a struct of two timestamps plus two decimals
+ auto inner_child1 = arrow::field(
+ "inner1",
+ arrow::map(
+ arrow::int32(),
+ arrow::field("inner_list", arrow::list(arrow::field("f0",
arrow::decimal128(5, 2))))));
+ auto inner_child2 = arrow::field(
+ "inner2",
+ arrow::map(
+ arrow::int32(),
+ arrow::field("inner_list", arrow::list(arrow::field("f1",
arrow::decimal128(30, 5))))));
+ auto inner_child3 = arrow::field(
+ "inner3",
+ arrow::map(arrow::int32(),
+ arrow::field("inner_struct",
+ arrow::struct_({
+ arrow::field("f0",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f1",
arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("f2", arrow::decimal128(5,
2)),
+ arrow::field("f3", arrow::decimal128(30,
5)),
+ }))));
+ auto child1 =
+ arrow::field("child1", arrow::struct_({inner_child1, inner_child2,
inner_child3}));
+ auto child2 = arrow::field("child2", arrow::utf8());
+ auto arrow_type = arrow::struct_({child1, child2});
+ // two rows of (child1 struct, child2 string); row 0 is fully populated, row 1 has nulls inside the nested values
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[[[100, ["5.12", "6.12"]]],
+[[200, ["-123456789987654321.45678", "23456789987654321.45678"]]],
+[[300, [1732603136054000054, 11, "7.89", "3456789987654321.45678"]]]],
+"Alice"],
+[[[[102, [null, "6.12"]]],
+[[202, ["-123456789987654321.45678", null]]],
+[[302, [null, null, null, null]]]],
+"Bob"]
+ ])")
+ .ValueOrDie();
+
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+
+ {  // row 0: no nulls
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result: the nested struct exposes its three map fields and the string column survives
+ ASSERT_EQ(row->GetFieldCount(), 2);
+ auto inner_row = row->GetRow(0, 3);
+ ASSERT_EQ(inner_row->GetFieldCount(), 3);
+ ASSERT_EQ(row->GetStringView(1), "Alice");
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 129, 2, 0, 0, 60, 16, 0, 0, 0, 1, 0, 0,
0, 0, 0, 0,
+ 0, 100, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,
0, 0, 0, 24,
+ 0, 0, 0, 16, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
0, 0, 2, 0,
+ 0, 0, 0, 0, 0, 100, 2, 0, 0, 0, 0, 0, 0,
92, 16, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0, 0, 200, 0, 0, 0, 0, 0,
0, 0, 1, 0,
+ 0, 0, 0, 0, 0, 0, 56, 0, 0, 0, 16, 0, 0, 0,
2, 0, 0, 0,
+ 0, 0, 0, 0, 10, 0, 0, 0, 24, 0, 0, 0, 9, 0,
0, 0, 40, 0,
+ 0, 0, 253, 98, 189, 73, 88, 213, 98, 56, 248, 242, 0, 0,
0, 0, 0, 0,
+ 127, 40, 213, 221, 111, 235, 135, 7, 14, 0, 0, 0, 0, 0,
0, 0, 100, 16,
+ 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 44, 1, 0,
0, 0, 0, 0,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0,
16, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 54, 0, 0, 0, 40, 0, 0,
0, 248, 42, 0,
+ 0, 0, 0, 0, 0, 21, 3, 0, 0, 0, 0, 0, 0, 9,
0, 0, 0, 48,
+ 0, 0, 0, 54, 200, 49, 103, 147, 1, 0, 0, 18, 189,
66, 129, 228, 46, 71,
+ 7, 14, 0, 0, 0, 0, 0, 0, 0, 5, 65, 108, 105,
99, 101};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+ {  // row 1: null list elements and an all-null innermost struct
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/1);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result: the nested struct exposes its three map fields and the string column survives
+ ASSERT_EQ(row->GetFieldCount(), 2);
+ auto inner_row = row->GetRow(0, 3);
+ ASSERT_EQ(inner_row->GetFieldCount(), 3);
+ ASSERT_EQ(row->GetStringView(1), "Bob");
+
+ // test compatibility: serialized bytes must equal this fixed sequence (presumably captured from the Java implementation -- confirm)
+ std::vector<uint8_t> java_bytes = {
+ 0, 0, 241, 1, 0, 0, 60, 16, 0, 0, 0, 1, 0, 0, 0, 0,
0, 0, 0, 102, 0,
+ 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 24, 0,
0, 0, 16, 0, 0,
+ 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 100, 2, 0, 0,
+ 0, 0, 0, 0, 76, 16, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
0, 202, 0, 0, 0,
+ 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 40, 0, 0, 0,
16, 0, 0, 0, 2,
+ 0, 0, 0, 2, 0, 0, 0, 10, 0, 0, 0, 24, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 253, 98, 189, 73, 88, 213, 98, 56, 248, 242, 0, 0, 0, 0,
0, 0, 100, 16, 0,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 46, 1, 0, 0, 0, 0,
0, 0, 1, 0, 0,
+ 0, 0, 0, 0, 0, 64, 0, 0, 0, 16, 0, 0, 0, 0, 15, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 48, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 66,
111, 98};
+ std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() +
bytes->size());
+ ASSERT_EQ(java_bytes, cpp_bytes);
+ }
+}
+
+TEST(RowCompactedSerializerTest, TestListType) {  // Round-trips a row whose only column is a top-level list<int32>.
+ auto pool = GetDefaultPool();
+ // prepare data: a struct with a single list<int32> field
+ auto inner_child1 = arrow::field("inner1", arrow::list(arrow::int32()));
+ auto arrow_type = arrow::struct_({inner_child1});
+ // three rows are built, but only row 0 is serialized and checked below
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+
R"([
+[[5, 6, 7]],
+[[1, 2, 3]],
+[[4]]
+ ])")
+ .ValueOrDie();
+ auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ auto columnar_row =
+ std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
struct_array->fields(), pool,
+ /*row_id=*/0);
+
+ // serialize and deserialize
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+ ASSERT_OK_AND_ASSIGN(auto bytes,
serializer->SerializeToBytes(*columnar_row));
+ ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+ // check result (this test has no byte-level compatibility check)
+ ASSERT_EQ(row->GetFieldCount(), 1);
+
+ // for inner_child1: the list from row 0 must come back unchanged
+ ASSERT_EQ(row->GetArray(0)->ToIntArray().value(), std::vector<int32_t>({5,
6, 7}));
+}
+
+TEST(RowCompactedSerializerTest, TestSliceComparator) {  // Verifies the ordering produced by CreateSliceComparator on serialized rows, one differing field at a time.
+ auto pool = GetDefaultPool();
+ arrow::FieldVector fields = {  // pool of field definitions; each case below picks a subset by index
+ arrow::field("f1", arrow::boolean()),
+ arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()),
+ arrow::field("f4", arrow::int32()),
+ arrow::field("f5", arrow::int64()),
+ arrow::field("f6", arrow::float32()),
+ arrow::field("f7", arrow::float64()),
+ arrow::field("f8", arrow::utf8()),
+ arrow::field("f9", arrow::binary()),
+ arrow::field("f10", arrow::date32()),
+ arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f12", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("f13", arrow::decimal128(5, 2)),
+ arrow::field("f14", arrow::decimal128(30, 5)),
+ };
+ auto check_result = [&](const std::shared_ptr<arrow::DataType>& type,  // serializes rows 0 and 1 of `array` and asserts row 0 < row 1
+ const std::shared_ptr<arrow::Array>& array) {
+ auto struct_array =
std::dynamic_pointer_cast<arrow::StructArray>(array);
+ ASSERT_TRUE(struct_array);
+ ASSERT_OK_AND_ASSIGN(auto serializer,
+
RowCompactedSerializer::Create(arrow::schema(type->fields()), pool));
+ auto columnar_row1 = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, struct_array->fields(), pool,
/*row_id=*/0);
+ auto columnar_row2 = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, struct_array->fields(), pool,
/*row_id=*/1);
+
+ ASSERT_OK_AND_ASSIGN(auto bytes1,
serializer->SerializeToBytes(*columnar_row1));
+ auto slice1 = MemorySlice::Wrap(bytes1);
+ ASSERT_OK_AND_ASSIGN(auto bytes2,
serializer->SerializeToBytes(*columnar_row2));
+ auto slice2 = MemorySlice::Wrap(bytes2);
+
+ ASSERT_OK_AND_ASSIGN(auto comparator,
RowCompactedSerializer::CreateSliceComparator(
+
arrow::schema(type->fields()), pool));
+ ASSERT_EQ(-1, comparator(slice1, slice2).value());  // row 0 sorts before row 1
+ ASSERT_EQ(1, comparator(slice2, slice1).value());  // and the reverse order flips the sign
+ ASSERT_EQ(0, comparator(slice1, slice1).value());  // a slice compares equal to itself
+ ASSERT_EQ(0, comparator(slice2, slice2).value());
+ };
+ // test various type: in every fixture row 0 must sort strictly before row 1
+ {  // boolean differs: false < true
+ auto type = arrow::struct_({fields[0], fields[1], fields[2],
fields[3]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [false, 0, 32767, 2147483647],
+ [true, 0, 32767, 2147483647]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // int8 differs: 0 < 1
+ auto type = arrow::struct_({fields[0], fields[1], fields[2],
fields[3]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [true, 0, 32767, 2147483647],
+ [true, 1, 32767, 2147483647]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // int16 differs: 32766 < 32767
+ auto type = arrow::struct_({fields[0], fields[1], fields[2],
fields[3]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [true, 0, 32766, 2147483647],
+ [true, 0, 32767, 2147483647]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // int32 differs: 2147483646 < 2147483647
+ auto type = arrow::struct_({fields[0], fields[1], fields[2],
fields[3]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [true, 0, 32767, 2147483646],
+ [true, 0, 32767, 2147483647]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // int64 differs: 4294967295 < 4294967296
+ auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [4294967295, 10.1, 100.123],
+ [4294967296, 10.1, 100.123]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // float32 differs: 10.1 < 10.11
+ auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [4294967295, 10.1, 100.123],
+ [4294967295, 10.11, 100.123]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // float64 differs: 100.123 < 100.124
+ auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [4294967295, 10.1, 100.123],
+ [4294967295, 10.1, 100.124]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // string differs: "Alice" < "Bob"
+ auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ ["Alice", "这是一个中文", 10],
+ ["Bob", "这是一个中文", 10]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // binary differs: row 0's value is a strict prefix of row 1's
+ auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ ["Alice", "这是一个中文", 10],
+ ["Alice", "这是一个中文!", 10]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // date32 differs: 10 < 20
+ auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ ["Alice", "这是一个中文", 10],
+ ["Alice", "这是一个中文", 20]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // nanosecond timestamp differs by one unit
+ auto type = arrow::struct_({fields[10], fields[11], fields[12],
fields[13]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+ [1732603136054000055, 11, "55.02", "-123456789987654321.45678"]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // second-precision timestamp differs: 11 < 12
+ auto type = arrow::struct_({fields[10], fields[11], fields[12],
fields[13]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+ [1732603136054000054, 12, "55.02", "-123456789987654321.45678"]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // decimal128(5, 2) differs: 55.02 < 55.03
+ auto type = arrow::struct_({fields[10], fields[11], fields[12],
fields[13]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+ [1732603136054000054, 11, "55.03", "-123456789987654321.45678"]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // decimal128(30, 5) differs: both negative, row 1 is closer to zero
+ auto type = arrow::struct_({fields[10], fields[11], fields[12],
fields[13]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+ [1732603136054000054, 11, "55.02", "-123456789987654321.45670"]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ // test null: the expectations below require a null to sort before any non-null value
+ {  // equal first field; null second field vs 20
+ auto type = arrow::struct_({fields[0], fields[1]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [false, null],
+ [false, 20]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // both first fields null; the second field decides: 20 < 21
+ auto type = arrow::struct_({fields[0], fields[1]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [null, 20],
+ [null, 21]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // first field already decides: false < true
+ auto type = arrow::struct_({fields[0], fields[1]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [false, null],
+ [true, 20]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // null first field sorts before false even though row 0's second field is larger
+ auto type = arrow::struct_({fields[0], fields[1]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [null, 21],
+ [false, 20]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // both first fields null; null second field vs 20
+ auto type = arrow::struct_({fields[0], fields[1]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [null, null],
+ [null, 20]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ // test float & double
+ // -infinity < -0.0 < +0.0 < +infinity < NaN == NaN
+ {  // float32: -Inf < -0.0
+ auto type = arrow::struct_({fields[5], fields[5], fields[5],
fields[6], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [-Inf, -0.0, 0.0, Inf, NaN],
+ [-0.0, -0.0, 0.0, Inf, NaN]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // float32: -0.0 < +0.0
+ auto type = arrow::struct_({fields[5], fields[5], fields[5],
fields[6], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [-Inf, -0.0, 0.0, Inf, NaN],
+ [-Inf, 0.0, 0.0, Inf, NaN]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // float64: 0.0 < +Inf (trailing NaN == NaN does not affect the result)
+ auto type = arrow::struct_({fields[5], fields[5], fields[5],
fields[6], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [-Inf, -0.0, 0.0, 0.0, NaN],
+ [-Inf, -0.0, 0.0, Inf, NaN]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+ {  // float64: +Inf < NaN
+ auto type = arrow::struct_({fields[5], fields[5], fields[5],
fields[6], fields[6]});
+ std::shared_ptr<arrow::Array> array =
arrow::ipc::internal::json::ArrayFromJSON(type,
+
R"([
+ [-Inf, -0.0, 0.0, Inf, Inf],
+ [-Inf, -0.0, 0.0, Inf, NaN]
+ ])")
+ .ValueOrDie();
+ check_result(type, array);
+ }
+}
+
+} // namespace paimon::test