wgtmac commented on code in PR #445: URL: https://github.com/apache/iceberg-cpp/pull/445#discussion_r2650516215
########## src/iceberg/avro/avro_direct_encoder_internal.h: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <memory> + +#include <arrow/record_batch.h> +#include <avro/Encoder.hh> +#include <avro/Node.hh> + +#include "iceberg/result.h" +#include "iceberg/schema.h" + +namespace iceberg::avro { + +/// \brief Context for reusing scratch buffers during Avro encoding +/// +/// Avoids frequent small allocations by reusing temporary buffers across +/// multiple encode operations. This is particularly important for string, +/// binary, and fixed-size data types. +struct EncodeContext { + // Scratch buffer for binary/fixed/uuid/decimal data (reused across rows) + std::vector<uint8_t> bytes_scratch; +}; + +/// \brief Directly encode Arrow data to Avro without GenericDatum +/// +/// Eliminates the GenericDatum intermediate layer by directly calling Avro encoder +/// methods from Arrow arrays. Matches Java Iceberg's approach for better performance. +/// +/// Features: +/// - All primitive, temporal, and logical types +/// - Nested types (struct, list, map) +/// - Union types for optional fields +/// +/// Error Handling: +/// - Type mismatches → InvalidArgument +/// - Null in non-nullable field → InvalidArgument +/// - Invalid decimal precision → InvalidArgument +/// Review Comment: ```suggestion /// methods from Arrow arrays. /// ``` ########## src/iceberg/avro/avro_direct_encoder.cc: ########## @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <algorithm> +#include <cstring> + +#include <arrow/array.h> +#include <arrow/extension_type.h> +#include <arrow/type.h> +#include <avro/Specific.hh> + +#include "iceberg/avro/avro_constants.h" +#include "iceberg/avro/avro_direct_encoder_internal.h" +#include "iceberg/util/checked_cast.h" + +namespace iceberg::avro { + +namespace { + +// Helper to validate union structure and get branch indices +// Returns {null_index, value_index, value_node} +struct UnionBranches { + size_t null_index; + size_t value_index; + ::avro::NodePtr value_node; +}; + +Result<UnionBranches> ValidateUnion(const ::avro::NodePtr& union_node) { + if (union_node->leaves() != 2) { + return InvalidArgument("Union must have exactly 2 branches, got {}", + union_node->leaves()); + } + + const auto& branch_0 = union_node->leafAt(0); + const auto& branch_1 = union_node->leafAt(1); + + if (branch_0->type() == ::avro::AVRO_NULL && branch_1->type() != ::avro::AVRO_NULL) { + return UnionBranches{.null_index = 0, .value_index = 1, .value_node = branch_1}; + } else if (branch_1->type() == ::avro::AVRO_NULL && + branch_0->type() != ::avro::AVRO_NULL) { + return UnionBranches{.null_index = 1, .value_index = 0, .value_node = branch_0}; + } else { + return InvalidArgument("Union must have exactly one null branch"); + } +} + +} // namespace + +Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& encoder, + const Type& type, const ::arrow::Array& array, int64_t row_index, + EncodeContext* ctx) { + if (!ctx) { Review Comment: I just noticed that we haven't checked null for `DecodeContext* ctx` in the decoder implementation. Can we use `DecodeContext&` and `EncodeContext& ctx` so we don't need to bother with null check? ########## src/iceberg/file_writer.h: ########## @@ -49,6 +49,10 @@ class WriterProperties : public ConfigBase<WriterProperties> { /// \brief The sync interval used by Avro writer. inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024}; + /// \brief Whether to skip GenericDatum and use direct encoder for Avro writing. + /// When true, uses direct encoder (faster). When false, uses GenericDatum (legacy). Review Comment: ```suggestion /// When true, uses direct encoder (faster). When false, uses GenericDatum. ``` ########## src/iceberg/avro/avro_direct_encoder.cc: ########## @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <algorithm> +#include <cstring> + +#include <arrow/array.h> +#include <arrow/extension_type.h> +#include <arrow/type.h> +#include <avro/Specific.hh> + +#include "iceberg/avro/avro_constants.h" +#include "iceberg/avro/avro_direct_encoder_internal.h" +#include "iceberg/util/checked_cast.h" + +namespace iceberg::avro { + +namespace { + +// Helper to validate union structure and get branch indices +// Returns {null_index, value_index, value_node} +struct UnionBranches { + size_t null_index; + size_t value_index; + ::avro::NodePtr value_node; +}; + +Result<UnionBranches> ValidateUnion(const ::avro::NodePtr& union_node) { + if (union_node->leaves() != 2) { + return InvalidArgument("Union must have exactly 2 branches, got {}", + union_node->leaves()); + } + + const auto& branch_0 = union_node->leafAt(0); + const auto& branch_1 = union_node->leafAt(1); + + if (branch_0->type() == ::avro::AVRO_NULL && branch_1->type() != ::avro::AVRO_NULL) { + return UnionBranches{.null_index = 0, .value_index = 1, .value_node = branch_1}; + } else if (branch_1->type() == ::avro::AVRO_NULL && + branch_0->type() != ::avro::AVRO_NULL) { + return UnionBranches{.null_index = 1, .value_index = 0, .value_node = branch_0}; + } else { + return InvalidArgument("Union must have exactly one null branch"); + } +} + +} // namespace + +Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& encoder, + const Type& type, const ::arrow::Array& array, int64_t row_index, + EncodeContext* ctx) { + if (!ctx) { + return InvalidArgument("EncodeContext must not be null"); + } + if (row_index < 0 || row_index >= array.length()) { + return InvalidArgument("Row index {} out of bounds for array of length {}", row_index, + array.length()); + } + + const bool is_null = array.IsNull(row_index); + + // Handle unions (optional fields) + if (avro_node->type() == ::avro::AVRO_UNION) { + ICEBERG_ASSIGN_OR_RAISE(auto branches, ValidateUnion(avro_node)); + + if (is_null) { + encoder.encodeUnionIndex(branches.null_index); + encoder.encodeNull(); + return {}; + } else { + encoder.encodeUnionIndex(branches.value_index); + // Continue with the value branch + return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx); + } + } + + // Non-union null handling + if (is_null) { + return InvalidArgument("Null value in non-nullable field"); + } + + // Encode based on Avro type + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + encoder.encodeNull(); + return {}; + + case ::avro::AVRO_BOOL: { + const auto& bool_array = + internal::checked_cast<const ::arrow::BooleanArray&>(array); + encoder.encodeBool(bool_array.Value(row_index)); + return {}; + } + + case ::avro::AVRO_INT: { + // AVRO_INT can represent: int32, date (days since epoch) + switch (array.type()->id()) { + case ::arrow::Type::INT32: { + const auto& int32_array = + internal::checked_cast<const ::arrow::Int32Array&>(array); + encoder.encodeInt(int32_array.Value(row_index)); + return {}; + } + case ::arrow::Type::DATE32: { + const auto& date_array = + internal::checked_cast<const ::arrow::Date32Array&>(array); + encoder.encodeInt(date_array.Value(row_index)); + return {}; + } + default: + return InvalidArgument("AVRO_INT expects Int32Array or Date32Array, got {}", + array.type()->ToString()); + } + } + + case ::avro::AVRO_LONG: { + // AVRO_LONG can represent: int64, time (microseconds), timestamp (microseconds) + switch (array.type()->id()) { + case ::arrow::Type::INT64: { + const auto& int64_array = + internal::checked_cast<const ::arrow::Int64Array&>(array); + encoder.encodeLong(int64_array.Value(row_index)); + return {}; + } + case ::arrow::Type::TIME64: { + const auto& time_array = + internal::checked_cast<const ::arrow::Time64Array&>(array); + encoder.encodeLong(time_array.Value(row_index)); + return {}; + } + case ::arrow::Type::TIMESTAMP: { + const auto& timestamp_array = + internal::checked_cast<const ::arrow::TimestampArray&>(array); + encoder.encodeLong(timestamp_array.Value(row_index)); + return {}; + } + default: + return InvalidArgument( + "AVRO_LONG expects Int64Array, Time64Array, or TimestampArray, got {}", + array.type()->ToString()); + } + } + + case ::avro::AVRO_FLOAT: { + const auto& float_array = internal::checked_cast<const ::arrow::FloatArray&>(array); + encoder.encodeFloat(float_array.Value(row_index)); + return {}; + } + + case ::avro::AVRO_DOUBLE: { + const auto& double_array = + internal::checked_cast<const ::arrow::DoubleArray&>(array); + encoder.encodeDouble(double_array.Value(row_index)); + return {}; + } + + case ::avro::AVRO_STRING: { + const auto& string_array = + internal::checked_cast<const ::arrow::StringArray&>(array); + std::string_view value = string_array.GetView(row_index); + encoder.encodeString(std::string(value)); + return {}; + } + + case ::avro::AVRO_BYTES: { + const auto& binary_array = + internal::checked_cast<const ::arrow::BinaryArray&>(array); + std::string_view value = binary_array.GetView(row_index); + ctx->bytes_scratch.assign(value.begin(), value.end()); + encoder.encodeBytes(ctx->bytes_scratch); + return {}; + } + + case ::avro::AVRO_FIXED: { + // Handle UUID + if (avro_node->logicalType().type() == ::avro::LogicalType::UUID) { + const auto& extension_array = + internal::checked_cast<const ::arrow::ExtensionArray&>(array); + const auto& fixed_array = + internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>( + *extension_array.storage()); + std::string_view value = fixed_array.GetView(row_index); + ctx->bytes_scratch.assign(value.begin(), value.end()); + encoder.encodeFixed(ctx->bytes_scratch.data(), ctx->bytes_scratch.size()); + return {}; + } + + // Handle DECIMAL + if (avro_node->logicalType().type() == ::avro::LogicalType::DECIMAL) { + const auto& decimal_array = + internal::checked_cast<const ::arrow::Decimal128Array&>(array); + std::string_view decimal_value = decimal_array.GetView(row_index); + ctx->bytes_scratch.assign(decimal_value.begin(), decimal_value.end()); + // Arrow Decimal128 bytes are in little-endian order, Avro requires big-endian + std::ranges::reverse(ctx->bytes_scratch); + encoder.encodeFixed(ctx->bytes_scratch.data(), ctx->bytes_scratch.size()); + return {}; + } + + // Handle regular FIXED + const auto& fixed_array = + internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(array); + std::string_view value = fixed_array.GetView(row_index); + ctx->bytes_scratch.assign(value.begin(), value.end()); + encoder.encodeFixed(ctx->bytes_scratch.data(), ctx->bytes_scratch.size()); + return {}; + } + + case ::avro::AVRO_RECORD: { + if (array.type()->id() != ::arrow::Type::STRUCT) { + return InvalidArgument("AVRO_RECORD expects StructArray, got {}", + array.type()->ToString()); + } + if (!type.is_nested()) { + return InvalidArgument("AVRO_RECORD expects nested type, got type {}", + type.ToString()); + } + + const auto& struct_array = + internal::checked_cast<const ::arrow::StructArray&>(array); + + // AVRO_RECORD corresponds to Iceberg StructType (including Schema which extends + // StructType). Note: ListType and MapType are encoded as AVRO_ARRAY and AVRO_MAP + // respectively, not AVRO_RECORD. + if (type.type_id() != TypeId::kStruct) { + return InvalidArgument("AVRO_RECORD expects StructType, got type {}", + type.ToString()); + } + + // Safe cast: type_id() == kStruct guarantees this is StructType or Schema + // (Schema extends StructType) + const auto& struct_type = static_cast<const StructType&>(type); + const size_t num_fields = avro_node->leaves(); + + for (size_t i = 0; i < num_fields; ++i) { + const auto& field_node = avro_node->leafAt(i); Review Comment: Do we need to check if num of fields mismatch between arrow array and avro node? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
