This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 40565f43 refactor: simplify avro test and use mock fs (#462)
40565f43 is described below
commit 40565f439cb325e953d168d64cdaebdaa78a598e
Author: Gang Wu <[email protected]>
AuthorDate: Wed Dec 31 14:21:22 2025 +0800
refactor: simplify avro test and use mock fs (#462)
---
src/iceberg/test/avro_test.cc | 467 +++++++++++++-----------------------------
1 file changed, 143 insertions(+), 324 deletions(-)
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index b0ed1080..f84d3c81 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -21,13 +21,7 @@
#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
-#include <arrow/filesystem/localfs.h>
-#include <arrow/io/file.h>
#include <arrow/json/from_string.h>
-#include <avro/Compiler.hh>
-#include <avro/DataFile.hh>
-#include <avro/Generic.hh>
-#include <avro/GenericDatum.hh>
#include <gtest/gtest.h>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
@@ -37,49 +31,50 @@
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
-#include "iceberg/test/temp_file_test_base.h"
#include "iceberg/type.h"
namespace iceberg::avro {
-class AvroReaderTest : public TempFileTestBase {
+class AvroReaderTest : public ::testing::Test {
protected:
static void SetUpTestSuite() { RegisterAll(); }
void SetUp() override {
- TempFileTestBase::SetUp();
- local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
- file_io_ =
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
- temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ temp_avro_file_ = "avro_reader_test.avro";
}
bool skip_datum_{true};
void CreateSimpleAvroFile() {
- const std::string avro_schema_json = R"({
- "type": "record",
- "name": "TestRecord",
- "fields": [
- {"name": "id", "type": "int", "field-id": 1},
- {"name": "name", "type": ["null", "string"], "field-id": 2}
- ]
- })";
- auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
-
- const std::vector<std::tuple<int32_t, std::string>> test_data = {
- {1, "Alice"}, {2, "Bob"}, {3, "Charlie"}};
-
- ::avro::DataFileWriter<::avro::GenericDatum>
writer(temp_avro_file_.c_str(),
- avro_schema);
- for (const auto& [id, name] : test_data) {
- ::avro::GenericDatum datum(avro_schema.root());
- auto& record = datum.value<::avro::GenericRecord>();
- record.fieldAt(0).value<int32_t>() = id;
- record.fieldAt(1).selectBranch(1); // non-null
- record.fieldAt(1).value<std::string>() = name;
- writer.write(datum);
- }
- writer.close();
+ // Create simple avro file using the writer API instead of direct Avro library
+ auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
+
+ ArrowSchema arrow_c_schema;
+ ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ auto array = ::arrow::json::ArrayFromJSONString(
+ ::arrow::struct_(arrow_schema->fields()),
+ R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
+ .ValueOrDie();
+
+ struct ArrowArray arrow_array;
+ auto export_result = ::arrow::ExportArray(*array, &arrow_array);
+ ASSERT_TRUE(export_result.ok());
+
+ auto writer_result =
+ WriterFactoryRegistry::Open(FileFormatType::kAvro, {
+ .path =
temp_avro_file_,
+ .schema =
schema,
+ .io = file_io_,
+ });
+ ASSERT_TRUE(writer_result.has_value());
+ auto writer = std::move(writer_result.value());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
}
void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
@@ -138,17 +133,14 @@ class AvroReaderTest : public TempFileTestBase {
auto writer = std::move(writer_result.value());
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
ASSERT_THAT(writer->Close(), IsOk());
-
- auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
- ASSERT_TRUE(file_info_result.ok());
- ASSERT_EQ(file_info_result->size(), writer->length().value());
+ ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer->length());
auto reader_properties = ReaderProperties::default_properties();
reader_properties->Set(ReaderProperties::kAvroSkipDatum, skip_datum_);
auto reader_result = ReaderFactoryRegistry::Open(
FileFormatType::kAvro, {.path = temp_avro_file_,
- .length = file_info_result->size(),
+ .length = written_length,
.io = file_io_,
.projection = schema,
.properties = std::move(reader_properties)});
@@ -167,21 +159,10 @@ class AvroReaderTest : public TempFileTestBase {
}
}
- std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
std::string temp_avro_file_;
};
-// Parameterized test fixture for testing both DirectDecoder and GenericDatum modes
-class AvroReaderParameterizedTest : public AvroReaderTest,
- public ::testing::WithParamInterface<bool>
{
- protected:
- void SetUp() override {
- AvroReaderTest::SetUp();
- skip_datum_ = GetParam();
- }
-};
-
TEST_F(AvroReaderTest, ReadTwoFields) {
CreateSimpleAvroFile();
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
@@ -238,6 +219,39 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}
+TEST_F(AvroReaderTest, BufferSizeConfiguration) {
+ // Test default buffer size
+ auto properties1 = ReaderProperties::default_properties();
+ ASSERT_EQ(properties1->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
+
+ // Test setting custom buffer size
+ auto properties2 = ReaderProperties::default_properties();
+ constexpr int64_t kCustomBufferSize = 2 * 1024 * 1024; // 2MB
+ properties2->Set(ReaderProperties::kAvroBufferSize, kCustomBufferSize);
+ ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize),
kCustomBufferSize);
+
+ // Test setting via FromMap
+ std::unordered_map<std::string, std::string> config_map = {
+ {"read.avro.buffer-size", "4194304"} // 4MB
+ };
+ auto properties3 = ReaderProperties::FromMap(config_map);
+ ASSERT_EQ(properties3->Get(ReaderProperties::kAvroBufferSize), 4194304);
+
+ // Test that unset returns to default
+ properties2->Unset(ReaderProperties::kAvroBufferSize);
+ ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
+}
+
+// Parameterized test fixture for testing both DirectDecoder and GenericDatum modes
+class AvroReaderParameterizedTest : public AvroReaderTest,
+ public ::testing::WithParamInterface<bool>
{
+ protected:
+ void SetUp() override {
+ AvroReaderTest::SetUp();
+ skip_datum_ = GetParam();
+ }
+};
+
TEST_P(AvroReaderParameterizedTest, AvroWriterBasicType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});
@@ -408,20 +422,18 @@ TEST_F(AvroReaderTest, ProjectionSubsetAndReorder) {
auto writer = std::move(writer_result.value());
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
ASSERT_THAT(writer->Close(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer->length());
// Read with projected schema: subset of columns (city, id) in different order
auto read_schema =
std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(4, "city", std::make_shared<StringType>()),
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>())});
- auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
- ASSERT_TRUE(file_info_result.ok());
-
- auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
- {.path = temp_avro_file_,
- .length =
file_info_result->size(),
- .io = file_io_,
- .projection =
read_schema});
+ auto reader_result =
+ ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path =
temp_avro_file_,
+ .length =
written_length,
+ .io = file_io_,
+ .projection =
read_schema});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
@@ -503,15 +515,16 @@ INSTANTIATE_TEST_SUITE_P(DirectDecoderModes, AvroReaderParameterizedTest,
return info.param ? "DirectDecoder" :
"GenericDatum";
});
-class AvroWriterTest : public TempFileTestBase {
+// Parameterized test fixture for testing both direct encoder and GenericDatum modes
+class AvroWriterTest : public ::testing::Test,
+ public ::testing::WithParamInterface<bool> {
protected:
static void SetUpTestSuite() { RegisterAll(); }
void SetUp() override {
- TempFileTestBase::SetUp();
- local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
- file_io_ =
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
- temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ temp_avro_file_ = "avro_writer_test.avro";
+ skip_datum_ = GetParam();
}
void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string&
json_data) {
@@ -543,41 +556,62 @@ class AvroWriterTest : public TempFileTestBase {
.metadata = metadata,
.properties = std::move(writer_properties)});
ASSERT_TRUE(writer_result.has_value());
- auto writer = std::move(writer_result.value());
- ASSERT_THAT(writer->Write(&arrow_array), IsOk());
- ASSERT_THAT(writer->Close(), IsOk());
+ writer_ = std::move(writer_result.value());
+ ASSERT_THAT(writer_->Write(&arrow_array), IsOk());
+ ASSERT_THAT(writer_->Close(), IsOk());
+ write_schema_ = schema;
}
- template <typename VerifyFunc>
- void VerifyAvroFileContent(VerifyFunc verify_func) {
- ::avro::DataFileReader<::avro::GenericDatum>
reader(temp_avro_file_.c_str());
- ::avro::GenericDatum datum(reader.dataSchema());
+ void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
+ // Boilerplate to get Arrow schema
+ auto schema_result = reader.Schema();
+ ASSERT_THAT(schema_result, IsOk());
+ auto arrow_c_schema = std::move(schema_result.value());
+ auto import_schema_result = ::arrow::ImportType(&arrow_c_schema);
+ auto arrow_schema = import_schema_result.ValueOrDie();
- size_t row_count = 0;
- while (reader.read(datum)) {
- verify_func(datum, row_count);
- row_count++;
- }
- reader.close();
+ // Boilerplate to get Arrow array
+ auto data = reader.Next();
+ ASSERT_THAT(data, IsOk());
+ ASSERT_TRUE(data.value().has_value());
+ auto arrow_c_array = data.value().value();
+ auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema);
+ auto arrow_array = data_result.ValueOrDie();
+
+ // Verify data
+ auto expected_array =
+ ::arrow::json::ArrayFromJSONString(arrow_schema,
expected_json).ValueOrDie();
+ ASSERT_TRUE(arrow_array->Equals(*expected_array));
+ }
+
+ void VerifyExhausted(Reader& reader) {
+ auto data = reader.Next();
+ ASSERT_THAT(data, IsOk());
+ ASSERT_FALSE(data.value().has_value());
+ }
+
+ void VerifyWrittenData(const std::string& expected_json) {
+ ICEBERG_UNWRAP_OR_FAIL(auto written_length, writer_->length());
+
+ auto reader_result =
+ ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path =
temp_avro_file_,
+ .length =
written_length,
+ .io = file_io_,
+ .projection =
write_schema_});
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+ ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_json));
+ ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}
- std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
std::string temp_avro_file_;
bool skip_datum_{true};
+ std::shared_ptr<Schema> write_schema_;
+ std::unique_ptr<Writer> writer_;
};
-// Parameterized test fixture for testing both direct encoder and GenericDatum modes
-class AvroWriterParameterizedTest : public AvroWriterTest,
- public ::testing::WithParamInterface<bool>
{
- protected:
- void SetUp() override {
- AvroWriterTest::SetUp();
- skip_datum_ = GetParam();
- }
-};
-
-TEST_P(AvroWriterParameterizedTest, WritePrimitiveTypes) {
+TEST_P(AvroWriterTest, WritePrimitiveTypes) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "bool_col",
std::make_shared<BooleanType>()),
SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()),
@@ -592,31 +626,10 @@ TEST_P(AvroWriterParameterizedTest, WritePrimitiveTypes) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 6);
-
- if (row_idx == 0) {
- EXPECT_TRUE(record.fieldAt(0).value<bool>());
- EXPECT_EQ(record.fieldAt(1).value<int32_t>(), 42);
- EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1234567890);
- EXPECT_FLOAT_EQ(record.fieldAt(3).value<float>(), 3.14f);
- EXPECT_DOUBLE_EQ(record.fieldAt(4).value<double>(), 2.71828);
- EXPECT_EQ(record.fieldAt(5).value<std::string>(), "hello");
- } else if (row_idx == 1) {
- EXPECT_FALSE(record.fieldAt(0).value<bool>());
- EXPECT_EQ(record.fieldAt(1).value<int32_t>(), -100);
- EXPECT_EQ(record.fieldAt(2).value<int64_t>(), -9876543210);
- EXPECT_FLOAT_EQ(record.fieldAt(3).value<float>(), -1.5f);
- EXPECT_DOUBLE_EQ(record.fieldAt(4).value<double>(), 0.0);
- EXPECT_EQ(record.fieldAt(5).value<std::string>(), "world");
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteTemporalTypes) {
+TEST_P(AvroWriterTest, WriteTemporalTypes) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()),
SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()),
@@ -628,25 +641,10 @@ TEST_P(AvroWriterParameterizedTest, WriteTemporalTypes) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 3);
-
- if (row_idx == 0) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 18628);
- EXPECT_EQ(record.fieldAt(1).value<int64_t>(), 43200000000);
- EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1640995200000000);
- } else if (row_idx == 1) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 18629);
- EXPECT_EQ(record.fieldAt(1).value<int64_t>(), 86399000000);
- EXPECT_EQ(record.fieldAt(2).value<int64_t>(), 1641081599000000);
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteNestedStruct) {
+TEST_P(AvroWriterTest, WriteNestedStruct) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeRequired(
@@ -661,27 +659,10 @@ TEST_P(AvroWriterParameterizedTest, WriteNestedStruct) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 2);
-
- if (row_idx == 0) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 1);
- const auto& person = record.fieldAt(1).value<::avro::GenericRecord>();
- EXPECT_EQ(person.fieldAt(0).value<std::string>(), "Alice");
- EXPECT_EQ(person.fieldAt(1).value<int32_t>(), 30);
- } else if (row_idx == 1) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 2);
- const auto& person = record.fieldAt(1).value<::avro::GenericRecord>();
- EXPECT_EQ(person.fieldAt(0).value<std::string>(), "Bob");
- EXPECT_EQ(person.fieldAt(1).value<int32_t>(), 25);
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteListType) {
+TEST_P(AvroWriterTest, WriteListType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeRequired(2, "tags",
@@ -695,34 +676,10 @@ TEST_P(AvroWriterParameterizedTest, WriteListType) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 2);
-
- if (row_idx == 0) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 1);
- const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
- ASSERT_EQ(tags.value().size(), 3);
- EXPECT_EQ(tags.value()[0].value<std::string>(), "tag1");
- EXPECT_EQ(tags.value()[1].value<std::string>(), "tag2");
- EXPECT_EQ(tags.value()[2].value<std::string>(), "tag3");
- } else if (row_idx == 1) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 2);
- const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
- ASSERT_EQ(tags.value().size(), 2);
- EXPECT_EQ(tags.value()[0].value<std::string>(), "foo");
- EXPECT_EQ(tags.value()[1].value<std::string>(), "bar");
- } else if (row_idx == 2) {
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), 3);
- const auto& tags = record.fieldAt(1).value<::avro::GenericArray>();
- EXPECT_EQ(tags.value().size(), 0);
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithStringKey) {
+TEST_P(AvroWriterTest, WriteMapTypeWithStringKey) {
auto schema = std::make_shared<iceberg::Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(
1, "properties",
@@ -736,53 +693,10 @@ TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithStringKey) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 1);
-
- const auto& map = record.fieldAt(0).value<::avro::GenericMap>();
- const auto& map_value = map.value();
- if (row_idx == 0) {
- ASSERT_EQ(map_value.size(), 2);
- // Find entries by key
- bool found_key1 = false;
- bool found_key2 = false;
- for (const auto& entry : map_value) {
- if (entry.first == "key1") {
- EXPECT_EQ(entry.second.value<int32_t>(), 100);
- found_key1 = true;
- } else if (entry.first == "key2") {
- EXPECT_EQ(entry.second.value<int32_t>(), 200);
- found_key2 = true;
- }
- }
- EXPECT_TRUE(found_key1 && found_key2);
- } else if (row_idx == 1) {
- ASSERT_EQ(map_value.size(), 3);
- // Find entries by key
- bool found_a = false;
- bool found_b = false;
- bool found_c = false;
- for (const auto& entry : map_value) {
- if (entry.first == "a") {
- EXPECT_EQ(entry.second.value<int32_t>(), 1);
- found_a = true;
- } else if (entry.first == "b") {
- EXPECT_EQ(entry.second.value<int32_t>(), 2);
- found_b = true;
- } else if (entry.first == "c") {
- EXPECT_EQ(entry.second.value<int32_t>(), 3);
- found_c = true;
- }
- }
- EXPECT_TRUE(found_a && found_b && found_c);
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithNonStringKey) {
+TEST_P(AvroWriterTest, WriteMapTypeWithNonStringKey) {
auto schema = std::make_shared<iceberg::Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(
1, "int_map",
@@ -796,43 +710,10 @@ TEST_P(AvroWriterParameterizedTest, WriteMapTypeWithNonStringKey) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 1);
-
- // Maps with non-string keys are encoded as arrays of key-value records in Avro
- const auto& array = record.fieldAt(0).value<::avro::GenericArray>();
- if (row_idx == 0) {
- ASSERT_EQ(array.value().size(), 3);
-
- const auto& entry0 = array.value()[0].value<::avro::GenericRecord>();
- EXPECT_EQ(entry0.fieldAt(0).value<int32_t>(), 1);
- EXPECT_EQ(entry0.fieldAt(1).value<std::string>(), "one");
-
- const auto& entry1 = array.value()[1].value<::avro::GenericRecord>();
- EXPECT_EQ(entry1.fieldAt(0).value<int32_t>(), 2);
- EXPECT_EQ(entry1.fieldAt(1).value<std::string>(), "two");
-
- const auto& entry2 = array.value()[2].value<::avro::GenericRecord>();
- EXPECT_EQ(entry2.fieldAt(0).value<int32_t>(), 3);
- EXPECT_EQ(entry2.fieldAt(1).value<std::string>(), "three");
- } else if (row_idx == 1) {
- ASSERT_EQ(array.value().size(), 2);
-
- const auto& entry0 = array.value()[0].value<::avro::GenericRecord>();
- EXPECT_EQ(entry0.fieldAt(0).value<int32_t>(), 10);
- EXPECT_EQ(entry0.fieldAt(1).value<std::string>(), "ten");
-
- const auto& entry1 = array.value()[1].value<::avro::GenericRecord>();
- EXPECT_EQ(entry1.fieldAt(0).value<int32_t>(), 20);
- EXPECT_EQ(entry1.fieldAt(1).value<std::string>(), "twenty");
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteEmptyMaps) {
+TEST_P(AvroWriterTest, WriteEmptyMaps) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(
1, "string_map",
@@ -852,10 +733,11 @@ TEST_P(AvroWriterParameterizedTest, WriteEmptyMaps) {
])";
// Just verify writing succeeds (empty maps are handled correctly by the encoder)
- ASSERT_NO_FATAL_FAILURE(WriteAvroFile(schema, test_data));
+ WriteAvroFile(schema, test_data);
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteOptionalFields) {
+TEST_P(AvroWriterTest, WriteOptionalFields) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()),
@@ -869,35 +751,10 @@ TEST_P(AvroWriterParameterizedTest, WriteOptionalFields) {
])";
WriteAvroFile(schema, test_data);
-
- VerifyAvroFileContent([](const ::avro::GenericDatum& datum, size_t row_idx) {
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 3);
-
- EXPECT_EQ(record.fieldAt(0).value<int32_t>(), static_cast<int32_t>(row_idx
+ 1));
-
- if (row_idx == 0) {
- EXPECT_EQ(record.fieldAt(1).unionBranch(), 1); // non-null
- EXPECT_EQ(record.fieldAt(1).value<std::string>(), "Alice");
- EXPECT_EQ(record.fieldAt(2).unionBranch(), 1); // non-null
- EXPECT_EQ(record.fieldAt(2).value<int32_t>(), 30);
- } else if (row_idx == 1) {
- EXPECT_EQ(record.fieldAt(1).unionBranch(), 0); // null
- EXPECT_EQ(record.fieldAt(2).unionBranch(), 1); // non-null
- EXPECT_EQ(record.fieldAt(2).value<int32_t>(), 25);
- } else if (row_idx == 2) {
- EXPECT_EQ(record.fieldAt(1).unionBranch(), 1); // non-null
- EXPECT_EQ(record.fieldAt(1).value<std::string>(), "Charlie");
- EXPECT_EQ(record.fieldAt(2).unionBranch(), 0); // null
- } else if (row_idx == 3) {
- EXPECT_EQ(record.fieldAt(1).unionBranch(), 0); // null
- EXPECT_EQ(record.fieldAt(2).unionBranch(), 0); // null
- }
- });
+ VerifyWrittenData(test_data);
}
-TEST_P(AvroWriterParameterizedTest, WriteLargeDataset) {
+TEST_P(AvroWriterTest, WriteLargeDataset) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<LongType>()),
SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>())});
@@ -912,52 +769,14 @@ TEST_P(AvroWriterParameterizedTest, WriteLargeDataset) {
json << "]";
WriteAvroFile(schema, json.str());
-
- size_t expected_row_count = 1000;
- size_t actual_row_count = 0;
-
- VerifyAvroFileContent([&](const ::avro::GenericDatum& datum, size_t row_idx)
{
- ASSERT_EQ(datum.type(), ::avro::AVRO_RECORD);
- const auto& record = datum.value<::avro::GenericRecord>();
- ASSERT_EQ(record.fieldCount(), 2);
-
- EXPECT_EQ(record.fieldAt(0).value<int64_t>(),
static_cast<int64_t>(row_idx));
- EXPECT_DOUBLE_EQ(record.fieldAt(1).value<double>(), row_idx * 1.5);
-
- actual_row_count++;
- });
-
- EXPECT_EQ(actual_row_count, expected_row_count);
+ VerifyWrittenData(json.str());
}
// Instantiate parameterized tests for both direct encoder and GenericDatum paths
-INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterParameterizedTest,
+INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
::testing::Values(true, false),
[](const ::testing::TestParamInfo<bool>& info) {
return info.param ? "DirectEncoder" :
"GenericDatum";
});
-TEST_F(AvroReaderTest, BufferSizeConfiguration) {
- // Test default buffer size
- auto properties1 = ReaderProperties::default_properties();
- ASSERT_EQ(properties1->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
-
- // Test setting custom buffer size
- auto properties2 = ReaderProperties::default_properties();
- constexpr int64_t kCustomBufferSize = 2 * 1024 * 1024; // 2MB
- properties2->Set(ReaderProperties::kAvroBufferSize, kCustomBufferSize);
- ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize),
kCustomBufferSize);
-
- // Test setting via FromMap
- std::unordered_map<std::string, std::string> config_map = {
- {"read.avro.buffer-size", "4194304"} // 4MB
- };
- auto properties3 = ReaderProperties::FromMap(config_map);
- ASSERT_EQ(properties3->Get(ReaderProperties::kAvroBufferSize), 4194304);
-
- // Test that unset returns to default
- properties2->Unset(ReaderProperties::kAvroBufferSize);
- ASSERT_EQ(properties2->Get(ReaderProperties::kAvroBufferSize), 1024 * 1024);
-}
-
} // namespace iceberg::avro