This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d4ad8f7f feat(avro): support writing multiple blocks (#470)
d4ad8f7f is described below

commit d4ad8f7fb4c51f67bd2a45e431d674d67ef514db
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jan 5 09:48:25 2026 +0800

    feat(avro): support writing multiple blocks (#470)
---
 src/iceberg/avro/avro_writer.cc |  1 +
 src/iceberg/test/avro_test.cc   | 54 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index b426a756..307f2fd6 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -80,6 +80,7 @@ class DirectEncoderBackend : public AvroWriteBackend {
 
   Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
                   int64_t row_index) override {
+    writer_->syncIfNeeded();
     ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, 
writer_->encoder(),
                                                 write_schema, array, row_index,
                                                 encode_ctx_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 3ebc4c10..404f763a 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -18,15 +18,19 @@
  */
 
 #include <sstream>
+#include <unordered_map>
 
 #include <arrow/array.h>
 #include <arrow/array/array_base.h>
 #include <arrow/c/bridge.h>
 #include <arrow/json/from_string.h>
+#include <avro/DataFile.hh>
+#include <avro/Generic.hh>
 #include <gtest/gtest.h>
 
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/avro/avro_register.h"
+#include "iceberg/avro/avro_stream_internal.h"
 #include "iceberg/avro/avro_writer.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/metadata_columns.h"
@@ -34,6 +38,7 @@
 #include "iceberg/schema_internal.h"
 #include "iceberg/test/matchers.h"
 #include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
 
 namespace iceberg::avro {
 
@@ -639,7 +644,9 @@ class AvroWriterTest : public ::testing::Test,
     skip_datum_ = GetParam();
   }
 
-  void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string& 
json_data) {
+  void WriteAvroFile(
+      std::shared_ptr<Schema> schema, const std::string& json_data,
+      const std::unordered_map<std::string, std::string>& extra_properties = 
{}) {
     ArrowSchema arrow_c_schema;
     ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
 
@@ -660,6 +667,9 @@ class AvroWriterTest : public ::testing::Test,
 
     auto writer_properties = WriterProperties::default_properties();
     writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
+    for (const auto& [key, value] : extra_properties) {
+      writer_properties->mutable_configs().emplace(key, value);
+    }
 
     auto writer_result = WriterFactoryRegistry::Open(
         FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -884,6 +894,48 @@ TEST_P(AvroWriterTest, WriteLargeDataset) {
   VerifyWrittenData(json.str());
 }
 
+TEST_P(AvroWriterTest, MultipleAvroBlocks) {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeRequired(2, "name", 
string())});
+
+  const std::string json_data = R"([
+    [1, "Alice_with_a_very_long_name_to_exceed_sync_interval"],
+    [2, "Bob_with_another_very_long_name_to_exceed_sync_interval"],
+    [3, "Charlie_with_yet_another_very_long_name_to_exceed_sync"],
+    [4, "David_with_a_super_long_name_that_will_exceed_interval"],
+    [5, "Eve_with_an_extremely_long_name_to_force_new_block_here"]
+  ])";
+
+  const std::vector<std::pair</*sync_interval*/ std::string, /*num_blocks*/ 
size_t>>
+      test_cases = {{"32", 5}, {"65536", 1}};
+
+  for (const auto& [interval, num_blocks] : test_cases) {
+    WriteAvroFile(schema, json_data,
+                  {{WriterProperties::kAvroSyncInterval.key(), interval}});
+    VerifyWrittenData(json_data);
+
+    // Use raw avro-cpp reader to count blocks by tracking previousSync() 
changes
+    auto mock_io = 
internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(file_io_);
+    auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie();
+    auto input_stream = std::make_unique<AvroInputStream>(std::move(input), 
1024 * 1024);
+    ::avro::DataFileReader<::avro::GenericDatum> 
avro_reader(std::move(input_stream));
+    ::avro::GenericDatum datum(avro_reader.dataSchema());
+
+    size_t block_count = 0;
+    int64_t last_sync = -1;
+
+    while (avro_reader.read(datum)) {
+      if (int64_t current_sync = avro_reader.previousSync(); current_sync != 
last_sync) {
+        block_count++;
+        last_sync = current_sync;
+      }
+    }
+
+    ASSERT_EQ(block_count, num_blocks);
+  }
+}
+
 // Instantiate parameterized tests for both direct encoder and GenericDatum 
paths
 INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
                          ::testing::Values(true, false),

Reply via email to