This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c67e4c1 feat: eliminate GenericDatum in Avro reader for performance 
(#374)
1c67e4c1 is described below

commit 1c67e4c1fa910d6bbcd91b29bc5f721b912de46b
Author: Xinli Shang <[email protected]>
AuthorDate: Mon Dec 15 05:46:24 2025 -0800

    feat: eliminate GenericDatum in Avro reader for performance (#374)
    
    Replace GenericDatum intermediate layer with direct Avro decoder access
    to improve manifest I/O performance.
    
    Changes:
    - Add avro_direct_decoder_internal.h with DecodeAvroToBuilder API
    - Add avro_direct_decoder.cc implementing direct Avro→Arrow decoding
      - Primitive types: bool, int, long, float, double, string, binary, fixed
      - Temporal types: date, time, timestamp
      - Logical types: uuid, decimal
      - Nested types: struct, list, map
      - Union type handling for nullable fields
    - Modify avro_reader.cc to use DataFileReaderBase with direct decoder
      - Replace DataFileReader<GenericDatum> with DataFileReaderBase
      - Use decoder.decodeInt(), decodeLong(), etc. directly
      - Remove GenericDatum allocation and extraction overhead
    - Update CMakeLists.txt to include new decoder source
    
    Performance improvement:
    - Before: Avro binary → GenericDatum → Extract → Arrow (3 steps)
    - After: Avro binary → decoder.decodeInt() → Arrow (2 steps)
---
 src/iceberg/CMakeLists.txt                      |   1 +
 src/iceberg/avro/CMakeLists.txt                 |   6 +
 src/iceberg/avro/avro_direct_decoder.cc         | 593 ++++++++++++++++++++++++
 src/iceberg/avro/avro_direct_decoder_internal.h |  87 ++++
 src/iceberg/avro/avro_reader.cc                 | 136 ++++--
 src/iceberg/avro/avro_scan.cc                   | 204 ++++++++
 src/iceberg/file_reader.h                       |   5 +
 src/iceberg/test/avro_test.cc                   | 273 ++++++++++-
 src/iceberg/test/temp_file_test_base.h          |   9 +-
 9 files changed, 1275 insertions(+), 39 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 6e9eb0ba..a0d93967 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -148,6 +148,7 @@ if(ICEBERG_BUILD_BUNDLE)
   set(ICEBERG_BUNDLE_SOURCES
       arrow/arrow_fs_file_io.cc
       avro/avro_data_util.cc
+      avro/avro_direct_decoder.cc
       avro/avro_reader.cc
       avro/avro_writer.cc
       avro/avro_register.cc
diff --git a/src/iceberg/avro/CMakeLists.txt b/src/iceberg/avro/CMakeLists.txt
index f8213038..a7663ba6 100644
--- a/src/iceberg/avro/CMakeLists.txt
+++ b/src/iceberg/avro/CMakeLists.txt
@@ -16,3 +16,9 @@
 # under the License.
 
 iceberg_install_all_headers(iceberg/avro)
+
+# avro_scan benchmark executable
+add_executable(avro_scan avro_scan.cc)
+target_link_libraries(avro_scan PRIVATE iceberg_bundle_static)
+set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY
+                                           
"${CMAKE_BINARY_DIR}/src/iceberg/avro")
diff --git a/src/iceberg/avro/avro_direct_decoder.cc 
b/src/iceberg/avro/avro_direct_decoder.cc
new file mode 100644
index 00000000..60f79d21
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_decoder.cc
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <avro/Decoder.hh>
+#include <avro/Node.hh>
+#include <avro/NodeImpl.hh>
+#include <avro/Types.hh>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/avro/avro_direct_decoder_internal.h"
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::avro {
+
+using ::iceberg::arrow::ToErrorKind;
+
+namespace {
+
+/// \brief Forward declaration for mutual recursion.
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                            const FieldProjection& projection,
+                            const SchemaField& projected_field,
+                            ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx);
+
+/// \brief Skip an Avro value based on its schema without decoding
+Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder) {
+  switch (avro_node->type()) {
+    case ::avro::AVRO_NULL:
+      decoder.decodeNull();
+      return {};
+
+    case ::avro::AVRO_BOOL:
+      decoder.decodeBool();
+      return {};
+
+    case ::avro::AVRO_INT:
+      decoder.decodeInt();
+      return {};
+
+    case ::avro::AVRO_LONG:
+      decoder.decodeLong();
+      return {};
+
+    case ::avro::AVRO_FLOAT:
+      decoder.decodeFloat();
+      return {};
+
+    case ::avro::AVRO_DOUBLE:
+      decoder.decodeDouble();
+      return {};
+
+    case ::avro::AVRO_STRING:
+      decoder.skipString();
+      return {};
+
+    case ::avro::AVRO_BYTES:
+      decoder.skipBytes();
+      return {};
+
+    case ::avro::AVRO_FIXED:
+      decoder.skipFixed(avro_node->fixedSize());
+      return {};
+
+    case ::avro::AVRO_RECORD: {
+      // Skip all fields in order
+      for (size_t i = 0; i < avro_node->leaves(); ++i) {
+        ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), 
decoder));
+      }
+      return {};
+    }
+
+    case ::avro::AVRO_ENUM:
+      decoder.decodeEnum();
+      return {};
+
+    case ::avro::AVRO_ARRAY: {
+      const auto& element_node = avro_node->leafAt(0);
+      // skipArray() returns count like arrayStart(), must handle all blocks
+      int64_t block_count = decoder.skipArray();
+      while (block_count > 0) {
+        for (int64_t i = 0; i < block_count; ++i) {
+          ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder));
+        }
+        block_count = decoder.arrayNext();
+      }
+      return {};
+    }
+
+    case ::avro::AVRO_MAP: {
+      const auto& value_node = avro_node->leafAt(1);
+      // skipMap() returns count like mapStart(), must handle all blocks
+      int64_t block_count = decoder.skipMap();
+      while (block_count > 0) {
+        for (int64_t i = 0; i < block_count; ++i) {
+          decoder.skipString();  // Skip key (always string in Avro maps)
+          ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder));
+        }
+        block_count = decoder.mapNext();
+      }
+      return {};
+    }
+
+    case ::avro::AVRO_UNION: {
+      const size_t branch_index = decoder.decodeUnionIndex();
+      // Validate branch index
+      const size_t num_branches = avro_node->leaves();
+      if (branch_index >= num_branches) {
+        return InvalidArgument("Union branch index {} out of range [0, {})", 
branch_index,
+                               num_branches);
+      }
+      return SkipAvroValue(avro_node->leafAt(branch_index), decoder);
+    }
+
+    default:
+      return InvalidArgument("Unsupported Avro type for skipping: {}",
+                             ToString(avro_node));
+  }
+}
+
+/// \brief Decode Avro record directly to Arrow struct builder.
+Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, 
::avro::Decoder& decoder,
+                             const std::span<const FieldProjection>& 
projections,
+                             const StructType& struct_type,
+                             ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx) {
+  if (avro_node->type() != ::avro::AVRO_RECORD) {
+    return InvalidArgument("Expected Avro record, got type: {}", 
ToString(avro_node));
+  }
+
+  auto* struct_builder = 
internal::checked_cast<::arrow::StructBuilder*>(array_builder);
+  ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append());
+
+  // Build a map from Avro field index to projection index (cached per struct 
schema)
+  // -1 means the field should be skipped
+  const FieldProjection* cache_key = projections.data();
+  auto cache_it = ctx->avro_to_projection_cache.find(cache_key);
+  std::vector<int>* avro_to_projection;
+
+  if (cache_it != ctx->avro_to_projection_cache.end()) {
+    // Use cached mapping
+    avro_to_projection = &cache_it->second;
+  } else {
+    // Build and cache the mapping
+    auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace(
+        cache_key, std::vector<int>(avro_node->leaves(), -1));
+    avro_to_projection = &inserted_it->second;
+
+    for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) {
+      const auto& field_projection = projections[proj_idx];
+      if (field_projection.kind == FieldProjection::Kind::kProjected) {
+        size_t avro_field_index = std::get<size_t>(field_projection.from);
+        (*avro_to_projection)[avro_field_index] = static_cast<int>(proj_idx);
+      }
+    }
+  }
+
+  // Read all Avro fields in order (must maintain decoder position)
+  for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) {
+    int proj_idx = (*avro_to_projection)[avro_idx];
+
+    if (proj_idx < 0) {
+      // Skip this field - not in projection
+      const auto& avro_field_node = avro_node->leafAt(avro_idx);
+      ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder));
+    } else {
+      // Decode this field
+      const auto& field_projection = projections[proj_idx];
+      const auto& expected_field = struct_type.fields()[proj_idx];
+      const auto& avro_field_node = avro_node->leafAt(avro_idx);
+      auto* field_builder = struct_builder->field_builder(proj_idx);
+
+      ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(avro_field_node, decoder,
+                                                     field_projection, 
expected_field,
+                                                     field_builder, ctx));
+    }
+  }
+
+  // Handle null fields (fields in projection but not in Avro)
+  for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) {
+    const auto& field_projection = projections[proj_idx];
+    if (field_projection.kind == FieldProjection::Kind::kNull) {
+      auto* field_builder = 
struct_builder->field_builder(static_cast<int>(proj_idx));
+      ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull());
+    } else if (field_projection.kind != FieldProjection::Kind::kProjected) {
+      return InvalidArgument("Unsupported field projection kind: {}",
+                             static_cast<int>(field_projection.kind));
+    }
+  }
+  return {};
+}
+
+/// \brief Decode Avro array directly to Arrow list builder.
+Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                           const FieldProjection& element_projection,
+                           const ListType& list_type,
+                           ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx) {
+  if (avro_node->type() != ::avro::AVRO_ARRAY) {
+    return InvalidArgument("Expected Avro array, got type: {}", 
ToString(avro_node));
+  }
+
+  auto* list_builder = 
internal::checked_cast<::arrow::ListBuilder*>(array_builder);
+  ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append());
+
+  auto* value_builder = list_builder->value_builder();
+  const auto& element_node = avro_node->leafAt(0);
+  const auto& element_field = list_type.fields().back();
+
+  // Read array block count
+  int64_t block_count = decoder.arrayStart();
+  while (block_count != 0) {
+    for (int64_t i = 0; i < block_count; ++i) {
+      ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+          element_node, decoder, element_projection, element_field, 
value_builder, ctx));
+    }
+    block_count = decoder.arrayNext();
+  }
+
+  return {};
+}
+
+/// \brief Decode Avro map directly to Arrow map builder.
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                          const FieldProjection& key_projection,
+                          const FieldProjection& value_projection,
+                          const MapType& map_type, ::arrow::ArrayBuilder* 
array_builder,
+                          DecodeContext* ctx) {
+  auto* map_builder = 
internal::checked_cast<::arrow::MapBuilder*>(array_builder);
+
+  if (avro_node->type() == ::avro::AVRO_MAP) {
+    // Handle regular Avro map: map<string, value>
+    const auto& key_node = avro_node->leafAt(0);
+    const auto& value_node = avro_node->leafAt(1);
+    const auto& key_field = map_type.key();
+    const auto& value_field = map_type.value();
+
+    ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
+    auto* key_builder = map_builder->key_builder();
+    auto* item_builder = map_builder->item_builder();
+
+    // Read map block count
+    int64_t block_count = decoder.mapStart();
+    while (block_count != 0) {
+      for (int64_t i = 0; i < block_count; ++i) {
+        ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, 
key_projection,
+                                                       key_field, key_builder, 
ctx));
+        ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+            value_node, decoder, value_projection, value_field, item_builder, 
ctx));
+      }
+      block_count = decoder.mapNext();
+    }
+
+    return {};
+  } else if (avro_node->type() == ::avro::AVRO_ARRAY && 
HasMapLogicalType(avro_node)) {
+    // Handle array-based map: list<struct<key, value>>
+    const auto& key_field = map_type.key();
+    const auto& value_field = map_type.value();
+
+    ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
+    auto* key_builder = map_builder->key_builder();
+    auto* item_builder = map_builder->item_builder();
+
+    const auto& record_node = avro_node->leafAt(0);
+    if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 
2) {
+      return InvalidArgument(
+          "Array-based map must contain records with exactly 2 fields, got: 
{}",
+          ToString(record_node));
+    }
+    const auto& key_node = record_node->leafAt(0);
+    const auto& value_node = record_node->leafAt(1);
+
+    // Read array block count
+    int64_t block_count = decoder.arrayStart();
+    while (block_count != 0) {
+      for (int64_t i = 0; i < block_count; ++i) {
+        ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, 
key_projection,
+                                                       key_field, key_builder, 
ctx));
+        ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+            value_node, decoder, value_projection, value_field, item_builder, 
ctx));
+      }
+      block_count = decoder.arrayNext();
+    }
+
+    return {};
+  } else {
+    return InvalidArgument("Expected Avro map or array with map logical type, 
got: {}",
+                           ToString(avro_node));
+  }
+}
+
+/// \brief Decode nested Avro data directly to Arrow array builder.
+Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node,
+                                  ::avro::Decoder& decoder,
+                                  const std::span<const FieldProjection>& 
projections,
+                                  const NestedType& projected_type,
+                                  ::arrow::ArrayBuilder* array_builder,
+                                  DecodeContext* ctx) {
+  switch (projected_type.type_id()) {
+    case TypeId::kStruct: {
+      const auto& struct_type = internal::checked_cast<const 
StructType&>(projected_type);
+      return DecodeStructToBuilder(avro_node, decoder, projections, 
struct_type,
+                                   array_builder, ctx);
+    }
+
+    case TypeId::kList: {
+      if (projections.size() != 1) {
+        return InvalidArgument("Expected 1 projection for list, got: {}",
+                               projections.size());
+      }
+      const auto& list_type = internal::checked_cast<const 
ListType&>(projected_type);
+      return DecodeListToBuilder(avro_node, decoder, projections[0], list_type,
+                                 array_builder, ctx);
+    }
+
+    case TypeId::kMap: {
+      if (projections.size() != 2) {
+        return InvalidArgument("Expected 2 projections for map, got: {}",
+                               projections.size());
+      }
+      const auto& map_type = internal::checked_cast<const 
MapType&>(projected_type);
+      return DecodeMapToBuilder(avro_node, decoder, projections[0], 
projections[1],
+                                map_type, array_builder, ctx);
+    }
+
+    default:
+      return InvalidArgument("Unsupported nested type: {}", 
projected_type.ToString());
+  }
+}
+
+Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
+                                     ::avro::Decoder& decoder,
+                                     const SchemaField& projected_field,
+                                     ::arrow::ArrayBuilder* array_builder,
+                                     DecodeContext* ctx) {
+  const auto& projected_type = *projected_field.type();
+  if (!projected_type.is_primitive()) {
+    return InvalidArgument("Expected primitive type, got: {}", 
projected_type.ToString());
+  }
+
+  switch (projected_type.type_id()) {
+    case TypeId::kBoolean: {
+      if (avro_node->type() != ::avro::AVRO_BOOL) {
+        return InvalidArgument("Expected Avro boolean for boolean field, got: 
{}",
+                               ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::BooleanBuilder*>(array_builder);
+      bool value = decoder.decodeBool();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    case TypeId::kInt: {
+      if (avro_node->type() != ::avro::AVRO_INT) {
+        return InvalidArgument("Expected Avro int for int field, got: {}",
+                               ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::Int32Builder*>(array_builder);
+      int32_t value = decoder.decodeInt();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    case TypeId::kLong: {
+      auto* builder = 
internal::checked_cast<::arrow::Int64Builder*>(array_builder);
+      if (avro_node->type() == ::avro::AVRO_LONG) {
+        int64_t value = decoder.decodeLong();
+        ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      } else if (avro_node->type() == ::avro::AVRO_INT) {
+        int32_t value = decoder.decodeInt();
+        
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value)));
+      } else {
+        return InvalidArgument("Expected Avro int/long for long field, got: 
{}",
+                               ToString(avro_node));
+      }
+      return {};
+    }
+
+    case TypeId::kFloat: {
+      if (avro_node->type() != ::avro::AVRO_FLOAT) {
+        return InvalidArgument("Expected Avro float for float field, got: {}",
+                               ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::FloatBuilder*>(array_builder);
+      float value = decoder.decodeFloat();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    case TypeId::kDouble: {
+      auto* builder = 
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder);
+      if (avro_node->type() == ::avro::AVRO_DOUBLE) {
+        double value = decoder.decodeDouble();
+        ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      } else if (avro_node->type() == ::avro::AVRO_FLOAT) {
+        float value = decoder.decodeFloat();
+        
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value)));
+      } else {
+        return InvalidArgument("Expected Avro float/double for double field, 
got: {}",
+                               ToString(avro_node));
+      }
+      return {};
+    }
+
+    case TypeId::kString: {
+      if (avro_node->type() != ::avro::AVRO_STRING) {
+        return InvalidArgument("Expected Avro string for string field, got: 
{}",
+                               ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::StringBuilder*>(array_builder);
+      decoder.decodeString(ctx->string_scratch);
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch));
+      return {};
+    }
+
+    case TypeId::kBinary: {
+      if (avro_node->type() != ::avro::AVRO_BYTES) {
+        return InvalidArgument("Expected Avro bytes for binary field, got: {}",
+                               ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::BinaryBuilder*>(array_builder);
+      decoder.decodeBytes(ctx->bytes_scratch);
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(
+          ctx->bytes_scratch.data(), 
static_cast<int32_t>(ctx->bytes_scratch.size())));
+      return {};
+    }
+
+    case TypeId::kFixed: {
+      if (avro_node->type() != ::avro::AVRO_FIXED) {
+        return InvalidArgument("Expected Avro fixed for fixed field, got: {}",
+                               ToString(avro_node));
+      }
+      const auto& fixed_type = internal::checked_cast<const 
FixedType&>(projected_type);
+      auto* builder =
+          
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
+
+      ctx->bytes_scratch.resize(fixed_type.length());
+      decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch);
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+      return {};
+    }
+
+    case TypeId::kUuid: {
+      if (avro_node->type() != ::avro::AVRO_FIXED ||
+          avro_node->logicalType().type() != ::avro::LogicalType::UUID) {
+        return InvalidArgument("Expected Avro fixed for uuid field, got: {}",
+                               ToString(avro_node));
+      }
+
+      auto* builder =
+          
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
+
+      ctx->bytes_scratch.resize(16);
+      decoder.decodeFixed(16, ctx->bytes_scratch);
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+      return {};
+    }
+
+    case TypeId::kDecimal: {
+      if (avro_node->type() != ::avro::AVRO_FIXED ||
+          avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) {
+        return InvalidArgument(
+            "Expected Avro fixed with DECIMAL logical type for decimal field, 
got: {}",
+            ToString(avro_node));
+      }
+
+      size_t byte_width = avro_node->fixedSize();
+      auto* builder = 
internal::checked_cast<::arrow::Decimal128Builder*>(array_builder);
+
+      ctx->bytes_scratch.resize(byte_width);
+      decoder.decodeFixed(byte_width, ctx->bytes_scratch);
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(
+          auto decimal, 
::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(),
+                                                           
ctx->bytes_scratch.size()));
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal));
+      return {};
+    }
+
+    case TypeId::kDate: {
+      if (avro_node->type() != ::avro::AVRO_INT ||
+          avro_node->logicalType().type() != ::avro::LogicalType::DATE) {
+        return InvalidArgument(
+            "Expected Avro int with DATE logical type for date field, got: {}",
+            ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::Date32Builder*>(array_builder);
+      int32_t value = decoder.decodeInt();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    case TypeId::kTime: {
+      if (avro_node->type() != ::avro::AVRO_LONG ||
+          avro_node->logicalType().type() != ::avro::LogicalType::TIME_MICROS) 
{
+        return InvalidArgument(
+            "Expected Avro long with TIME_MICROS for time field, got: {}",
+            ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::Time64Builder*>(array_builder);
+      int64_t value = decoder.decodeLong();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    case TypeId::kTimestamp:
+    case TypeId::kTimestampTz: {
+      if (avro_node->type() != ::avro::AVRO_LONG ||
+          avro_node->logicalType().type() != 
::avro::LogicalType::TIMESTAMP_MICROS) {
+        return InvalidArgument(
+            "Expected Avro long with TIMESTAMP_MICROS for timestamp field, 
got: {}",
+            ToString(avro_node));
+      }
+      auto* builder = 
internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
+      int64_t value = decoder.decodeLong();
+      ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+      return {};
+    }
+
+    default:
+      return InvalidArgument("Unsupported primitive type {} to decode from 
avro node {}",
+                             projected_field.type()->ToString(), 
ToString(avro_node));
+  }
+}
+
+/// \brief Dispatch to appropriate handlers based on the projection kind.
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                            const FieldProjection& projection,
+                            const SchemaField& projected_field,
+                            ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx) {
+  if (avro_node->type() == ::avro::AVRO_UNION) {
+    const size_t branch_index = decoder.decodeUnionIndex();
+
+    // Validate branch index
+    const size_t num_branches = avro_node->leaves();
+    if (branch_index >= num_branches) {
+      return InvalidArgument("Union branch index {} out of range [0, {})", 
branch_index,
+                             num_branches);
+    }
+
+    const auto& branch_node = avro_node->leafAt(branch_index);
+    if (branch_node->type() == ::avro::AVRO_NULL) {
+      ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
+      return {};
+    } else {
+      return DecodeFieldToBuilder(branch_node, decoder, projection, 
projected_field,
+                                  array_builder, ctx);
+    }
+  }
+
+  const auto& projected_type = *projected_field.type();
+  if (projected_type.is_primitive()) {
+    return DecodePrimitiveValueToBuilder(avro_node, decoder, projected_field,
+                                         array_builder, ctx);
+  } else {
+    const auto& nested_type = internal::checked_cast<const 
NestedType&>(projected_type);
+    return DecodeNestedValueToBuilder(avro_node, decoder, projection.children,
+                                      nested_type, array_builder, ctx);
+  }
+}
+
+}  // namespace
+
+Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                           const SchemaProjection& projection,
+                           const Schema& projected_schema,
+                           ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx) {
+  return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields,
+                                    projected_schema, array_builder, ctx);
+}
+
+}  // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_direct_decoder_internal.h 
b/src/iceberg/avro/avro_direct_decoder_internal.h
new file mode 100644
index 00000000..df4587fd
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_decoder_internal.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <span>
+
+#include <arrow/array/builder_base.h>
+#include <avro/Decoder.hh>
+#include <avro/Node.hh>
+
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/schema_util.h"
+
+namespace iceberg::avro {
+
+/// \brief Context for reusing scratch buffers during Avro decoding
+///
+/// Avoids frequent small allocations by reusing temporary buffers across
+/// multiple decode operations. This is particularly important for string,
+/// binary, and fixed-size data types.
+struct DecodeContext {
+  // Scratch buffer for string decoding (reused across rows)
+  std::string string_scratch;
+  // Scratch buffer for binary/fixed/uuid/decimal data (reused across rows)
+  std::vector<uint8_t> bytes_scratch;
+  // Cache for avro field index to projection index mapping
+  // Key: pointer to projections array (identifies struct schema)
+  // Value: vector mapping avro field index -> projection index (-1 if not 
projected)
+  std::unordered_map<const FieldProjection*, std::vector<int>> 
avro_to_projection_cache;
+};
+
+/// \brief Directly decode Avro data to Arrow array builders without 
GenericDatum
+///
+/// Eliminates the GenericDatum intermediate layer by directly calling Avro 
decoder
+/// methods and immediately appending to Arrow builders. Matches Java Iceberg's
+/// ValueReader approach for better performance.
+///
+/// Features:
+/// - All primitive, temporal, and logical types
+/// - Nested types (struct, list, map)
+/// - Union types with bounds checking
+/// - Field skipping for schema projection
+///
+/// Schema Evolution:
+/// Schema resolution is handled via SchemaProjection (from Project() 
function).
+/// Supports field reordering and missing fields (set to NULL). Default values
+/// are NOT currently supported.
+///
+/// Error Handling:
+/// - Type mismatches → InvalidArgument
+/// - Union branch out of range → InvalidArgument
+/// - Decimal precision insufficient → InvalidArgument
+/// - Missing logical type → InvalidArgument
+///
+/// \param avro_node The Avro schema node for the data being decoded
+/// \param decoder The Avro decoder positioned at the data to read
+/// \param projection The field projections (from Project() function)
+/// \param projected_schema The target Iceberg schema after projection
+/// \param array_builder The Arrow array builder to append decoded data to
+/// \param ctx Decode context for reusing scratch buffers
+/// \return Status::OK if successful, or an error status
+Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& 
decoder,
+                           const SchemaProjection& projection,
+                           const Schema& projected_schema,
+                           ::arrow::ArrayBuilder* array_builder, 
DecodeContext* ctx);
+
+}  // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 932dd0f1..fa7337f0 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -34,6 +34,7 @@
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/avro/avro_data_util_internal.h"
+#include "iceberg/avro/avro_direct_decoder_internal.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/avro/avro_schema_util_internal.h"
 #include "iceberg/avro/avro_stream_internal.h"
@@ -62,18 +63,19 @@ Result<std::unique_ptr<AvroInputStream>> 
CreateInputStream(const ReaderOptions&
 
 // A stateful context to keep track of the reading progress.
 struct ReadContext {
-  // The datum to reuse for reading the data.
-  std::unique_ptr<::avro::GenericDatum> datum_;
   // The arrow schema to build the record batch.
   std::shared_ptr<::arrow::Schema> arrow_schema_;
   // The builder to build the record batch.
   std::shared_ptr<::arrow::ArrayBuilder> builder_;
+  // GenericDatum for GenericDatum-based decoding (only used if direct decoder 
is
+  // disabled)
+  std::unique_ptr<::avro::GenericDatum> datum_;
+  // Decode context for reusing scratch buffers (only used if direct decoder is
+  // enabled)
+  DecodeContext decode_context_;
 };
 
-// TODO(gang.wu): there are a lot to do to make this reader work.
-// 1. prune the reader schema based on the projection
-// 2. read key-value metadata from the avro file
-// 3. collect basic reader metrics
+// TODO(gang.wu): collect basic reader metrics
 class AvroReader::Impl {
  public:
   Status Open(const ReaderOptions& options) {
@@ -83,6 +85,7 @@ class AvroReader::Impl {
     }
 
     batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
+    use_direct_decoder_ = 
options.properties->Get(ReaderProperties::kAvroSkipDatum);
     read_schema_ = options.projection;
 
     // Open the input stream and adapt to the avro interface.
@@ -91,10 +94,21 @@ class AvroReader::Impl {
     ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
                             CreateInputStream(options, kDefaultBufferSize));
 
-    // Create a base reader without setting reader schema to enable projection.
-    auto base_reader =
-        std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
-    ::avro::ValidSchema file_schema = base_reader->dataSchema();
+    ::avro::ValidSchema file_schema;
+
+    if (use_direct_decoder_) {
+      // Create base reader for direct decoder access
+      auto base_reader =
+          
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
+      file_schema = base_reader->dataSchema();
+      base_reader_ = std::move(base_reader);
+    } else {
+      // Create DataFileReader<GenericDatum> for GenericDatum-based decoding
+      auto datum_reader = 
std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
+          std::move(input_stream));
+      file_schema = datum_reader->dataSchema();
+      datum_reader_ = std::move(datum_reader);
+    }
 
     // Validate field ids in the file schema.
     HasIdVisitor has_id_visitor;
@@ -121,13 +135,22 @@ class AvroReader::Impl {
     // TODO(gangwu): support pruning source fields
     ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, 
file_schema.root(),
                                                  /*prune_source=*/false));
-    reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
-        std::move(base_reader), file_schema);
 
-    if (options.split) {
-      reader_->sync(options.split->offset);
-      split_end_ = options.split->offset + options.split->length;
+    if (use_direct_decoder_) {
+      // Initialize the base reader with the file schema
+      base_reader_->init(file_schema);
+      if (options.split) {
+        base_reader_->sync(options.split->offset);
+        split_end_ = options.split->offset + options.split->length;
+      }
+    } else {
+      // The datum reader is already initialized during construction
+      if (options.split) {
+        datum_reader_->sync(options.split->offset);
+        split_end_ = options.split->offset + options.split->length;
+      }
     }
+
     return {};
   }
 
@@ -137,25 +160,37 @@ class AvroReader::Impl {
     }
 
     while (context_->builder_->length() < batch_size_) {
-      if (split_end_ && reader_->pastSync(split_end_.value())) {
+      if (IsPastSync()) {
         break;
       }
-      if (!reader_->read(*context_->datum_)) {
-        break;
+
+      if (use_direct_decoder_) {
+        // Direct decoder: decode Avro to Arrow without GenericDatum
+        if (!base_reader_->hasMore()) {
+          break;
+        }
+        base_reader_->decr();
+
+        ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
+            GetReaderSchema().root(), base_reader_->decoder(), projection_, 
*read_schema_,
+            context_->builder_.get(), &context_->decode_context_));
+      } else {
+        // GenericDatum-based decoding: decode via GenericDatum intermediate
+        if (!datum_reader_->read(*context_->datum_)) {
+          break;
+        }
+
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, 
projection_,
+                                 *read_schema_, context_->builder_.get()));
       }
-      ICEBERG_RETURN_UNEXPECTED(
-          AppendDatumToBuilder(reader_->readerSchema().root(), 
*context_->datum_,
-                               projection_, *read_schema_, 
context_->builder_.get()));
     }
 
     return ConvertBuilderToArrowArray();
   }
 
   Status Close() {
-    if (reader_ != nullptr) {
-      reader_->close();
-      reader_.reset();
-    }
+    CloseReader();
     context_.reset();
     return {};
   }
@@ -174,12 +209,12 @@ class AvroReader::Impl {
   }
 
   Result<std::unordered_map<std::string, std::string>> Metadata() {
-    if (reader_ == nullptr) {
+    if ((use_direct_decoder_ && !base_reader_) ||
+        (!use_direct_decoder_ && !datum_reader_)) {
       return Invalid("Reader is not opened");
     }
 
-    const auto& metadata = reader_->metadata();
-
+    const auto& metadata = GetReaderMetadata();
     std::unordered_map<std::string, std::string> metadata_map;
     metadata_map.reserve(metadata.size());
 
@@ -194,7 +229,6 @@ class AvroReader::Impl {
  private:
   Status InitReadContext() {
     context_ = std::make_unique<ReadContext>();
-    context_->datum_ = 
std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
 
     ArrowSchema arrow_schema;
     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
@@ -214,6 +248,11 @@ class AvroReader::Impl {
     }
     context_->builder_ = builder_result.MoveValueUnsafe();
 
+    // Initialize GenericDatum for GenericDatum-based decoding
+    if (!use_direct_decoder_) {
+      context_->datum_ = 
std::make_unique<::avro::GenericDatum>(GetReaderSchema());
+    }
+
     return {};
   }
 
@@ -238,17 +277,52 @@ class AvroReader::Impl {
     return arrow_array;
   }
 
+  bool IsPastSync() const {
+    if (!split_end_) {
+      return false;
+    }
+    return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
+                               : datum_reader_->pastSync(split_end_.value());
+  }
+
+  const ::avro::Metadata& GetReaderMetadata() const {
+    return use_direct_decoder_ ? base_reader_->metadata() : 
datum_reader_->metadata();
+  }
+
+  void CloseReader() {
+    if (use_direct_decoder_) {
+      if (base_reader_) {
+        base_reader_->close();
+        base_reader_.reset();
+      }
+    } else {
+      if (datum_reader_) {
+        datum_reader_->close();
+        datum_reader_.reset();
+      }
+    }
+  }
+
+  const ::avro::ValidSchema& GetReaderSchema() const {
+    return use_direct_decoder_ ? base_reader_->readerSchema()
+                               : datum_reader_->readerSchema();
+  }
+
  private:
   // Max number of rows in the record batch to read.
   int64_t batch_size_{};
+  // Whether to use direct decoder (true) or GenericDatum-based decoder 
(false).
+  bool use_direct_decoder_{true};
   // The end of the split to read and used to terminate the reading.
   std::optional<int64_t> split_end_;
   // The schema to read.
   std::shared_ptr<::iceberg::Schema> read_schema_;
   // The projection result to apply to the read schema.
   SchemaProjection projection_;
-  // The avro reader to read the data into a datum.
-  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
+  // The avro reader base - provides direct access to decoder for direct 
decoding.
+  std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
+  // The datum reader for GenericDatum-based decoding.
+  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
   // The context to keep track of the reading progress.
   std::unique_ptr<ReadContext> context_;
 };
diff --git a/src/iceberg/avro/avro_scan.cc b/src/iceberg/avro/avro_scan.cc
new file mode 100644
index 00000000..3e690360
--- /dev/null
+++ b/src/iceberg/avro/avro_scan.cc
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
#include <charconv>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>

#include <arrow/array.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/type.h>

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/file_reader.h"
#include "iceberg/schema.h"
+
/// \brief Print command-line usage information for the scan tool to stderr.
void PrintUsage(const char* program_name) {
  std::cerr << "Usage: " << program_name << " [options] <avro_file>\n";
  std::cerr << "Options:\n";
  std::cerr << "  --skip-datum=<true|false>  Use direct decoder (default: true)\n";
  std::cerr << "  --batch-size=<N>           Batch size for reading (default: 4096)\n";
  std::cerr << "  --help                     Show this help message\n";
  std::cerr << "\nExample:\n";
  std::cerr << "  " << program_name << " --skip-datum=false --batch-size=1000 data.avro\n";
}
+
+int main(int argc, char* argv[]) {
+  iceberg::avro::RegisterAll();
+
+  if (argc < 2) {
+    PrintUsage(argv[0]);
+    return 1;
+  }
+
+  std::string avro_file;
+  bool skip_datum = true;
+  int64_t batch_size = 4096;
+
+  // Parse arguments
+  for (int i = 1; i < argc; ++i) {
+    std::string arg = argv[i];
+    if (arg == "--help") {
+      PrintUsage(argv[0]);
+      return 0;
+    } else if (arg.starts_with("--skip-datum=")) {
+      std::string value = arg.substr(13);
+      if (value == "true" || value == "1") {
+        skip_datum = true;
+      } else if (value == "false" || value == "0") {
+        skip_datum = false;
+      } else {
+        std::cerr << "Invalid value for --skip-datum: " << value << "\n";
+        return 1;
+      }
+    } else if (arg.starts_with("--batch-size=")) {
+      batch_size = std::stoll(arg.substr(13));
+      if (batch_size <= 0) {
+        std::cerr << "Batch size must be positive\n";
+        return 1;
+      }
+    } else if (arg[0] == '-') {
+      std::cerr << "Unknown option: " << arg << "\n";
+      PrintUsage(argv[0]);
+      return 1;
+    } else {
+      avro_file = arg;
+    }
+  }
+
+  if (avro_file.empty()) {
+    std::cerr << "Error: No Avro file specified\n";
+    PrintUsage(argv[0]);
+    return 1;
+  }
+
+  std::cout << "Scanning Avro file: " << avro_file << "\n";
+  std::cout << "Skip datum: " << (skip_datum ? "true" : "false") << "\n";
+  std::cout << "Batch size: " << batch_size << "\n";
+  std::cout << std::string(60, '-') << "\n";
+
+  auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
+  auto file_io = 
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs);
+
+  // Get file info
+  auto file_info_result = local_fs->GetFileInfo(avro_file);
+  if (!file_info_result.ok()) {
+    std::cerr << "Error: Cannot access file: " << 
file_info_result.status().message()
+              << "\n";
+    return 1;
+  }
+  auto file_info = file_info_result.ValueOrDie();
+  if (file_info.type() != ::arrow::fs::FileType::File) {
+    std::cerr << "Error: Not a file: " << avro_file << "\n";
+    return 1;
+  }
+
+  std::cout << "File size: " << file_info.size() << " bytes\n";
+
+  // Configure reader properties
+  auto reader_properties = iceberg::ReaderProperties::default_properties();
+  reader_properties->Set(iceberg::ReaderProperties::kAvroSkipDatum, 
skip_datum);
+  reader_properties->Set(iceberg::ReaderProperties::kBatchSize, batch_size);
+
+  // Open reader (without projection to read all columns)
+  auto reader_result = iceberg::ReaderFactoryRegistry::Open(
+      iceberg::FileFormatType::kAvro, {.path = avro_file,
+                                       .length = file_info.size(),
+                                       .io = file_io,
+                                       .projection = nullptr,
+                                       .properties = 
std::move(reader_properties)});
+
+  if (!reader_result.has_value()) {
+    std::cerr << "Error opening reader: " << reader_result.error().message << 
"\n";
+    return 1;
+  }
+
+  auto reader = std::move(reader_result.value());
+
+  // Get schema
+  auto schema_result = reader->Schema();
+  if (!schema_result.has_value()) {
+    std::cerr << "Error getting schema: " << schema_result.error().message << 
"\n";
+    return 1;
+  }
+  auto arrow_schema = schema_result.value();
+  auto arrow_schema_import = ::arrow::ImportType(&arrow_schema);
+  if (!arrow_schema_import.ok()) {
+    std::cerr << "Error importing schema: " << 
arrow_schema_import.status().message()
+              << "\n";
+    return 1;
+  }
+  std::cout << "Schema: " << arrow_schema_import.ValueOrDie()->ToString() << 
"\n";
+  std::cout << std::string(60, '-') << "\n";
+
+  // Scan file and measure time
+  auto start = std::chrono::high_resolution_clock::now();
+
+  int64_t total_rows = 0;
+  int64_t batch_count = 0;
+
+  while (true) {
+    auto batch_result = reader->Next();
+    if (!batch_result.has_value()) {
+      std::cerr << "Error reading batch: " << batch_result.error().message << 
"\n";
+      return 1;
+    }
+
+    auto batch_opt = batch_result.value();
+    if (!batch_opt.has_value()) {
+      // End of file
+      break;
+    }
+
+    auto arrow_array = batch_opt.value();
+    auto arrow_type = arrow_schema_import.ValueOrDie();
+    auto array_import = ::arrow::ImportArray(&arrow_array, arrow_type);
+    if (!array_import.ok()) {
+      std::cerr << "Error importing array: " << 
array_import.status().message() << "\n";
+      return 1;
+    }
+
+    int64_t batch_rows = array_import.ValueOrDie()->length();
+    total_rows += batch_rows;
+    batch_count++;
+  }
+
+  auto end = std::chrono::high_resolution_clock::now();
+  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - 
start);
+
+  // Print results
+  std::cout << "\nResults:\n";
+  std::cout << "  Total rows: " << total_rows << "\n";
+  std::cout << "  Batches: " << batch_count << "\n";
+  std::cout << "  Time: " << duration.count() << " ms\n";
+  std::cout << "  Throughput: "
+            << (duration.count() > 0 ? (total_rows * 1000 / duration.count()) 
: 0)
+            << " rows/sec\n";
+  std::cout << "  Throughput: "
+            << (duration.count() > 0
+                    ? (file_info.size() / 1024.0 / 1024.0) / (duration.count() 
/ 1000.0)
+                    : 0)
+            << " MB/sec\n";
+
+  return 0;
+}
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index 85221a6a..a5af0a41 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -76,6 +76,11 @@ class ReaderProperties : public ConfigBase<ReaderProperties> 
{
   /// \brief The batch size to read.
   inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
 
+  /// \brief Skip GenericDatum in Avro reader for better performance.
+  /// When true, decode directly from Avro to Arrow without GenericDatum 
intermediate.
+  /// Default: true (skip GenericDatum for better performance).
+  inline static Entry<bool> kAvroSkipDatum{"read.avro.skip-datum", true};
+
   /// \brief Create a default ReaderProperties instance.
   static std::unique_ptr<ReaderProperties> default_properties();
 
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 1d421ede..c1bb8bc9 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#include <sstream>
+
 #include <arrow/array/array_base.h>
 #include <arrow/c/bridge.h>
 #include <arrow/filesystem/localfs.h>
@@ -51,6 +53,8 @@ class AvroReaderTest : public TempFileTestBase {
     temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
   }
 
+  bool skip_datum_{true};
+
   void CreateSimpleAvroFile() {
     const std::string avro_schema_json = R"({
       "type": "record",
@@ -139,11 +143,15 @@ class AvroReaderTest : public TempFileTestBase {
     ASSERT_TRUE(file_info_result.ok());
     ASSERT_EQ(file_info_result->size(), writer->length().value());
 
-    auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
-                                                     {.path = temp_avro_file_,
-                                                      .length = 
file_info_result->size(),
-                                                      .io = file_io_,
-                                                      .projection = schema});
+    auto reader_properties = ReaderProperties::default_properties();
+    reader_properties->Set(ReaderProperties::kAvroSkipDatum, skip_datum_);
+
+    auto reader_result = ReaderFactoryRegistry::Open(
+        FileFormatType::kAvro, {.path = temp_avro_file_,
+                                .length = file_info_result->size(),
+                                .io = file_io_,
+                                .projection = schema,
+                                .properties = std::move(reader_properties)});
     ASSERT_THAT(reader_result, IsOk());
     auto reader = std::move(reader_result.value());
     ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
@@ -164,6 +172,16 @@ class AvroReaderTest : public TempFileTestBase {
   std::string temp_avro_file_;
 };
 
+// Parameterized test fixture for testing both DirectDecoder and GenericDatum 
modes
class AvroReaderParameterizedTest : public AvroReaderTest,
                                    public ::testing::WithParamInterface<bool> {
 protected:
  // Propagate the bool test parameter into the base fixture's skip_datum_
  // flag so WriteAndVerify() opens the reader with the matching decoder mode
  // (true = direct decoder, false = GenericDatum-based decoding).
  void SetUp() override {
    AvroReaderTest::SetUp();
    skip_datum_ = GetParam();
  }
};
+
 TEST_F(AvroReaderTest, ReadTwoFields) {
   CreateSimpleAvroFile();
   auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
@@ -220,7 +238,7 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
   ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
 }
 
-TEST_F(AvroReaderTest, AvroWriterBasicType) {
+TEST_P(AvroReaderParameterizedTest, AvroWriterBasicType) {
   auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
       SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});
 
@@ -229,7 +247,7 @@ TEST_F(AvroReaderTest, AvroWriterBasicType) {
   WriteAndVerify(schema, expected_string);
 }
 
-TEST_F(AvroReaderTest, AvroWriterNestedType) {
+TEST_P(AvroReaderParameterizedTest, AvroWriterNestedType) {
   auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
       SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
       SchemaField::MakeRequired(
@@ -244,4 +262,245 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) {
   WriteAndVerify(schema, expected_string);
 }
 
// Round-trip one schema containing every primitive type covered here
// (bool, int, long, float, double, string, binary) through write + read.
TEST_P(AvroReaderParameterizedTest, AllPrimitiveTypes) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "bool_col", std::make_shared<BooleanType>()),
      SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()),
      SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()),
      SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()),
      SchemaField::MakeRequired(5, "double_col", std::make_shared<DoubleType>()),
      SchemaField::MakeRequired(6, "string_col", std::make_shared<StringType>()),
      SchemaField::MakeRequired(7, "binary_col", std::make_shared<BinaryType>())});

  std::string expected_string = R"([
    [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"],
    [false, -100, -9876543210, -1.5, 0.0, "hello", "BAUG"]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
+// Skipping DecimalType test - requires specific decimal encoding in JSON
+
// Round-trip the temporal types: date, time, and timestamp columns.
TEST_P(AvroReaderParameterizedTest, DateTimeTypes) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()),
      SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()),
      SchemaField::MakeRequired(3, "timestamp_col", std::make_shared<TimestampType>())});

  // Dates as days since epoch, time/timestamps as microseconds
  std::string expected_string = R"([
    [18628, 43200000000, 1640995200000000],
    [18629, 86399000000, 1641081599000000]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Round-trip a struct nested inside a struct, including an optional inner
// struct field ("address").
TEST_P(AvroReaderParameterizedTest, NestedStruct) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(
          2, "person",
          std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
              SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
              SchemaField::MakeRequired(4, "age", std::make_shared<IntType>()),
              SchemaField::MakeOptional(
                  5, "address",
                  std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
                      SchemaField::MakeRequired(6, "street",
                                                std::make_shared<StringType>()),
                      SchemaField::MakeRequired(7, "city",
                                                std::make_shared<StringType>())}))}))});

  std::string expected_string = R"([
    [1, ["Alice", 30, ["123 Main St", "NYC"]]],
    [2, ["Bob", 25, ["456 Oak Ave", "LA"]]]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Round-trip a list column, including an empty list in the last row.
TEST_P(AvroReaderParameterizedTest, ListType) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(2, "tags",
                                std::make_shared<ListType>(SchemaField::MakeRequired(
                                    3, "element", std::make_shared<StringType>())))});

  std::string expected_string = R"([
    [1, ["tag1", "tag2", "tag3"]],
    [2, ["foo", "bar"]],
    [3, []]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Round-trip a map column with string keys. The JSON fixture encodes each map
// as a list of [key, value] pairs.
TEST_P(AvroReaderParameterizedTest, MapType) {
  auto schema = std::make_shared<iceberg::Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(
          1, "properties",
          std::make_shared<MapType>(
              SchemaField::MakeRequired(2, "key", std::make_shared<StringType>()),
              SchemaField::MakeRequired(3, "value", std::make_shared<IntType>())))});

  std::string expected_string = R"([
    [[["key1", 100], ["key2", 200]]],
    [[["a", 1], ["b", 2], ["c", 3]]]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Round-trip a map column whose keys are not strings (int keys), which cannot
// be represented as a plain Avro map of string -> value.
TEST_P(AvroReaderParameterizedTest, MapTypeWithNonStringKey) {
  auto schema = std::make_shared<iceberg::Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(
          1, "int_map",
          std::make_shared<MapType>(
              SchemaField::MakeRequired(2, "key", std::make_shared<IntType>()),
              SchemaField::MakeRequired(3, "value", std::make_shared<StringType>())))});

  std::string expected_string = R"([
    [[[1, "one"], [2, "two"], [3, "three"]]],
    [[[10, "ten"], [20, "twenty"]]]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
+TEST_F(AvroReaderTest, ProjectionSubsetAndReorder) {
+  // Write file with full schema
+  auto write_schema = 
std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+      SchemaField::MakeRequired(2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeRequired(3, "age", std::make_shared<IntType>()),
+      SchemaField::MakeRequired(4, "city", std::make_shared<StringType>())});
+
+  std::string write_data = R"([
+    [1, "Alice", 25, "NYC"],
+    [2, "Bob", 30, "SF"],
+    [3, "Charlie", 35, "LA"]
+  ])";
+
+  // Write with full schema
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk());
+  auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
+  ASSERT_TRUE(arrow_schema_result.ok());
+  auto arrow_schema = arrow_schema_result.ValueOrDie();
+
+  auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, 
write_data);
+  ASSERT_TRUE(array_result.ok());
+  auto array = array_result.ValueOrDie();
+
+  struct ArrowArray arrow_array;
+  auto export_result = ::arrow::ExportArray(*array, &arrow_array);
+  ASSERT_TRUE(export_result.ok());
+
+  std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}};
+  auto writer_result =
+      WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = 
temp_avro_file_,
+                                                          .schema = 
write_schema,
+                                                          .io = file_io_,
+                                                          .metadata = 
metadata});
+  ASSERT_TRUE(writer_result.has_value());
+  auto writer = std::move(writer_result.value());
+  ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  // Read with projected schema: subset of columns (city, id) in different 
order
+  auto read_schema = 
std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(4, "city", std::make_shared<StringType>()),
+      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>())});
+
+  auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
+  ASSERT_TRUE(file_info_result.ok());
+
+  auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
+                                                   {.path = temp_avro_file_,
+                                                    .length = 
file_info_result->size(),
+                                                    .io = file_io_,
+                                                    .projection = 
read_schema});
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+
+  // Verify reordered subset
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyNextBatch(*reader, R"([["NYC", 1], ["SF", 2], ["LA", 3]])"));
+  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+}
+
// Round-trip a doubly-nested list column (list<list<int>>).
TEST_P(AvroReaderParameterizedTest, ComplexNestedTypes) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(2, "nested_list",
                                std::make_shared<ListType>(SchemaField::MakeRequired(
                                    3, "element",
                                    std::make_shared<ListType>(SchemaField::MakeRequired(
                                        4, "element", std::make_shared<IntType>())))))});

  std::string expected_string = R"([
    [1, [[1, 2], [3, 4]]],
    [2, [[5], [6, 7, 8]]]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Round-trip optional columns with nulls appearing in each optional position,
// including a row where all optional columns are null.
TEST_P(AvroReaderParameterizedTest, OptionalFieldsWithNulls) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()),
      SchemaField::MakeOptional(3, "age", std::make_shared<IntType>())});

  std::string expected_string = R"([
    [1, "Alice", 30],
    [2, null, 25],
    [3, "Charlie", null],
    [4, null, null]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
+// Test both direct decoder and GenericDatum paths
+TEST_P(AvroReaderParameterizedTest, LargeDataset) {
+  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>())});
+
+  // Generate large dataset JSON
+  std::ostringstream json;
+  json << "[";
+  for (int i = 0; i < 1000; ++i) {
+    if (i > 0) json << ", ";
+    json << "[" << i << ", " << (i * 1.5) << "]";
+  }
+  json << "]";
+
+  WriteAndVerify(schema, json.str());
+}
+
// Round-trip a list column where the first row holds an empty list
// (zero-element collection decoding).
TEST_P(AvroReaderParameterizedTest, EmptyCollections) {
  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(2, "list_col",
                                std::make_shared<ListType>(SchemaField::MakeRequired(
                                    3, "element", std::make_shared<IntType>())))});

  std::string expected_string = R"([
    [1, []],
    [2, [10, 20, 30]]
  ])";

  WriteAndVerify(schema, expected_string);
}
+
// Instantiate every TEST_P above twice — once with the direct decoder
// (param = true) and once with GenericDatum-based decoding (param = false).
// The name generator makes the mode readable in test output.
INSTANTIATE_TEST_SUITE_P(DirectDecoderModes, AvroReaderParameterizedTest,
                         ::testing::Bool(),
                         [](const ::testing::TestParamInfo<bool>& info) {
                           return info.param ? "DirectDecoder" : "GenericDatum";
                         });
+
 }  // namespace iceberg::avro
diff --git a/src/iceberg/test/temp_file_test_base.h 
b/src/iceberg/test/temp_file_test_base.h
index 8e20e2ca..4b3131d1 100644
--- a/src/iceberg/test/temp_file_test_base.h
+++ b/src/iceberg/test/temp_file_test_base.h
@@ -118,7 +118,14 @@ class TempFileTestBase : public ::testing::Test {
   /// \brief Get the test name for inclusion in the filename
   std::string TestInfo() const {
     if (const auto info = 
::testing::UnitTest::GetInstance()->current_test_info(); info) {
-      return std::format("{}_{}", info->test_suite_name(), info->name());
+      std::string result = std::format("{}_{}", info->test_suite_name(), 
info->name());
+      // Replace slashes (from parameterized tests) with underscores to avoid 
path issues
+      for (auto& c : result) {
+        if (c == '/') {
+          c = '_';
+        }
+      }
+      return result;
     }
     return "unknown_test";
   }

Reply via email to