This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ef309f4a feat(avro): optimize writer performance by directly using
encoder (#445)
ef309f4a is described below
commit ef309f4a4b994bf91d2330b861db96ccb9fad15f
Author: Xinli Shang <[email protected]>
AuthorDate: Tue Dec 30 00:54:54 2025 -0800
feat(avro): optimize writer performance by directly using encoder (#445)
Implement direct Avro encoder to eliminate GenericDatum intermediate
layer, matching the decoder approach for better performance.
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/avro/CMakeLists.txt | 6 -
src/iceberg/avro/avro_direct_decoder.cc | 50 +--
src/iceberg/avro/avro_direct_decoder_internal.h | 2 +-
src/iceberg/avro/avro_direct_encoder.cc | 377 ++++++++++++++++++++
src/iceberg/avro/avro_direct_encoder_internal.h | 59 ++++
src/iceberg/avro/avro_reader.cc | 2 +-
src/iceberg/avro/avro_scan.cc | 204 -----------
src/iceberg/avro/avro_writer.cc | 81 ++++-
src/iceberg/file_writer.h | 4 +
src/iceberg/test/avro_test.cc | 434 ++++++++++++++++++++++++
11 files changed, 965 insertions(+), 255 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ca025345..ea249e19 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -160,6 +160,7 @@ if(ICEBERG_BUILD_BUNDLE)
arrow/arrow_fs_file_io.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
+ avro/avro_direct_encoder.cc
avro/avro_reader.cc
avro/avro_writer.cc
avro/avro_register.cc
diff --git a/src/iceberg/avro/CMakeLists.txt b/src/iceberg/avro/CMakeLists.txt
index a7663ba6..f8213038 100644
--- a/src/iceberg/avro/CMakeLists.txt
+++ b/src/iceberg/avro/CMakeLists.txt
@@ -16,9 +16,3 @@
# under the License.
iceberg_install_all_headers(iceberg/avro)
-
-# avro_scan benchmark executable
-add_executable(avro_scan avro_scan.cc)
-target_link_libraries(avro_scan PRIVATE iceberg_bundle_static)
-set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY
-
"${CMAKE_BINARY_DIR}/src/iceberg/avro")
diff --git a/src/iceberg/avro/avro_direct_decoder.cc
b/src/iceberg/avro/avro_direct_decoder.cc
index 60f79d21..3ab525d9 100644
--- a/src/iceberg/avro/avro_direct_decoder.cc
+++ b/src/iceberg/avro/avro_direct_decoder.cc
@@ -45,7 +45,7 @@ namespace {
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
const FieldProjection& projection,
const SchemaField& projected_field,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx);
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx);
/// \brief Skip an Avro value based on its schema without decoding
Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder) {
@@ -146,7 +146,7 @@ Status SkipAvroValue(const ::avro::NodePtr& avro_node,
::avro::Decoder& decoder)
Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node,
::avro::Decoder& decoder,
const std::span<const FieldProjection>&
projections,
const StructType& struct_type,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx) {
if (avro_node->type() != ::avro::AVRO_RECORD) {
return InvalidArgument("Expected Avro record, got type: {}",
ToString(avro_node));
}
@@ -157,15 +157,15 @@ Status DecodeStructToBuilder(const ::avro::NodePtr&
avro_node, ::avro::Decoder&
// Build a map from Avro field index to projection index (cached per struct
schema)
// -1 means the field should be skipped
const FieldProjection* cache_key = projections.data();
- auto cache_it = ctx->avro_to_projection_cache.find(cache_key);
+ auto cache_it = ctx.avro_to_projection_cache.find(cache_key);
std::vector<int>* avro_to_projection;
- if (cache_it != ctx->avro_to_projection_cache.end()) {
+ if (cache_it != ctx.avro_to_projection_cache.end()) {
// Use cached mapping
avro_to_projection = &cache_it->second;
} else {
// Build and cache the mapping
- auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace(
+ auto [inserted_it, inserted] = ctx.avro_to_projection_cache.emplace(
cache_key, std::vector<int>(avro_node->leaves(), -1));
avro_to_projection = &inserted_it->second;
@@ -217,7 +217,7 @@ Status DecodeStructToBuilder(const ::avro::NodePtr&
avro_node, ::avro::Decoder&
Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
const FieldProjection& element_projection,
const ListType& list_type,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx) {
if (avro_node->type() != ::avro::AVRO_ARRAY) {
return InvalidArgument("Expected Avro array, got type: {}",
ToString(avro_node));
}
@@ -247,7 +247,7 @@ Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node,
::avro::Decoder& dec
const FieldProjection& key_projection,
const FieldProjection& value_projection,
const MapType& map_type, ::arrow::ArrayBuilder*
array_builder,
- DecodeContext* ctx) {
+ DecodeContext& ctx) {
auto* map_builder =
internal::checked_cast<::arrow::MapBuilder*>(array_builder);
if (avro_node->type() == ::avro::AVRO_MAP) {
@@ -317,7 +317,7 @@ Status DecodeNestedValueToBuilder(const ::avro::NodePtr&
avro_node,
const std::span<const FieldProjection>&
projections,
const NestedType& projected_type,
::arrow::ArrayBuilder* array_builder,
- DecodeContext* ctx) {
+ DecodeContext& ctx) {
switch (projected_type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = internal::checked_cast<const
StructType&>(projected_type);
@@ -354,7 +354,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
::avro::Decoder& decoder,
const SchemaField& projected_field,
::arrow::ArrayBuilder* array_builder,
- DecodeContext* ctx) {
+ DecodeContext& ctx) {
const auto& projected_type = *projected_field.type();
if (!projected_type.is_primitive()) {
return InvalidArgument("Expected primitive type, got: {}",
projected_type.ToString());
@@ -430,8 +430,8 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
ToString(avro_node));
}
auto* builder =
internal::checked_cast<::arrow::StringBuilder*>(array_builder);
- decoder.decodeString(ctx->string_scratch);
- ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch));
+ decoder.decodeString(ctx.string_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.string_scratch));
return {};
}
@@ -441,9 +441,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
ToString(avro_node));
}
auto* builder =
internal::checked_cast<::arrow::BinaryBuilder*>(array_builder);
- decoder.decodeBytes(ctx->bytes_scratch);
+ decoder.decodeBytes(ctx.bytes_scratch);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(
- ctx->bytes_scratch.data(),
static_cast<int32_t>(ctx->bytes_scratch.size())));
+ ctx.bytes_scratch.data(),
static_cast<int32_t>(ctx.bytes_scratch.size())));
return {};
}
@@ -456,9 +456,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
auto* builder =
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
- ctx->bytes_scratch.resize(fixed_type.length());
- decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch);
- ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+ ctx.bytes_scratch.resize(fixed_type.length());
+ decoder.decodeFixed(fixed_type.length(), ctx.bytes_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
return {};
}
@@ -472,9 +472,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
auto* builder =
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
- ctx->bytes_scratch.resize(16);
- decoder.decodeFixed(16, ctx->bytes_scratch);
- ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+ ctx.bytes_scratch.resize(16);
+ decoder.decodeFixed(16, ctx.bytes_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
return {};
}
@@ -489,11 +489,11 @@ Status DecodePrimitiveValueToBuilder(const
::avro::NodePtr& avro_node,
size_t byte_width = avro_node->fixedSize();
auto* builder =
internal::checked_cast<::arrow::Decimal128Builder*>(array_builder);
- ctx->bytes_scratch.resize(byte_width);
- decoder.decodeFixed(byte_width, ctx->bytes_scratch);
+ ctx.bytes_scratch.resize(byte_width);
+ decoder.decodeFixed(byte_width, ctx.bytes_scratch);
ICEBERG_ARROW_ASSIGN_OR_RETURN(
- auto decimal,
::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(),
-
ctx->bytes_scratch.size()));
+ auto decimal,
::arrow::Decimal128::FromBigEndian(ctx.bytes_scratch.data(),
+
ctx.bytes_scratch.size()));
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal));
return {};
}
@@ -548,7 +548,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr&
avro_node,
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
const FieldProjection& projection,
const SchemaField& projected_field,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx) {
if (avro_node->type() == ::avro::AVRO_UNION) {
const size_t branch_index = decoder.decodeUnionIndex();
@@ -585,7 +585,7 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr&
avro_node, ::avro::Decoder& d
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
const SchemaProjection& projection,
const Schema& projected_schema,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx) {
return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields,
projected_schema, array_builder, ctx);
}
diff --git a/src/iceberg/avro/avro_direct_decoder_internal.h
b/src/iceberg/avro/avro_direct_decoder_internal.h
index df4587fd..5a2cf224 100644
--- a/src/iceberg/avro/avro_direct_decoder_internal.h
+++ b/src/iceberg/avro/avro_direct_decoder_internal.h
@@ -82,6 +82,6 @@ struct DecodeContext {
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
const SchemaProjection& projection,
const Schema& projected_schema,
- ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx);
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext& ctx);
} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_direct_encoder.cc
b/src/iceberg/avro/avro_direct_encoder.cc
new file mode 100644
index 00000000..caab7f69
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_encoder.cc
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <cstring>
+
+#include <arrow/array.h>
+#include <arrow/extension_type.h>
+#include <arrow/type.h>
+#include <avro/Specific.hh>
+
+#include "iceberg/avro/avro_direct_encoder_internal.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::avro {
+
+namespace {
+
+// Utility struct for union branch information
+struct UnionBranches {
+ size_t null_index;
+ size_t value_index;
+ ::avro::NodePtr value_node;
+};
+
+Result<UnionBranches> ValidateUnion(const ::avro::NodePtr& union_node) {
+ ICEBERG_PRECHECK(union_node->leaves() == 2,
+ "Union must have exactly 2 branches, got {}",
union_node->leaves());
+
+ const auto& branch_0 = union_node->leafAt(0);
+ const auto& branch_1 = union_node->leafAt(1);
+
+ if (branch_0->type() == ::avro::AVRO_NULL && branch_1->type() !=
::avro::AVRO_NULL) {
+ return UnionBranches{.null_index = 0, .value_index = 1, .value_node =
branch_1};
+ }
+ if (branch_1->type() == ::avro::AVRO_NULL && branch_0->type() !=
::avro::AVRO_NULL) {
+ return UnionBranches{.null_index = 1, .value_index = 0, .value_node =
branch_0};
+ }
+ return InvalidArgument("Union must have exactly one null branch");
+}
+
+} // namespace
+
+Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder&
encoder,
+ const Type& type, const ::arrow::Array& array,
int64_t row_index,
+ EncodeContext& ctx) {
+ ICEBERG_PRECHECK(row_index >= 0 && row_index < array.length(),
+ "Row index {} out of bounds {}", row_index, array.length());
+
+ const bool is_null = array.IsNull(row_index);
+
+ if (avro_node->type() == ::avro::AVRO_UNION) {
+ ICEBERG_ASSIGN_OR_RAISE(auto branches, ValidateUnion(avro_node));
+
+ if (is_null) {
+ encoder.encodeUnionIndex(branches.null_index);
+ encoder.encodeNull();
+ return {};
+ }
+
+ encoder.encodeUnionIndex(branches.value_index);
+ return EncodeArrowToAvro(branches.value_node, encoder, type, array,
row_index, ctx);
+ }
+
+ if (is_null) {
+ return InvalidArgument("Null value in non-nullable field");
+ }
+
+ switch (avro_node->type()) {
+ case ::avro::AVRO_NULL:
+ encoder.encodeNull();
+ return {};
+
+ case ::avro::AVRO_BOOL: {
+ const auto& bool_array =
+ internal::checked_cast<const ::arrow::BooleanArray&>(array);
+ encoder.encodeBool(bool_array.Value(row_index));
+ return {};
+ }
+
+ case ::avro::AVRO_INT: {
+ // AVRO_INT can represent: int32, date (days since epoch)
+ switch (array.type()->id()) {
+ case ::arrow::Type::INT32: {
+ const auto& int32_array =
+ internal::checked_cast<const ::arrow::Int32Array&>(array);
+ encoder.encodeInt(int32_array.Value(row_index));
+ return {};
+ }
+ case ::arrow::Type::DATE32: {
+ const auto& date_array =
+ internal::checked_cast<const ::arrow::Date32Array&>(array);
+ encoder.encodeInt(date_array.Value(row_index));
+ return {};
+ }
+ default:
+ return InvalidArgument("AVRO_INT expects Int32Array or Date32Array,
got {}",
+ array.type()->ToString());
+ }
+ }
+
+ case ::avro::AVRO_LONG: {
+ // AVRO_LONG can represent: int64, time (microseconds), timestamp
(microseconds)
+ switch (array.type()->id()) {
+ case ::arrow::Type::INT64: {
+ const auto& int64_array =
+ internal::checked_cast<const ::arrow::Int64Array&>(array);
+ encoder.encodeLong(int64_array.Value(row_index));
+ return {};
+ }
+ case ::arrow::Type::TIME64: {
+ const auto& time_array =
+ internal::checked_cast<const ::arrow::Time64Array&>(array);
+ encoder.encodeLong(time_array.Value(row_index));
+ return {};
+ }
+ case ::arrow::Type::TIMESTAMP: {
+ const auto& timestamp_array =
+ internal::checked_cast<const ::arrow::TimestampArray&>(array);
+ encoder.encodeLong(timestamp_array.Value(row_index));
+ return {};
+ }
+ default:
+ return InvalidArgument(
+ "AVRO_LONG expects Int64Array, Time64Array, or TimestampArray,
got {}",
+ array.type()->ToString());
+ }
+ }
+
+ case ::avro::AVRO_FLOAT: {
+ const auto& float_array = internal::checked_cast<const
::arrow::FloatArray&>(array);
+ encoder.encodeFloat(float_array.Value(row_index));
+ return {};
+ }
+
+ case ::avro::AVRO_DOUBLE: {
+ const auto& double_array =
+ internal::checked_cast<const ::arrow::DoubleArray&>(array);
+ encoder.encodeDouble(double_array.Value(row_index));
+ return {};
+ }
+
+ case ::avro::AVRO_STRING: {
+ const auto& string_array =
+ internal::checked_cast<const ::arrow::StringArray&>(array);
+ std::string_view value = string_array.GetView(row_index);
+ encoder.encodeString(std::string(value));
+ return {};
+ }
+
+ case ::avro::AVRO_BYTES: {
+ const auto& binary_array =
+ internal::checked_cast<const ::arrow::BinaryArray&>(array);
+ std::string_view value = binary_array.GetView(row_index);
+ ctx.bytes_scratch.assign(value.begin(), value.end());
+ encoder.encodeBytes(ctx.bytes_scratch);
+ return {};
+ }
+
+ case ::avro::AVRO_FIXED: {
+ // Handle UUID
+ if (avro_node->logicalType().type() == ::avro::LogicalType::UUID) {
+ const auto& extension_array =
+ internal::checked_cast<const ::arrow::ExtensionArray&>(array);
+ const auto& fixed_array =
+ internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(
+ *extension_array.storage());
+ std::string_view value = fixed_array.GetView(row_index);
+ ctx.bytes_scratch.assign(value.begin(), value.end());
+ encoder.encodeFixed(ctx.bytes_scratch.data(),
ctx.bytes_scratch.size());
+ return {};
+ }
+
+ // Handle DECIMAL
+ if (avro_node->logicalType().type() == ::avro::LogicalType::DECIMAL) {
+ const auto& decimal_array =
+ internal::checked_cast<const ::arrow::Decimal128Array&>(array);
+ std::string_view decimal_value = decimal_array.GetView(row_index);
+ ctx.bytes_scratch.assign(decimal_value.begin(), decimal_value.end());
+ // Arrow Decimal128 bytes are in little-endian order, Avro requires
big-endian
+ std::ranges::reverse(ctx.bytes_scratch);
+ encoder.encodeFixed(ctx.bytes_scratch.data(),
ctx.bytes_scratch.size());
+ return {};
+ }
+
+ // Handle regular FIXED
+ const auto& fixed_array =
+ internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);
+ std::string_view value = fixed_array.GetView(row_index);
+ ctx.bytes_scratch.assign(value.begin(), value.end());
+ encoder.encodeFixed(ctx.bytes_scratch.data(), ctx.bytes_scratch.size());
+ return {};
+ }
+
+ case ::avro::AVRO_RECORD: {
+ ICEBERG_PRECHECK(array.type()->id() == ::arrow::Type::STRUCT,
+ "AVRO_RECORD expects StructArray, got {}",
+ array.type()->ToString());
+ ICEBERG_PRECHECK(type.type_id() == TypeId::kStruct,
+ "AVRO_RECORD expects struct type, got type {}",
type.ToString());
+
+ const auto& struct_array =
+ internal::checked_cast<const ::arrow::StructArray&>(array);
+ const auto& struct_type = internal::checked_cast<const
StructType&>(type);
+ const size_t num_fields = avro_node->leaves();
+
+ ICEBERG_PRECHECK(
+ struct_array.num_fields() == static_cast<int>(num_fields),
+ "Field count mismatch: Arrow struct has {} fields, Avro node has {}
fields",
+ struct_array.num_fields(), num_fields);
+ ICEBERG_PRECHECK(
+ struct_type.fields().size() == num_fields,
+ "Field count mismatch: Iceberg struct has {} fields, Avro node has
{} fields",
+ struct_type.fields().size(), num_fields);
+
+ for (size_t i = 0; i < num_fields; ++i) {
+ const auto& field_node = avro_node->leafAt(i);
+ const auto& field_array = struct_array.field(static_cast<int>(i));
+ const auto& field_schema = struct_type.fields()[i];
+
+ ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(
+ field_node, encoder, *field_schema.type(), *field_array,
row_index, ctx));
+ }
+ return {};
+ }
+
+ case ::avro::AVRO_ARRAY: {
+ const auto& element_node = avro_node->leafAt(0);
+
+ // Handle ListArray
+ if (array.type()->id() == ::arrow::Type::LIST) {
+ const auto& list_array = internal::checked_cast<const
::arrow::ListArray&>(array);
+ const auto& list_type = internal::checked_cast<const ListType&>(type);
+
+ const auto start = list_array.value_offset(row_index);
+ const auto end = list_array.value_offset(row_index + 1);
+ const auto length = end - start;
+
+ encoder.arrayStart();
+ if (length > 0) {
+ encoder.setItemCount(length);
+ const auto& values = list_array.values();
+ const auto& element_type = *list_type.fields()[0].type();
+
+ for (int64_t i = start; i < end; ++i) {
+ encoder.startItem();
+ ICEBERG_RETURN_UNEXPECTED(
+ EncodeArrowToAvro(element_node, encoder, element_type,
*values, i, ctx));
+ }
+ }
+ encoder.arrayEnd();
+ return {};
+ }
+
+ // Handle MapArray (for Avro maps with non-string keys)
+ if (array.type()->id() == ::arrow::Type::MAP) {
+ ICEBERG_PRECHECK(
+ element_node->type() == ::avro::AVRO_RECORD &&
element_node->leaves() == 2,
+ "Expected AVRO_RECORD for map key-value pair, got {}",
+ ::avro::toString(element_node->type()));
+
+ const auto& map_array = internal::checked_cast<const
::arrow::MapArray&>(array);
+ const auto& map_type = internal::checked_cast<const MapType&>(type);
+
+ const auto start = map_array.value_offset(row_index);
+ const auto end = map_array.value_offset(row_index + 1);
+ const auto length = end - start;
+
+ encoder.arrayStart();
+ if (length > 0) {
+ encoder.setItemCount(length);
+ const auto& keys = map_array.keys();
+ const auto& values = map_array.items();
+ const auto& key_type = *map_type.key().type();
+ const auto& value_type = *map_type.value().type();
+
+ // The element_node should be a RECORD with "key" and "value" fields
+ for (int64_t i = start; i < end; ++i) {
+ const auto& key_node = element_node->leafAt(0);
+ const auto& value_node = element_node->leafAt(1);
+
+ encoder.startItem();
+ ICEBERG_RETURN_UNEXPECTED(
+ EncodeArrowToAvro(key_node, encoder, key_type, *keys, i, ctx));
+ ICEBERG_RETURN_UNEXPECTED(
+ EncodeArrowToAvro(value_node, encoder, value_type, *values, i,
ctx));
+ }
+ }
+ encoder.arrayEnd();
+ return {};
+ }
+
+ return InvalidArgument("AVRO_ARRAY must map to ListArray or MapArray,
got {}",
+ array.type()->ToString());
+ }
+
+ case ::avro::AVRO_MAP: {
+ ICEBERG_PRECHECK(array.type()->id() == ::arrow::Type::MAP,
+ "AVRO_MAP expects MapArray, got {}",
array.type()->ToString());
+ ICEBERG_PRECHECK(type.type_id() == TypeId::kMap,
+ "AVRO_MAP expects MapType, got type {}",
type.ToString());
+
+ const auto& map_array = internal::checked_cast<const
::arrow::MapArray&>(array);
+ const auto& map_type = internal::checked_cast<const MapType&>(type);
+
+ const auto start = map_array.value_offset(row_index);
+ const auto end = map_array.value_offset(row_index + 1);
+ const auto length = end - start;
+
+ encoder.mapStart();
+ if (length > 0) {
+ encoder.setItemCount(length);
+ const auto& keys = map_array.keys();
+ const auto& values = map_array.items();
+ const auto& value_type = *map_type.value().type();
+ const auto& value_node = avro_node->leafAt(1);
+
+ ICEBERG_PRECHECK(keys->type()->id() == ::arrow::Type::STRING ||
+ keys->type()->id() == ::arrow::Type::LARGE_STRING,
+ "AVRO_MAP keys must be StringArray, got {}",
+ keys->type()->ToString());
+
+ for (int64_t i = start; i < end; ++i) {
+ encoder.startItem();
+
+ if (keys->type()->id() == ::arrow::Type::STRING) {
+ const auto& string_array =
+ internal::checked_cast<const ::arrow::StringArray&>(*keys);
+ std::string_view key_value = string_array.GetView(i);
+ encoder.encodeString(std::string(key_value));
+ } else {
+ const auto& large_string_array =
+ internal::checked_cast<const
::arrow::LargeStringArray&>(*keys);
+ std::string_view key_value = large_string_array.GetView(i);
+ encoder.encodeString(std::string(key_value));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(
+ EncodeArrowToAvro(value_node, encoder, value_type, *values, i,
ctx));
+ }
+ }
+ encoder.mapEnd();
+ return {};
+ }
+
+ case ::avro::AVRO_ENUM:
+ return NotSupported("ENUM type encoding not yet implemented");
+
+ case ::avro::AVRO_UNION:
+ // Already handled above
+ return Invalid("Unexpected union handling");
+
+ default:
+ return NotSupported("Unsupported Avro type: {}",
+ ::avro::toString(avro_node->type()));
+ }
+}
+
+} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_direct_encoder_internal.h
b/src/iceberg/avro/avro_direct_encoder_internal.h
new file mode 100644
index 00000000..314c1faa
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_encoder_internal.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include <arrow/record_batch.h>
+#include <avro/Encoder.hh>
+#include <avro/Node.hh>
+
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg::avro {
+
+/// \brief Context for reusing scratch buffers during Avro encoding
+///
+/// Avoids frequent small allocations by reusing temporary buffers across
+/// multiple encode operations. This is particularly important for string,
+/// binary, and fixed-size data types.
+struct EncodeContext {
+ // Scratch buffer for binary/fixed/uuid/decimal data (reused across rows)
+ std::vector<uint8_t> bytes_scratch;
+};
+
+/// \brief Directly encode Arrow data to Avro without GenericDatum
+///
+/// Eliminates the GenericDatum intermediate layer by directly calling Avro
encoder
+/// methods from Arrow arrays.
+///
+/// \param avro_node The Avro schema node for the data being encoded
+/// \param encoder The Avro encoder to write data to
+/// \param type The Iceberg type for the data
+/// \param array The Arrow array containing the data to encode
+/// \param row_index The index of the row to encode within the array
+/// \param ctx Encode context for reusing scratch buffers
+/// \return Status::OK if successful, or an error status
+Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder&
encoder,
+ const Type& type, const ::arrow::Array& array,
int64_t row_index,
+ EncodeContext& ctx);
+
+} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 106e3865..964f6d1d 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -173,7 +173,7 @@ class AvroReader::Impl {
ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
GetReaderSchema().root(), base_reader_->decoder(), projection_,
*read_schema_,
- context_->builder_.get(), &context_->decode_context_));
+ context_->builder_.get(), context_->decode_context_));
} else {
// GenericDatum-based decoding: decode via GenericDatum intermediate
if (!datum_reader_->read(*context_->datum_)) {
diff --git a/src/iceberg/avro/avro_scan.cc b/src/iceberg/avro/avro_scan.cc
deleted file mode 100644
index 3e690360..00000000
--- a/src/iceberg/avro/avro_scan.cc
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <chrono>
-#include <iostream>
-#include <memory>
-#include <string>
-
-#include <arrow/array.h>
-#include <arrow/c/bridge.h>
-#include <arrow/filesystem/localfs.h>
-#include <arrow/type.h>
-
-#include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/avro/avro_register.h"
-#include "iceberg/file_reader.h"
-#include "iceberg/schema.h"
-
-void PrintUsage(const char* program_name) {
- std::cerr << "Usage: " << program_name << " [options] <avro_file>\n"
- << "Options:\n"
- << " --skip-datum=<true|false> Use direct decoder (default:
true)\n"
- << " --batch-size=<N> Batch size for reading (default:
4096)\n"
- << " --help Show this help message\n"
- << "\nExample:\n"
- << " " << program_name
- << " --skip-datum=false --batch-size=1000 data.avro\n";
-}
-
-int main(int argc, char* argv[]) {
- iceberg::avro::RegisterAll();
-
- if (argc < 2) {
- PrintUsage(argv[0]);
- return 1;
- }
-
- std::string avro_file;
- bool skip_datum = true;
- int64_t batch_size = 4096;
-
- // Parse arguments
- for (int i = 1; i < argc; ++i) {
- std::string arg = argv[i];
- if (arg == "--help") {
- PrintUsage(argv[0]);
- return 0;
- } else if (arg.starts_with("--skip-datum=")) {
- std::string value = arg.substr(13);
- if (value == "true" || value == "1") {
- skip_datum = true;
- } else if (value == "false" || value == "0") {
- skip_datum = false;
- } else {
- std::cerr << "Invalid value for --skip-datum: " << value << "\n";
- return 1;
- }
- } else if (arg.starts_with("--batch-size=")) {
- batch_size = std::stoll(arg.substr(13));
- if (batch_size <= 0) {
- std::cerr << "Batch size must be positive\n";
- return 1;
- }
- } else if (arg[0] == '-') {
- std::cerr << "Unknown option: " << arg << "\n";
- PrintUsage(argv[0]);
- return 1;
- } else {
- avro_file = arg;
- }
- }
-
- if (avro_file.empty()) {
- std::cerr << "Error: No Avro file specified\n";
- PrintUsage(argv[0]);
- return 1;
- }
-
- std::cout << "Scanning Avro file: " << avro_file << "\n";
- std::cout << "Skip datum: " << (skip_datum ? "true" : "false") << "\n";
- std::cout << "Batch size: " << batch_size << "\n";
- std::cout << std::string(60, '-') << "\n";
-
- auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
- auto file_io =
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs);
-
- // Get file info
- auto file_info_result = local_fs->GetFileInfo(avro_file);
- if (!file_info_result.ok()) {
- std::cerr << "Error: Cannot access file: " <<
file_info_result.status().message()
- << "\n";
- return 1;
- }
- auto file_info = file_info_result.ValueOrDie();
- if (file_info.type() != ::arrow::fs::FileType::File) {
- std::cerr << "Error: Not a file: " << avro_file << "\n";
- return 1;
- }
-
- std::cout << "File size: " << file_info.size() << " bytes\n";
-
- // Configure reader properties
- auto reader_properties = iceberg::ReaderProperties::default_properties();
- reader_properties->Set(iceberg::ReaderProperties::kAvroSkipDatum,
skip_datum);
- reader_properties->Set(iceberg::ReaderProperties::kBatchSize, batch_size);
-
- // Open reader (without projection to read all columns)
- auto reader_result = iceberg::ReaderFactoryRegistry::Open(
- iceberg::FileFormatType::kAvro, {.path = avro_file,
- .length = file_info.size(),
- .io = file_io,
- .projection = nullptr,
- .properties =
std::move(reader_properties)});
-
- if (!reader_result.has_value()) {
- std::cerr << "Error opening reader: " << reader_result.error().message <<
"\n";
- return 1;
- }
-
- auto reader = std::move(reader_result.value());
-
- // Get schema
- auto schema_result = reader->Schema();
- if (!schema_result.has_value()) {
- std::cerr << "Error getting schema: " << schema_result.error().message <<
"\n";
- return 1;
- }
- auto arrow_schema = schema_result.value();
- auto arrow_schema_import = ::arrow::ImportType(&arrow_schema);
- if (!arrow_schema_import.ok()) {
- std::cerr << "Error importing schema: " <<
arrow_schema_import.status().message()
- << "\n";
- return 1;
- }
- std::cout << "Schema: " << arrow_schema_import.ValueOrDie()->ToString() <<
"\n";
- std::cout << std::string(60, '-') << "\n";
-
- // Scan file and measure time
- auto start = std::chrono::high_resolution_clock::now();
-
- int64_t total_rows = 0;
- int64_t batch_count = 0;
-
- while (true) {
- auto batch_result = reader->Next();
- if (!batch_result.has_value()) {
- std::cerr << "Error reading batch: " << batch_result.error().message <<
"\n";
- return 1;
- }
-
- auto batch_opt = batch_result.value();
- if (!batch_opt.has_value()) {
- // End of file
- break;
- }
-
- auto arrow_array = batch_opt.value();
- auto arrow_type = arrow_schema_import.ValueOrDie();
- auto array_import = ::arrow::ImportArray(&arrow_array, arrow_type);
- if (!array_import.ok()) {
- std::cerr << "Error importing array: " <<
array_import.status().message() << "\n";
- return 1;
- }
-
- int64_t batch_rows = array_import.ValueOrDie()->length();
- total_rows += batch_rows;
- batch_count++;
- }
-
- auto end = std::chrono::high_resolution_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end -
start);
-
- // Print results
- std::cout << "\nResults:\n";
- std::cout << " Total rows: " << total_rows << "\n";
- std::cout << " Batches: " << batch_count << "\n";
- std::cout << " Time: " << duration.count() << " ms\n";
- std::cout << " Throughput: "
- << (duration.count() > 0 ? (total_rows * 1000 / duration.count())
: 0)
- << " rows/sec\n";
- std::cout << " Throughput: "
- << (duration.count() > 0
- ? (file_info.size() / 1024.0 / 1024.0) / (duration.count()
/ 1000.0)
- : 0)
- << " MB/sec\n";
-
- return 0;
-}
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 9d65db15..0c640231 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -32,6 +32,7 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
+#include "iceberg/avro/avro_direct_encoder_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
@@ -63,6 +64,7 @@ class AvroWriter::Impl {
Status Open(const WriterOptions& options) {
write_schema_ = options.schema;
+ use_direct_encoder_ =
options.properties->Get(WriterProperties::kAvroSkipDatum);
::avro::NodePtr root;
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_,
&root));
@@ -87,11 +89,23 @@ class AvroWriter::Impl {
vec.assign(value.begin(), value.end());
metadata.emplace(key, std::move(vec));
}
- writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
- std::move(output_stream), *avro_schema_,
- options.properties->Get(WriterProperties::kAvroSyncInterval),
- ::avro::NULL_CODEC /*codec*/, metadata);
- datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
+
+ if (use_direct_encoder_) {
+ // Skip avro::GenericDatum by using encoder provided by
DataFileWriterBase.
+ writer_base_ = std::make_unique<::avro::DataFileWriterBase>(
+ std::move(output_stream), *avro_schema_,
+ options.properties->Get(WriterProperties::kAvroSyncInterval),
+ ::avro::NULL_CODEC /*codec*/, metadata);
+ avro_root_node_ = avro_schema_->root();
+ } else {
+ // Everything via avro::GenericDatum.
+ writer_datum_ =
std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
+ std::move(output_stream), *avro_schema_,
+ options.properties->Get(WriterProperties::kAvroSyncInterval),
+ ::avro::NULL_CODEC /*codec*/, metadata);
+ datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
+ }
+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
}
@@ -100,25 +114,45 @@ class AvroWriter::Impl {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
::arrow::ImportArray(data, &arrow_schema_));
- for (int64_t i = 0; i < result->length(); i++) {
- ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i,
datum_.get()));
- writer_->write(*datum_);
+ if (use_direct_encoder_) {
+ for (int64_t i = 0; i < result->length(); i++) {
+ ICEBERG_RETURN_UNEXPECTED(
+ EncodeArrowToAvro(avro_root_node_, writer_base_->encoder(),
*write_schema_,
+ *result, i, encode_ctx_));
+ writer_base_->incr();
+ }
+ } else {
+ for (int64_t i = 0; i < result->length(); i++) {
+ ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i,
datum_.get()));
+ writer_datum_->write(*datum_);
+ }
}
return {};
}
Status Close() {
- if (writer_ != nullptr) {
- writer_->close();
- writer_.reset();
- ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_,
arrow_output_stream_->Tell());
- ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
+ if (use_direct_encoder_) {
+ if (writer_base_ != nullptr) {
+ writer_base_->close();
+ writer_base_.reset();
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_,
arrow_output_stream_->Tell());
+ ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
+ }
+ } else {
+ if (writer_datum_ != nullptr) {
+ writer_datum_->close();
+ writer_datum_.reset();
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_,
arrow_output_stream_->Tell());
+ ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
+ }
}
return {};
}
- bool Closed() const { return writer_ == nullptr; }
+ bool Closed() const {
+ return use_direct_encoder_ ? writer_base_ == nullptr : writer_datum_ ==
nullptr;
+ }
Result<int64_t> length() {
if (Closed()) {
@@ -136,14 +170,25 @@ class AvroWriter::Impl {
std::shared_ptr<::avro::ValidSchema> avro_schema_;
// Arrow output stream of the Avro file to write
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
- // The avro writer to write the data into a datum.
- std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
- // Reusable Avro datum for writing individual records.
- std::unique_ptr<::avro::GenericDatum> datum_;
// Arrow schema to write data.
ArrowSchema arrow_schema_;
// Total length of the written Avro file.
int64_t total_bytes_ = 0;
+
+ // Flag to determine which encoder to use
+ bool use_direct_encoder_ = true;
+
+ // [Encoder path] Root node of the Avro schema
+ ::avro::NodePtr avro_root_node_;
+ // [Encoder path] The avro writer using direct encoder
+ std::unique_ptr<::avro::DataFileWriterBase> writer_base_;
+ // [Encoder path] Encode context for reusing scratch buffers
+ EncodeContext encode_ctx_;
+
+ // [GenericDatum path] The avro writer to write the data into a datum
+ std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_datum_;
+ // [GenericDatum path] Reusable Avro datum for writing individual records
+ std::unique_ptr<::avro::GenericDatum> datum_;
};
AvroWriter::~AvroWriter() = default;
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index 72ab0da1..959ca72c 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -49,6 +49,10 @@ class WriterProperties : public ConfigBase<WriterProperties>
{
/// \brief The sync interval used by Avro writer.
inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval",
16 * 1024};
+ /// \brief Whether to skip GenericDatum and use direct encoder for Avro
writing.
+ /// When true, uses direct encoder (faster). When false, uses GenericDatum.
+ inline static Entry<bool> kAvroSkipDatum{"write.avro.skip-datum", true};
+
/// TODO(gangwu): add more properties, like compression codec, compression
level, etc.
/// \brief Create a default WriterProperties instance.
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 215462b5..b0ed1080 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -503,6 +503,440 @@ INSTANTIATE_TEST_SUITE_P(DirectDecoderModes,
AvroReaderParameterizedTest,
return info.param ? "DirectDecoder" :
"GenericDatum";
});
+class AvroWriterTest : public TempFileTestBase {
+ protected:
+ static void SetUpTestSuite() { RegisterAll(); }
+
+ void SetUp() override {
+ TempFileTestBase::SetUp();
+ local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
+ file_io_ =
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
+ temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
+ }
+
+ void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string&
json_data) {
+ ArrowSchema arrow_c_schema;
+ ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+
+ auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
+ ASSERT_TRUE(arrow_schema_result.ok());
+ auto arrow_schema = arrow_schema_result.ValueOrDie();
+
+ auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema,
json_data);
+ ASSERT_TRUE(array_result.ok());
+ auto array = array_result.ValueOrDie();
+
+ struct ArrowArray arrow_array;
+ auto export_result = ::arrow::ExportArray(*array, &arrow_array);
+ ASSERT_TRUE(export_result.ok());
+
+ std::unordered_map<std::string, std::string> metadata = {
+ {"writer_test", "direct_encoder"}};
+
+ auto writer_properties = WriterProperties::default_properties();
+ writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
+
+ auto writer_result = WriterFactoryRegistry::Open(
+ FileFormatType::kAvro, {.path = temp_avro_file_,
+ .schema = schema,
+ .io = file_io_,
+ .metadata = metadata,
+ .properties = std::move(writer_properties)});
+ ASSERT_TRUE(writer_result.has_value());
+ auto writer = std::move(writer_result.value());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+ }
+
+ template <typename VerifyFunc>
+ void VerifyAvroFileContent(VerifyFunc verify_func) {
+ ::avro::DataFileReader<::avro::GenericDatum>
reader(temp_avro_file_.c_str());
+ ::avro::GenericDatum datum(reader.dataSchema());
+
+ size_t row_count = 0;
+ while (reader.read(datum)) {
+ verify_func(datum, row_count);
+ row_count++;
+ }
+ reader.close();
+ }
+
+ std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
+ std::shared_ptr<FileIO> file_io_;
+ std::string temp_avro_file_;
+ bool skip_datum_{true};
+};
+
+// Parameterized test fixture for testing both direct encoder and GenericDatum
modes
+class AvroWriterParameterizedTest : public AvroWriterTest,
+ public ::testing::WithParamInterface<bool>
{
+ protected:
+ void SetUp() override {
+ AvroWriterTest::SetUp();
+ skip_datum_ = GetParam();
+ }
+};
+
+TEST_P(AvroWriterParameterizedTest, WritePrimitiveTypes) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "bool_col",
std::make_shared<BooleanType>()),
+ SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()),
+ SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()),
+ SchemaField::MakeRequired(5, "double_col",
std::make_shared<DoubleType>()),
+ SchemaField::MakeRequired(6, "string_col",
std::make_shared<StringType>())});
+
+ std::string test_data = R"([
+ [true, 42, 1234567890, 3.14, 2.71828, "hello"],
+ [false, -100, -9876543210, -1.5, 0.0, "world"]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 6);
+
+ if (row_idx == 0) {
+ EXPECT_TRUE(record.fieldAt(0).value<bool>());
+ EXPECT_EQ(record.fieldAt(1).value<int32_t>(), 42);
+ EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1234567890);
+ EXPECT_FLOAT_EQ(record.fieldAt(3).value<float>(), 3.14f);
+ EXPECT_DOUBLE_EQ(record.fieldAt(4).value<double>(), 2.71828);
+ EXPECT_EQ(record.fieldAt(5).value<std::string>(), "hello");
+ } else if (row_idx == 1) {
+ EXPECT_FALSE(record.fieldAt(0).value<bool>());
+ EXPECT_EQ(record.fieldAt(1).value<int32_t>(), -100);
+ EXPECT_EQ(record.fieldAt(2).value<int64_t>(), -9876543210);
+ EXPECT_FLOAT_EQ(record.fieldAt(3).value<float>(), -1.5f);
+ EXPECT_DOUBLE_EQ(record.fieldAt(4).value<double>(), 0.0);
+ EXPECT_EQ(record.fieldAt(5).value<std::string>(), "world");
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteTemporalTypes) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()),
+ SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()),
+ SchemaField::MakeRequired(3, "timestamp_col",
std::make_shared<TimestampType>())});
+
+ std::string test_data = R"([
+ [18628, 43200000000, 1640995200000000],
+ [18629, 86399000000, 1641081599000000]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 3);
+
+ if (row_idx == 0) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 18628);
+ EXPECT_EQ(record.fieldAt(1).value<int64_t>(), 43200000000);
+ EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1640995200000000);
+ } else if (row_idx == 1) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 18629);
+ EXPECT_EQ(record.fieldAt(1).value<int64_t>(), 86399000000);
+ EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1641081599000000);
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteNestedStruct) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(
+ 2, "person",
+ std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(4, "age",
std::make_shared<IntType>())}))});
+
+ std::string test_data = R"([
+ [1, ["Alice", 30]],
+ [2, ["Bob", 25]]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 2);
+
+ if (row_idx == 0) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 1);
+ const auto& person = record.fieldAt(1).value<::avro::GenericRecord>();
+ EXPECT_EQ(person.fieldAt(0).value<std::string>(), "Alice");
+ EXPECT_EQ(person.fieldAt(1).value<int32_t>(), 30);
+ } else if (row_idx == 1) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 2);
+ const auto& person = record.fieldAt(1).value<::avro::GenericRecord>();
+ EXPECT_EQ(person.fieldAt(0).value<std::string>(), "Bob");
+ EXPECT_EQ(person.fieldAt(1).value<int32_t>(), 25);
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteListType) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(2, "tags",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 3, "element",
std::make_shared<StringType>())))});
+
+ std::string test_data = R"([
+ [1, ["tag1", "tag2", "tag3"]],
+ [2, ["foo", "bar"]],
+ [3, []]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 2);
+
+ if (row_idx == 0) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 1);
+ const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
+ ASSERT_EQ(tags.value().size(), 3);
+ EXPECT_EQ(tags.value()[0].value<std::string>(), "tag1");
+ EXPECT_EQ(tags.value()[1].value<std::string>(), "tag2");
+ EXPECT_EQ(tags.value()[2].value<std::string>(), "tag3");
+ } else if (row_idx == 1) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 2);
+ const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
+ ASSERT_EQ(tags.value().size(), 2);
+ EXPECT_EQ(tags.value()[0].value<std::string>(), "foo");
+ EXPECT_EQ(tags.value()[1].value<std::string>(), "bar");
+ } else if (row_idx == 2) {
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 3);
+ const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
+ EXPECT_EQ(tags.value().size(), 0);
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithStringKey) {
+ auto schema = std::make_shared<iceberg::Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(
+ 1, "properties",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(3, "value",
std::make_shared<IntType>())))});
+
+ std::string test_data = R"([
+ [[["key1", 100], ["key2", 200]]],
+ [[["a", 1], ["b", 2], ["c", 3]]]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 1);
+
+ const auto& map = record.fieldAt(0).value<::avro::GenericMap>();
+ const auto& map_value = map.value();
+ if (row_idx == 0) {
+ ASSERT_EQ(map_value.size(), 2);
+ // Find entries by key
+ bool found_key1 = false;
+ bool found_key2 = false;
+ for (const auto& entry : map_value) {
+ if (entry.first == "key1") {
+ EXPECT_EQ(entry.second.value<int32_t>(), 100);
+ found_key1 = true;
+ } else if (entry.first == "key2") {
+ EXPECT_EQ(entry.second.value<int32_t>(), 200);
+ found_key2 = true;
+ }
+ }
+ EXPECT_TRUE(found_key1 && found_key2);
+ } else if (row_idx == 1) {
+ ASSERT_EQ(map_value.size(), 3);
+ // Find entries by key
+ bool found_a = false;
+ bool found_b = false;
+ bool found_c = false;
+ for (const auto& entry : map_value) {
+ if (entry.first == "a") {
+ EXPECT_EQ(entry.second.value<int32_t>(), 1);
+ found_a = true;
+ } else if (entry.first == "b") {
+ EXPECT_EQ(entry.second.value<int32_t>(), 2);
+ found_b = true;
+ } else if (entry.first == "c") {
+ EXPECT_EQ(entry.second.value<int32_t>(), 3);
+ found_c = true;
+ }
+ }
+ EXPECT_TRUE(found_a && found_b && found_c);
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithNonStringKey) {
+ auto schema = std::make_shared<iceberg::Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(
+ 1, "int_map",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(3, "value",
std::make_shared<StringType>())))});
+
+ std::string test_data = R"([
+ [[[1, "one"], [2, "two"], [3, "three"]]],
+ [[[10, "ten"], [20, "twenty"]]]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 1);
+
+ // Maps with non-string keys are encoded as arrays of key-value records in
Avro
+ const auto& array = record.fieldAt(0).value<::avro::GenericArray>();
+ if (row_idx == 0) {
+ ASSERT_EQ(array.value().size(), 3);
+
+ const auto& entry0 = array.value()[0].value<::avro::GenericRecord>();
+ EXPECT_EQ(entry0.fieldAt(0).value<int32_t>(), 1);
+ EXPECT_EQ(entry0.fieldAt(1).value<std::string>(), "one");
+
+ const auto& entry1 = array.value()[1].value<::avro::GenericRecord>();
+ EXPECT_EQ(entry1.fieldAt(0).value<int32_t>(), 2);
+ EXPECT_EQ(entry1.fieldAt(1).value<std::string>(), "two");
+
+ const auto& entry2 = array.value()[2].value<::avro::GenericRecord>();
+ EXPECT_EQ(entry2.fieldAt(0).value<int32_t>(), 3);
+ EXPECT_EQ(entry2.fieldAt(1).value<std::string>(), "three");
+ } else if (row_idx == 1) {
+ ASSERT_EQ(array.value().size(), 2);
+
+ const auto& entry0 = array.value()[0].value<::avro::GenericRecord>();
+ EXPECT_EQ(entry0.fieldAt(0).value<int32_t>(), 10);
+ EXPECT_EQ(entry0.fieldAt(1).value<std::string>(), "ten");
+
+ const auto& entry1 = array.value()[1].value<::avro::GenericRecord>();
+ EXPECT_EQ(entry1.fieldAt(0).value<int32_t>(), 20);
+ EXPECT_EQ(entry1.fieldAt(1).value<std::string>(), "twenty");
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteEmptyMaps) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(
+ 1, "string_map",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(3, "value",
std::make_shared<IntType>()))),
+ SchemaField::MakeRequired(
+ 4, "int_map",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(5, "key", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(6, "value",
std::make_shared<StringType>())))});
+
+ // Test empty maps for both string and non-string keys
+ std::string test_data = R"([
+ [[], []],
+ [[["a", 1]], []]
+ ])";
+
+ // Just verify writing succeeds (empty maps are handled correctly by the
encoder)
+ ASSERT_NO_FATAL_FAILURE(WriteAvroFile(schema, test_data));
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteOptionalFields) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()),
+ SchemaField::MakeOptional(3, "age", std::make_shared<IntType>())});
+
+ std::string test_data = R"([
+ [1, "Alice", 30],
+ [2, null, 25],
+ [3, "Charlie", null],
+ [4, null, null]
+ ])";
+
+ WriteAvroFile(schema, test_data);
+
+ VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 3);
+
+ EXPECT_EQ(record.fieldAt(0).value<int32_t>(), static_cast<int32_t>(row_idx
+ 1));
+
+ if (row_idx == 0) {
+ EXPECT_EQ(record.fieldAt(1).unionBranch(), 1); // non-null
+ EXPECT_EQ(record.fieldAt(1).value<std::string>(), "Alice");
+ EXPECT_EQ(record.fieldAt(2).unionBranch(), 1); // non-null
+ EXPECT_EQ(record.fieldAt(2).value<int32_t>(), 30);
+ } else if (row_idx == 1) {
+ EXPECT_EQ(record.fieldAt(1).unionBranch(), 0); // null
+ EXPECT_EQ(record.fieldAt(2).unionBranch(), 1); // non-null
+ EXPECT_EQ(record.fieldAt(2).value<int32_t>(), 25);
+ } else if (row_idx == 2) {
+ EXPECT_EQ(record.fieldAt(1).unionBranch(), 1); // non-null
+ EXPECT_EQ(record.fieldAt(1).value<std::string>(), "Charlie");
+ EXPECT_EQ(record.fieldAt(2).unionBranch(), 0); // null
+ } else if (row_idx == 3) {
+ EXPECT_EQ(record.fieldAt(1).unionBranch(), 0); // null
+ EXPECT_EQ(record.fieldAt(2).unionBranch(), 0); // null
+ }
+ });
+}
+
+TEST_P(AvroWriterParameterizedTest, WriteLargeDataset) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<LongType>()),
+ SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>())});
+
+ // Generate large dataset JSON
+ std::ostringstream json;
+ json << "[";
+ for (int i = 0; i < 1000; ++i) {
+ if (i > 0) json << ", ";
+ json << "[" << i << ", " << (i * 1.5) << "]";
+ }
+ json << "]";
+
+ WriteAvroFile(schema, json.str());
+
+ size_t expected_row_count = 1000;
+ size_t actual_row_count = 0;
+
+ VerifyAvroFileContent([&](const ::avro::GenericDatum& datum, size_t row_idx)
{
+ ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
+ const auto& record = datum.value<::avro::GenericRecord>();
+ ASSERT_EQ(record.fieldCount(), 2);
+
+ EXPECT_EQ(record.fieldAt(0).value<int64_t>(),
static_cast<int64_t>(row_idx));
+ EXPECT_DOUBLE_EQ(record.fieldAt(1).value<double>(), row_idx * 1.5);
+
+ actual_row_count++;
+ });
+
+ EXPECT_EQ(actual_row_count, expected_row_count);
+}
+
+// Instantiate parameterized tests for both direct encoder and GenericDatum
paths
+INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterParameterizedTest,
+ ::testing::Values(true, false),
+ [](const ::testing::TestParamInfo<bool>& info) {
+ return info.param ? "DirectEncoder" :
"GenericDatum";
+ });
+
TEST_F(AvroReaderTest, BufferSizeConfiguration) {
// Test default buffer size
auto properties1 = ReaderProperties::default_properties();