Copilot commented on code in PR #88: URL: https://github.com/apache/paimon-cpp/pull/88#discussion_r3425381329
########## src/paimon/format/avro/avro_writer_builder.h: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <utility> Review Comment: This header uses `std::optional` but does not include `<optional>`, which can cause compilation failures depending on transitive includes. Also, `batch_size` is unused in the inline constructor, which commonly triggers `-Wunused-parameter` (often treated as error). Add `#include <optional>` and either remove the parameter name (if it must stay for signature compatibility) or explicitly mark it unused (e.g., cast to void) in the constructor body. ########## src/paimon/format/avro/avro_file_batch_reader.cpp: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_file_batch_reader.h" + +#include <memory> +#include <utility> + +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/format/avro/avro_input_stream_impl.h" +#include "paimon/format/avro/avro_schema_converter.h" +#include "paimon/reader/batch_reader.h" + +namespace paimon::avro { + +AvroFileBatchReader::AvroFileBatchReader(const std::shared_ptr<InputStream>& input_stream, + const std::shared_ptr<::arrow::DataType>& file_data_type, + std::unique_ptr<::avro::DataFileReaderBase>&& reader, + std::unique_ptr<arrow::ArrayBuilder>&& array_builder, + std::unique_ptr<arrow::MemoryPool>&& arrow_pool, + int32_t batch_size, + const std::shared_ptr<MemoryPool>& pool) + : pool_(pool), + arrow_pool_(std::move(arrow_pool)), + input_stream_(input_stream), + file_data_type_(file_data_type), + reader_(std::move(reader)), + array_builder_(std::move(array_builder)), + batch_size_(batch_size), + metrics_(std::make_shared<MetricsImpl>()) {} + +AvroFileBatchReader::~AvroFileBatchReader() { + DoClose(); +} + +void AvroFileBatchReader::DoClose() { + if (!close_) { + reader_->close(); + close_ = true; + } +} + +Result<std::unique_ptr<AvroFileBatchReader>> AvroFileBatchReader::Create( + const std::shared_ptr<InputStream>& input_stream, int32_t batch_size, + const 
std::shared_ptr<MemoryPool>& pool) { + if (batch_size <= 0) { + return Status::Invalid( + fmt::format("invalid batch size {}, must be larger than 0", batch_size)); + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader, + CreateDataFileReader(input_stream, pool)); + const auto& avro_file_schema = reader->dataSchema(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> file_data_type, + AvroSchemaConverter::AvroSchemaToArrowDataType(avro_file_schema)); + auto arrow_pool = GetArrowPool(pool); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr<arrow::ArrayBuilder> array_builder, + arrow::MakeBuilder(file_data_type, arrow_pool.get())); + return std::unique_ptr<AvroFileBatchReader>( + new AvroFileBatchReader(input_stream, file_data_type, std::move(reader), + std::move(array_builder), std::move(arrow_pool), batch_size, pool)); +} + +Result<std::unique_ptr<::avro::DataFileReaderBase>> AvroFileBatchReader::CreateDataFileReader( + const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<MemoryPool>& pool) { + PAIMON_RETURN_NOT_OK(input_stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + try { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::InputStream> in, + AvroInputStreamImpl::Create(input_stream, BUFFER_SIZE, pool)); + auto reader = std::make_unique<::avro::DataFileReaderBase>(std::move(in)); + reader->init(); + return reader; + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("build avro reader failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("build avro reader failed. {}", e.what())); + } catch (...) { + return Status::Invalid("build avro reader failed. 
unknown error"); + } +} + +Result<BatchReader::ReadBatch> AvroFileBatchReader::NextBatch() { + if (next_row_to_read_ == std::numeric_limits<uint64_t>::max()) { + next_row_to_read_ = 0; + } + try { + while (array_builder_->length() < batch_size_) { + if (!reader_->hasMore()) { + break; + } + reader_->decr(); + PAIMON_RETURN_NOT_OK(AvroDirectDecoder::DecodeAvroToBuilder( + reader_->dataSchema().root(), read_fields_projection_, &reader_->decoder(), + array_builder_.get(), &decode_context_)); + } + previous_first_row_ = next_row_to_read_; + next_row_to_read_ += array_builder_->length(); + if (array_builder_->length() == 0) { + return BatchReader::MakeEofBatch(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array, + array_builder_->Finish()); + std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>(); + std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + return make_pair(std::move(c_array), std::move(c_schema)); Review Comment: `make_pair` is used without a `std::` qualifier and there is no visible `using std::make_pair;` in this file. The call does still compile: both arguments are `std::unique_ptr`, so argument-dependent lookup finds `std::make_pair`. However, relying on ADL here is fragile and inconsistent with the rest of the file, which qualifies standard-library names (`std::make_unique`, `std::move`, `std::numeric_limits`). Change to `std::make_pair(...)`. ########## src/paimon/format/avro/avro_writer_builder.h: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <utility> + +#include "avro/DataFile.hh" +#include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/writer_builder.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow +namespace paimon { +class FormatWriter; +class OutputStream; +} // namespace paimon + +namespace paimon::avro { + +class AvroWriterBuilder : public WriterBuilder { + public: + AvroWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t batch_size, + const std::map<std::string, std::string>& options) + : pool_(GetDefaultPool()), schema_(schema), options_(options) {} + + WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override { + pool_ = pool; + return this; + } + + Result<std::unique_ptr<FormatWriter>> Build(const std::shared_ptr<OutputStream>& out, + const std::string& compression) override { + auto output_stream = std::make_unique<AvroOutputStreamImpl>(out, BUFFER_SIZE, pool_); + PAIMON_ASSIGN_OR_RAISE( + std::string file_compression, + OptionsUtils::GetValueFromMap<std::string>(options_, AVRO_CODEC, compression)); + PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec, + 
ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression))); + PAIMON_ASSIGN_OR_RAISE(std::optional<int32_t> compression_level, + GetAvroCompressionLevel(codec)); + return AvroFormatWriter::Create(std::move(output_stream), schema_, codec, + compression_level); + } + + private: + static constexpr int32_t BUFFER_SIZE = 1024 * 1024; + + static Result<::avro::Codec> ToAvroCompressionKind(const std::string& file_compression) { Review Comment: `ToAvroCompressionKind` and `GetAvroCompressionLevel` are `private`, but the new tests call them directly. Either adjust the tests to validate behavior through the public `Build()` API, or make these helpers part of a public/`protected` API intended for reuse/testing (e.g., `public` static for the codec mapping). Note that a `protected` helper is still not reachable from a free `TEST()` body without a test subclass or a `friend` declaration. Keeping them private while tests depend on them will break builds. ########## src/paimon/format/avro/avro_file_batch_reader.cpp: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "paimon/format/avro/avro_file_batch_reader.h" + +#include <memory> +#include <utility> + +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/format/avro/avro_input_stream_impl.h" +#include "paimon/format/avro/avro_schema_converter.h" +#include "paimon/reader/batch_reader.h" + +namespace paimon::avro { + +AvroFileBatchReader::AvroFileBatchReader(const std::shared_ptr<InputStream>& input_stream, + const std::shared_ptr<::arrow::DataType>& file_data_type, + std::unique_ptr<::avro::DataFileReaderBase>&& reader, + std::unique_ptr<arrow::ArrayBuilder>&& array_builder, + std::unique_ptr<arrow::MemoryPool>&& arrow_pool, + int32_t batch_size, + const std::shared_ptr<MemoryPool>& pool) + : pool_(pool), + arrow_pool_(std::move(arrow_pool)), + input_stream_(input_stream), + file_data_type_(file_data_type), + reader_(std::move(reader)), + array_builder_(std::move(array_builder)), + batch_size_(batch_size), + metrics_(std::make_shared<MetricsImpl>()) {} + +AvroFileBatchReader::~AvroFileBatchReader() { + DoClose(); +} + +void AvroFileBatchReader::DoClose() { + if (!close_) { + reader_->close(); + close_ = true; + } +} + +Result<std::unique_ptr<AvroFileBatchReader>> AvroFileBatchReader::Create( + const std::shared_ptr<InputStream>& input_stream, int32_t batch_size, + const std::shared_ptr<MemoryPool>& pool) { + if (batch_size <= 0) { + return Status::Invalid( + fmt::format("invalid batch size {}, must be larger than 0", batch_size)); + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader, + CreateDataFileReader(input_stream, pool)); + const auto& avro_file_schema = reader->dataSchema(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> file_data_type, + 
AvroSchemaConverter::AvroSchemaToArrowDataType(avro_file_schema)); + auto arrow_pool = GetArrowPool(pool); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr<arrow::ArrayBuilder> array_builder, + arrow::MakeBuilder(file_data_type, arrow_pool.get())); + return std::unique_ptr<AvroFileBatchReader>( + new AvroFileBatchReader(input_stream, file_data_type, std::move(reader), + std::move(array_builder), std::move(arrow_pool), batch_size, pool)); +} + +Result<std::unique_ptr<::avro::DataFileReaderBase>> AvroFileBatchReader::CreateDataFileReader( + const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<MemoryPool>& pool) { + PAIMON_RETURN_NOT_OK(input_stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + try { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::InputStream> in, + AvroInputStreamImpl::Create(input_stream, BUFFER_SIZE, pool)); + auto reader = std::make_unique<::avro::DataFileReaderBase>(std::move(in)); + reader->init(); + return reader; + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("build avro reader failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("build avro reader failed. {}", e.what())); + } catch (...) { + return Status::Invalid("build avro reader failed. 
unknown error"); + } +} + +Result<BatchReader::ReadBatch> AvroFileBatchReader::NextBatch() { + if (next_row_to_read_ == std::numeric_limits<uint64_t>::max()) { + next_row_to_read_ = 0; + } + try { + while (array_builder_->length() < batch_size_) { + if (!reader_->hasMore()) { + break; + } + reader_->decr(); + PAIMON_RETURN_NOT_OK(AvroDirectDecoder::DecodeAvroToBuilder( + reader_->dataSchema().root(), read_fields_projection_, &reader_->decoder(), + array_builder_.get(), &decode_context_)); + } + previous_first_row_ = next_row_to_read_; + next_row_to_read_ += array_builder_->length(); + if (array_builder_->length() == 0) { + return BatchReader::MakeEofBatch(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array, + array_builder_->Finish()); + std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>(); + std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + return make_pair(std::move(c_array), std::move(c_schema)); + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("avro reader next batch failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("avro reader next batch failed. {}", e.what())); + } catch (...) { + return Status::Invalid("avro reader next batch failed. 
unknown error"); + } +} + +Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, + const std::shared_ptr<Predicate>& predicate, + const std::optional<RoaringBitmap32>& selection_bitmap) { + if (!read_schema) { + return Status::Invalid("SetReadSchema failed: read schema cannot be nullptr"); + } + // TODO(menglingda.mld): support predicate + if (selection_bitmap) { + // TODO(menglingda.mld): support bitmap + } + previous_first_row_ = std::numeric_limits<uint64_t>::max(); + next_row_to_read_ = std::numeric_limits<uint64_t>::max(); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> arrow_read_schema, + arrow::ImportSchema(read_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema, + ArrowUtils::DataTypeToSchema(file_data_type_)); + PAIMON_ASSIGN_OR_RAISE(read_fields_projection_, + CalculateReadFieldsProjection(file_schema, arrow_read_schema->fields())); + array_builder_->Reset(); + std::shared_ptr<::arrow::DataType> read_data_type = arrow::struct_(arrow_read_schema->fields()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(array_builder_, + arrow::MakeBuilder(read_data_type, arrow_pool_.get())); + return Status::OK(); +} + +Result<std::set<size_t>> AvroFileBatchReader::CalculateReadFieldsProjection( + const std::shared_ptr<::arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) { + std::set<size_t> projection_set; + PAIMON_ASSIGN_OR_RAISE(std::vector<int32_t> projection, + ArrowUtils::CreateProjection(file_schema, read_fields)); + int32_t prev_index = -1; + for (auto& index : projection) { + if (index <= prev_index) { + return Status::Invalid( + "SetReadSchema failed: read schema fields order is different from file schema"); + } + prev_index = index; + projection_set.insert(index); + } + return projection_set; +} + +Result<std::unique_ptr<::ArrowSchema>> AvroFileBatchReader::GetFileSchema() const { + assert(reader_); + auto c_schema = std::make_unique<::ArrowSchema>(); + 
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*file_data_type_, c_schema.get())); + return c_schema; +} + +Result<uint64_t> AvroFileBatchReader::GetNumberOfRows() const { + if (!total_rows_) { + PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos()); + ScopeGuard stream_guard([this, current_pos]() -> void { + // reset input stream position to original position + Status status = input_stream_->Seek(current_pos, SeekOrigin::FS_SEEK_SET); + (void)status; + }); Review Comment: The cleanup path explicitly ignores `Seek()` failures when restoring the input stream position. If the seek fails, subsequent reads may operate on the wrong position, producing incorrect results and making errors hard to diagnose. Consider capturing the failure in a way the caller can observe (e.g., store an internal error state, or avoid reusing the same stream by creating a separate stream instance for `GetNumberOfRows()` if supported by the filesystem abstraction). ########## src/paimon/format/avro/avro_file_batch_reader_test.cpp: ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/avro/avro_file_batch_reader.h" + +#include <memory> +#include <string> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/api.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/testing/utils/timezone_guard.h" + +namespace paimon::avro::test { + +class AvroFileBatchReaderTest : public ::testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + ASSERT_OK_AND_ASSIGN(file_format_, + FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}})); + fs_ = std::make_shared<LocalFileSystem>(); + dir_ = ::paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_); + pool_ = GetDefaultPool(); + } + void TearDown() override {} + + void WriteData(const std::shared_ptr<arrow::Array>& src_array, const std::string& file_path, + const std::string& compression) { + arrow::Schema src_schema(src_array->type()->fields()); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(src_schema, &c_schema).ok()); + ASSERT_OK_AND_ASSIGN(auto writer_builder, + file_format_->CreateWriterBuilder(&c_schema, /*batch_size=*/-1)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out, + fs_->Create(file_path, /*overwrite=*/false)); + ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, compression)); + + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*src_array, &arrow_array).ok()); + ASSERT_OK(writer->AddBatch(&arrow_array)); + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + } + + 
std::pair<std::unique_ptr<FileBatchReader>, std::shared_ptr<arrow::ChunkedArray>> ReadData( + const std::string& file_path, int32_t read_batch_size) { + EXPECT_OK_AND_ASSIGN(auto reader_builder, + file_format_->CreateReaderBuilder(read_batch_size)); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_path)); + EXPECT_OK_AND_ASSIGN(auto batch_reader, reader_builder->Build(in)); + EXPECT_OK_AND_ASSIGN(auto result_array, ::paimon::test::ReadResultCollector::CollectResult( + batch_reader.get())); + return std::make_pair(std::move(batch_reader), result_array); + } + + private: + std::shared_ptr<MemoryPool> pool_; + std::shared_ptr<FileFormat> file_format_; + std::shared_ptr<FileSystem> fs_; + std::unique_ptr<paimon::test::UniqueTestDirectory> dir_; +}; + +TEST_F(AvroFileBatchReaderTest, TestReadDataWithNull) { + std::string path = paimon::test::GetDataDir() + "/avro/data/avro_with_null"; + auto [reader_holder, result_array] = ReadData(path, /*read_batch_size=*/1024); + + arrow::FieldVector fields = { + arrow::field("_KEY_f0", arrow::utf8(), /*nullable=*/true), + arrow::field("_SEQUENCE_NUMBER", arrow::int64(), /*nullable=*/true), + arrow::field("_VALUE_KIND", arrow::int32(), /*nullable=*/true), + arrow::field("f0", arrow::utf8(), /*nullable=*/true), + arrow::field("f1", arrow::utf8(), /*nullable=*/true), + arrow::field("f2", arrow::int32(), /*nullable=*/true), + arrow::field("f3", arrow::float64(), /*nullable=*/true)}; + + auto arrow_data_type = arrow::struct_(fields); + + std::shared_ptr<arrow::ChunkedArray> expected_array; + auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([ + ["Alex", 2, 3, "Alex", "20250326", 18, 10.1], + ["Bob", 3, 3, "Bob", "20250326", 19, 11.1], + ["Evan", 1, 0, "Evan", "20250326", null, 14.1] + ])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()) << array_status.ToString(); + ASSERT_TRUE(result_array->Equals(expected_array)); + ASSERT_TRUE(expected_array->Equals(result_array)); + 
auto read_metrics = reader_holder->GetReaderMetrics(); + ASSERT_TRUE(read_metrics); +} + +TEST_F(AvroFileBatchReaderTest, TestReadWithDifferentBatchSize) { + std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro"); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int64()), arrow::field("f3", arrow::float32()), + arrow::field("f4", arrow::float64()), arrow::field("f5", arrow::utf8()), + arrow::field("f6", arrow::binary())}; + auto arrow_data_type = arrow::struct_(fields); + + size_t length = 600; + std::string data_str = "["; + for (size_t i = 0; i < length; i++) { + if (i % 3 == 0) { + data_str.append(fmt::format(R"([{}, {}, {}, {}, {}, "str_{}", "bin_{}"])", "true", i, + i * 100000000000L, i * 0.12, i * 123.45678901, i, i)); + } else if (i % 3 == 1) { + data_str.append(fmt::format(R"([{}, -{}, -{}, -{}, -{}, "string_{}", "binary_{}"])", Review Comment: This test file uses `fmt::format` but does not include the corresponding header (`fmt/format.h`). Relying on transitive includes is brittle and can break builds depending on include order. Add an explicit `#include "fmt/format.h"`. ########## src/paimon/format/avro/avro_writer_builder.h: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <utility> + +#include "avro/DataFile.hh" +#include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/writer_builder.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow +namespace paimon { +class FormatWriter; +class OutputStream; +} // namespace paimon + +namespace paimon::avro { + +class AvroWriterBuilder : public WriterBuilder { + public: + AvroWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t batch_size, + const std::map<std::string, std::string>& options) + : pool_(GetDefaultPool()), schema_(schema), options_(options) {} + + WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override { + pool_ = pool; + return this; + } + + Result<std::unique_ptr<FormatWriter>> Build(const std::shared_ptr<OutputStream>& out, + const std::string& compression) override { + auto output_stream = std::make_unique<AvroOutputStreamImpl>(out, BUFFER_SIZE, pool_); + PAIMON_ASSIGN_OR_RAISE( + std::string file_compression, + OptionsUtils::GetValueFromMap<std::string>(options_, AVRO_CODEC, compression)); + PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec, + ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression))); + PAIMON_ASSIGN_OR_RAISE(std::optional<int32_t> compression_level, + GetAvroCompressionLevel(codec)); + return AvroFormatWriter::Create(std::move(output_stream), schema_, codec, + compression_level); + } + + private: + static constexpr int32_t 
BUFFER_SIZE = 1024 * 1024; + + static Result<::avro::Codec> ToAvroCompressionKind(const std::string& file_compression) { + if (file_compression == "zstd" || file_compression == "zstandard") { + return ::avro::Codec::ZSTD_CODEC; + } else if (file_compression == "snappy") { + return ::avro::Codec::SNAPPY_CODEC; + } else if (file_compression == "null" || file_compression == "none") { + return ::avro::Codec::NULL_CODEC; + } else if (file_compression == "deflate") { + return ::avro::Codec::DEFLATE_CODEC; + } else { + return Status::Invalid("unknown compression " + file_compression); + } + } + Result<std::optional<int32_t>> GetAvroCompressionLevel(const ::avro::Codec& codec) { Review Comment: This header uses `std::optional` but does not include `<optional>`, which can cause compilation failures depending on transitive includes. Also, `batch_size` is unused in the inline constructor, which commonly triggers `-Wunused-parameter` (often treated as error). Add `#include <optional>` and either remove the parameter name (if it must stay for signature compatibility) or explicitly mark it unused (e.g., cast to void) in the constructor body. ########## src/paimon/format/avro/avro_writer_builder.h: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <utility> + +#include "avro/DataFile.hh" +#include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/writer_builder.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow +namespace paimon { +class FormatWriter; +class OutputStream; +} // namespace paimon + +namespace paimon::avro { + +class AvroWriterBuilder : public WriterBuilder { + public: + AvroWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t batch_size, + const std::map<std::string, std::string>& options) + : pool_(GetDefaultPool()), schema_(schema), options_(options) {} Review Comment: This header uses `std::optional` but does not include `<optional>`, which can cause compilation failures depending on transitive includes. Also, `batch_size` is unused in the inline constructor, which commonly triggers `-Wunused-parameter` (often treated as error). Add `#include <optional>` and either remove the parameter name (if it must stay for signature compatibility) or explicitly mark it unused (e.g., cast to void) in the constructor body. ########## src/paimon/format/avro/avro_writer_builder_test.cpp: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_writer_builder.h" + +#include "avro/DataFile.hh" +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +TEST(AvroWriterBuilderTest, HandlesValidCompressions) { + ASSERT_OK_AND_ASSIGN(::avro::Codec zstd_codec, + AvroWriterBuilder::ToAvroCompressionKind("zstd")); Review Comment: This test code will not compile and/or will crash at runtime: (1) `ToAvroCompressionKind` is `private` in `AvroWriterBuilder`; (2) `Build(nullptr, ...)` passes a null `OutputStream` which is very likely dereferenced inside `AvroOutputStreamImpl` / Avro writer initialization; (3) it dereferences `avro_file_writer` without checking the `dynamic_cast` result; and (4) it accesses `AvroFormatWriter::writer_` and `DataFileWriterBase` internals (`codec_`, `compressionLevel_`), which are not exposed as public API. Prefer validating codec behavior by writing to a real temp `OutputStream` (e.g., via `LocalFileSystem`) and inspecting file metadata, or add a narrow test-only accessor/friend in `AvroFormatWriter` rather than reaching into private members. ########## src/paimon/format/avro/avro_writer_builder.h: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <utility> + +#include "avro/DataFile.hh" +#include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/writer_builder.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow +namespace paimon { +class FormatWriter; +class OutputStream; +} // namespace paimon + +namespace paimon::avro { + +class AvroWriterBuilder : public WriterBuilder { + public: + AvroWriterBuilder(const std::shared_ptr<arrow::Schema>& schema, int32_t batch_size, + const std::map<std::string, std::string>& options) + : pool_(GetDefaultPool()), schema_(schema), options_(options) {} + + WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override { + pool_ = pool; + return this; + } + + Result<std::unique_ptr<FormatWriter>> Build(const std::shared_ptr<OutputStream>& out, + const std::string& compression) override 
{ + auto output_stream = std::make_unique<AvroOutputStreamImpl>(out, BUFFER_SIZE, pool_); + PAIMON_ASSIGN_OR_RAISE( + std::string file_compression, + OptionsUtils::GetValueFromMap<std::string>(options_, AVRO_CODEC, compression)); + PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec, + ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression))); + PAIMON_ASSIGN_OR_RAISE(std::optional<int32_t> compression_level, + GetAvroCompressionLevel(codec)); + return AvroFormatWriter::Create(std::move(output_stream), schema_, codec, + compression_level); + } + + private: + static constexpr int32_t BUFFER_SIZE = 1024 * 1024; + + static Result<::avro::Codec> ToAvroCompressionKind(const std::string& file_compression) { + if (file_compression == "zstd" || file_compression == "zstandard") { + return ::avro::Codec::ZSTD_CODEC; + } else if (file_compression == "snappy") { + return ::avro::Codec::SNAPPY_CODEC; + } else if (file_compression == "null" || file_compression == "none") { + return ::avro::Codec::NULL_CODEC; + } else if (file_compression == "deflate") { + return ::avro::Codec::DEFLATE_CODEC; + } else { + return Status::Invalid("unknown compression " + file_compression); + } + } + Result<std::optional<int32_t>> GetAvroCompressionLevel(const ::avro::Codec& codec) { Review Comment: `ToAvroCompressionKind` and `GetAvroCompressionLevel` are `private`, but the new tests call them directly. Either adjust tests to validate behavior through the public `Build()` API, or make these helpers part of a public/`protected` API intended for reuse/testing (e.g., `public` static for the codec mapping). Keeping them private while tests depend on them will break builds. ########## src/paimon/format/avro/avro_writer_builder_test.cpp: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_writer_builder.h" + +#include "avro/DataFile.hh" +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +TEST(AvroWriterBuilderTest, HandlesValidCompressions) { + ASSERT_OK_AND_ASSIGN(::avro::Codec zstd_codec, + AvroWriterBuilder::ToAvroCompressionKind("zstd")); + ASSERT_EQ(zstd_codec, ::avro::Codec::ZSTD_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec zstandard_codec, + AvroWriterBuilder::ToAvroCompressionKind("zstandard")); + ASSERT_EQ(zstandard_codec, ::avro::Codec::ZSTD_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec snappy_codec, + AvroWriterBuilder::ToAvroCompressionKind("snappy")); + ASSERT_EQ(snappy_codec, ::avro::Codec::SNAPPY_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec null_codec, + AvroWriterBuilder::ToAvroCompressionKind("null")); + ASSERT_EQ(null_codec, ::avro::Codec::NULL_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec deflate_codec, + AvroWriterBuilder::ToAvroCompressionKind("deflate")); + ASSERT_EQ(deflate_codec, ::avro::Codec::DEFLATE_CODEC); +} + +TEST(AvroWriterBuilderTest, HandlesInvalidCompression) { + ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("unknown_compression")); +} + +TEST(AvroWriterBuilderTest, HandlesEmptyString) { + ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("")); +} + +TEST(AvroWriterBuilderTest, CheckAvroCodec) { + arrow::FieldVector fields = 
{arrow::field("f0", arrow::int32())}; + auto schema = std::make_shared<arrow::Schema>(fields); + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "snappy"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::SNAPPY_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); Review Comment: This test code will not compile and/or will crash at runtime: (1) `ToAvroCompressionKind` is `private` in `AvroWriterBuilder`; (2) `Build(nullptr, ...)` passes a null `OutputStream` which is very likely dereferenced inside `AvroOutputStreamImpl` / Avro writer initialization; (3) it dereferences `avro_file_writer` without checking the `dynamic_cast` result; and (4) it accesses `AvroFormatWriter::writer_` and `DataFileWriterBase` internals (`codec_`, `compressionLevel_`), which are not exposed as public API. Prefer validating codec behavior by writing to a real temp `OutputStream` (e.g., via `LocalFileSystem`) and inspecting file metadata, or add a narrow test-only accessor/friend in `AvroFormatWriter` rather than reaching into private members. ########## src/paimon/format/avro/avro_format_writer.cpp: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_format_writer.h" + +#include <cassert> +#include <exception> +#include <memory> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "avro/Compiler.hh" // IWYU pragma: keep +#include "avro/DataFile.hh" +#include "avro/Exception.hh" +#include "avro/Generic.hh" // IWYU pragma: keep +#include "avro/Specific.hh" // IWYU pragma: keep +#include "avro/ValidSchema.hh" +#include "fmt/format.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/format/avro/avro_schema_converter.h" + +namespace arrow { +class Array; +} // namespace arrow +struct ArrowArray; + +namespace paimon::avro { + +AvroFormatWriter::AvroFormatWriter(std::unique_ptr<::avro::DataFileWriterBase>&& file_writer, + const ::avro::ValidSchema& avro_schema, + const std::shared_ptr<arrow::DataType>& data_type, + AvroOutputStreamImpl* avro_output_stream) + : writer_(std::move(file_writer)), + avro_schema_(avro_schema), + data_type_(data_type), + metrics_(std::make_shared<MetricsImpl>()), + avro_output_stream_(avro_output_stream) {} + +Result<std::unique_ptr<AvroFormatWriter>> AvroFormatWriter::Create( + std::unique_ptr<AvroOutputStreamImpl> out, const std::shared_ptr<arrow::Schema>& schema, + const ::avro::Codec codec, std::optional<int32_t> compression_level) { + try { + PAIMON_ASSIGN_OR_RAISE(::avro::ValidSchema avro_schema, + AvroSchemaConverter::ArrowSchemaToAvroSchema(schema)); + AvroOutputStreamImpl* avro_output_stream = out.get(); + auto writer = 
std::make_unique<::avro::DataFileWriterBase>( + std::move(out), avro_schema, DEFAULT_SYNC_INTERVAL, codec, ::avro::Metadata(), + compression_level); + auto data_type = arrow::struct_(schema->fields()); + return std::unique_ptr<AvroFormatWriter>( + new AvroFormatWriter(std::move(writer), avro_schema, data_type, avro_output_stream)); + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("avro format writer create failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("avro format writer create failed: {}", e.what())); + } catch (...) { + return Status::Invalid("avro format writer create failed: unknown exception"); + } +} + +Status AvroFormatWriter::Flush() { + try { + writer_->flush(); + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("avro writer flush failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("avro writer flush failed: {}", e.what())); + } catch (...) { + return Status::Invalid("avro writer flush failed: unknown exception"); + } + + return Status::OK(); +} + +Status AvroFormatWriter::Finish() { + try { + avro_output_stream_->FlushBuffer(); // we need flush buffer before close writer + writer_->close(); + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("avro writer close failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("avro writer close failed: {}", e.what())); + } catch (...) 
{ + return Status::Invalid("avro writer close failed: unknown exception"); + } + return Status::OK(); +} + +Result<bool> AvroFormatWriter::ReachTargetSize(bool suggested_check, int64_t target_size) const { + if (suggested_check) { + uint64_t current_size = writer_->getCurrentBlockStart(); + return current_size >= static_cast<uint64_t>(target_size); + } + return false; +} + +Status AvroFormatWriter::AddBatch(ArrowArray* batch) { + assert(batch); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(batch, data_type_)); + try { + for (int64_t row_index = 0; row_index < arrow_array->length(); ++row_index) { + writer_->syncIfNeeded(); + PAIMON_RETURN_NOT_OK(AvroDirectEncoder::EncodeArrowToAvro( + avro_schema_.root(), *arrow_array, row_index, &writer_->encoder(), &encode_ctx_)); + writer_->incr(); + } + } catch (const ::avro::Exception& e) { + return Status::Invalid(fmt::format("avro writer add batch failed. {}", e.what())); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("avro writer add batch failed: {}", e.what())); + } catch (...) { + return Status::Invalid("avro writer add batch failed: unknown exception"); + } + PAIMON_RETURN_NOT_OK(Flush()); + return Status::OK(); Review Comment: `AddBatch()` unconditionally calls `Flush()` after writing every batch, which forces I/O/sync work and can significantly reduce throughput for small batches or high-frequency calls. Consider removing the implicit flush and letting callers control flushing via `Flush()`/`Finish()`, or only flushing based on a threshold (e.g., block size / sync interval) if immediate visibility is required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
