Copilot commented on code in PR #98: URL: https://github.com/apache/paimon-cpp/pull/98#discussion_r3432748433
########## src/paimon/format/blob/blob_file_batch_reader.cpp: ########## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include <algorithm> +#include <future> +#include <numeric> + +#include "arrow/api.h" +#include "arrow/array/builder_dict.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/bridge.h" +#include "arrow/util/bit_util.h" +#include "fmt/format.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/io/offset_input_stream.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/common/utils/stream_utils.h" +#include "paimon/data/blob.h" + +namespace paimon::blob { + +Result<std::unique_ptr<BlobFileBatchReader>> BlobFileBatchReader::Create( + const std::shared_ptr<InputStream>& input_stream, int32_t batch_size, bool blob_as_descriptor, + const std::shared_ptr<MemoryPool>& pool) { + if (input_stream == nullptr) { + return Status::Invalid("blob file batch reader create failed: input 
stream is nullptr"); + } + if (batch_size <= 0) { + return Status::Invalid(fmt::format( + "blob file batch reader create failed: read batch size '{}' should be larger than zero", + batch_size)); + } + + PAIMON_ASSIGN_OR_RAISE(uint64_t file_size, input_stream->Length()); + PAIMON_RETURN_NOT_OK( + input_stream->Seek(file_size - BlobDefs::kBlobFileHeaderLength, FS_SEEK_SET)); + int8_t header[BlobDefs::kBlobFileHeaderLength]; + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_size, + input_stream->Read(reinterpret_cast<char*>(header), BlobDefs::kBlobFileHeaderLength)); + if (actual_size != BlobDefs::kBlobFileHeaderLength) { + return Status::Invalid( + fmt::format("actual read size {} not match with expect header length {}", actual_size, + BlobDefs::kBlobFileHeaderLength)); + } + int8_t version = header[4]; + if (version != BlobDefs::kFileVersion) { + return Status::Invalid(fmt::format( + "create blob format reader failed. unsupported blob file version: {}", version)); + } + int32_t index_length = GetIndexLength(header, 0); + PAIMON_RETURN_NOT_OK(input_stream->Seek( + file_size - BlobDefs::kBlobFileHeaderLength - index_length, FS_SEEK_SET)); + std::vector<char> index_bytes(index_length, '\0'); Review Comment: `file_size - BlobDefs::kBlobFileHeaderLength` can underflow when reading a truncated/corrupt blob file (e.g., size < 5), leading to a huge seek offset and undefined behavior. `index_length` also needs validation (non-negative and not larger than remaining file size) before allocating/seek. ########## src/paimon/format/blob/blob_format_writer.cpp: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_format_writer.h" + +#include <algorithm> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/data/blob.h" +#include "paimon/io/byte_array_input_stream.h" + +namespace paimon::blob { + +BlobFormatWriter::BlobFormatWriter(bool blob_as_descriptor, + const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, + const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) + : blob_as_descriptor_(blob_as_descriptor), + out_(out), + data_type_(data_type), + fs_(fs), + pool_(pool) { + metrics_ = std::make_shared<MetricsImpl>(); + tmp_buffer_ = Bytes::AllocateBytes(kTmpBufferSize, pool_.get()); +} + +Result<std::unique_ptr<BlobFormatWriter>> BlobFormatWriter::Create( + bool blob_as_descriptor, const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) { + if (out == nullptr) { + return Status::Invalid("blob format writer create failed. out is nullptr"); + } + if (data_type == nullptr) { + return Status::Invalid("blob format writer create failed. 
data_type is nullptr"); + } + if (pool == nullptr) { + return Status::Invalid("blob format writer create failed. pool is nullptr"); + } Review Comment: When `blob_as_descriptor` is true, `fs` is required (used later by `Blob::NewInputStream`). `Create()` currently accepts `fs == nullptr`, which will lead to a null dereference at write time. Validate `fs` up front. ########## src/paimon/format/blob/blob_format_writer.cpp: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_format_writer.h" + +#include <algorithm> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/data/blob.h" +#include "paimon/io/byte_array_input_stream.h" + +namespace paimon::blob { + +BlobFormatWriter::BlobFormatWriter(bool blob_as_descriptor, + const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, + const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) + : blob_as_descriptor_(blob_as_descriptor), + out_(out), + data_type_(data_type), + fs_(fs), + pool_(pool) { + metrics_ = std::make_shared<MetricsImpl>(); + tmp_buffer_ = Bytes::AllocateBytes(kTmpBufferSize, pool_.get()); +} + +Result<std::unique_ptr<BlobFormatWriter>> BlobFormatWriter::Create( + bool blob_as_descriptor, const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) { + if (out == nullptr) { + return Status::Invalid("blob format writer create failed. out is nullptr"); + } + if (data_type == nullptr) { + return Status::Invalid("blob format writer create failed. data_type is nullptr"); + } + if (pool == nullptr) { + return Status::Invalid("blob format writer create failed. 
pool is nullptr"); + } + if (data_type->num_fields() != 1) { + return Status::Invalid( + fmt::format("blob data type field number {} is not 1", data_type->num_fields())); + } + if (!BlobUtils::IsBlobField(data_type->field(0))) { + return Status::Invalid( + fmt::format("field {} is not BLOB", data_type->field(0)->ToString())); + } + return std::unique_ptr<BlobFormatWriter>( + new BlobFormatWriter(blob_as_descriptor, out, data_type, fs, pool)); +} + +Status BlobFormatWriter::AddBatch(ArrowArray* batch) { + if (batch == nullptr) { + return Status::Invalid("blob format writer add batch failed. batch is nullptr"); + } + if (batch->length != 1) { + return Status::Invalid("BlobFormatWriter only supports batch with a row count of 1"); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(batch, data_type_)); + + assert(arrow_array->num_fields() == 1); + auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(arrow_array); + auto child_array = struct_array->field(0); + + // Struct-level null is not supported (caller should not pass null struct rows) + if (struct_array->IsNull(0)) { + return Status::Invalid("BlobFormatWriter does not support struct-level null."); + } + // Child-level null: record kNullBinLength, skip data writing (aligned with Java) + if (child_array->IsNull(0)) { + bin_lengths_.push_back(BlobDefs::kNullBinLength); + return Status::OK(); + } + + if (child_array->type_id() != arrow::Type::type::LARGE_BINARY) { + return Status::Invalid("BlobFormatWriter only support large binary type."); + } + + const auto& blob_array = + arrow::internal::checked_cast<const arrow::LargeBinaryArray&>(*child_array); + assert(blob_array.length() == 1); + PAIMON_RETURN_NOT_OK(WriteBlob(blob_array.GetView(0))); + + PAIMON_RETURN_NOT_OK(Flush()); + return Status::OK(); +} + +Status BlobFormatWriter::Flush() { + return out_->Flush(); +} + +Status BlobFormatWriter::Finish() { + // index + const auto& index_bytes = 
DeltaVarintCompressor::Compress(bin_lengths_); + PAIMON_RETURN_NOT_OK(WriteBytes(index_bytes.data(), index_bytes.size())); + // header + PAIMON_UNIQUE_PTR<Bytes> index_length_bytes = + IntegerToLittleEndian<int32_t>(static_cast<int32_t>(index_bytes.size()), pool_); + PAIMON_RETURN_NOT_OK(WriteBytes(index_length_bytes->data(), index_length_bytes->size())); + PAIMON_RETURN_NOT_OK(WriteBytes(reinterpret_cast<const char*>(&BlobDefs::kFileVersion), + sizeof(BlobDefs::kFileVersion))); + + PAIMON_RETURN_NOT_OK(Flush()); + + tmp_buffer_.reset(); + return Status::OK(); +} + +Status BlobFormatWriter::WriteBlob(std::string_view blob_data) { + crc32_ = 0; + PAIMON_ASSIGN_OR_RAISE(int64_t previous_pos, out_->GetPos()); + + // write magic number + static PAIMON_UNIQUE_PTR<Bytes> kMagicNumberBytes = + IntegerToLittleEndian<int32_t>(BlobDefs::kMagicNumber, pool_); + PAIMON_RETURN_NOT_OK(WriteWithCrc32(kMagicNumberBytes->data(), kMagicNumberBytes->size())); + + // write blob content + std::unique_ptr<InputStream> in; + if (blob_as_descriptor_) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<Blob> blob, + Blob::FromDescriptor(blob_data.data(), blob_data.size())); + PAIMON_ASSIGN_OR_RAISE(in, blob->NewInputStream(fs_)); + } else { + in = std::make_unique<ByteArrayInputStream>(blob_data.data(), blob_data.size()); + } + PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, in->Length()); + uint64_t total_read_length = 0; + auto read_len = static_cast<uint32_t>(std::min<uint64_t>(file_length, tmp_buffer_->size())); + while (read_len > 0) { + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, in->Read(tmp_buffer_->data(), read_len)); + if (static_cast<uint32_t>(actual_read_len) != read_len) { + return Status::Invalid("actual read length {}, not match with expect length {}", + actual_read_len, read_len); + } + PAIMON_RETURN_NOT_OK(WriteWithCrc32(tmp_buffer_->data(), actual_read_len)); + total_read_length += actual_read_len; + read_len = static_cast<uint32_t>( + std::min<uint64_t>(file_length - 
total_read_length, tmp_buffer_->size())); Review Comment: `InputStream::Read()` is allowed to return fewer bytes than requested (short read). The current implementation treats any short read as an error, which can break blob writing on some filesystems/streams. Loop until the requested chunk is fully read (or return an EOF/IO error). ########## src/paimon/format/blob/blob_stats_extractor.cpp: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_stats_extractor.h" + +#include <cassert> +#include <optional> + +#include "arrow/api.h" +#include "fmt/format.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/format/blob/blob_file_batch_reader.h" +#include "paimon/format/column_stats.h" +#include "paimon/fs/file_system.h" +#include "paimon/status.h" + +namespace paimon { +class MemoryPool; +} // namespace paimon + +namespace paimon::blob { + +Result<std::pair<ColumnStatsVector, FormatStatsExtractor::FileInfo>> +BlobStatsExtractor::ExtractWithFileInfo(const std::shared_ptr<FileSystem>& file_system, + const std::string& path, + const std::shared_ptr<MemoryPool>& pool) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream, file_system->Open(path)); + assert(input_stream); + if (write_schema_->num_fields() != 1) { + return Status::Invalid( + fmt::format("schema field number {} is not 1", write_schema_->num_fields())); + } + if (!BlobUtils::IsBlobField(write_schema_->field(0))) { + return Status::Invalid( + fmt::format("field {} is not BLOB", write_schema_->field(0)->ToString())); + } + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr<BlobFileBatchReader> blob_reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1024, /*blob_as_descriptor=*/true, pool)); + ColumnStatsVector result_stats; + result_stats.push_back( + ColumnStats::CreateStringColumnStats(std::nullopt, std::nullopt, /*null_count=*/0)); + PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, blob_reader->GetNumberOfRows()); Review Comment: `BlobStatsExtractor` always reports `null_count = 0`, but the blob writer/reader supports null blob entries (kNullBinLength). At minimum, avoid claiming "0" when the schema allows nulls by returning an unknown (nullopt) null count. ########## src/paimon/format/blob/blob_format_writer.cpp: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_format_writer.h" + +#include <algorithm> + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/data/blob.h" +#include "paimon/io/byte_array_input_stream.h" + +namespace paimon::blob { + +BlobFormatWriter::BlobFormatWriter(bool blob_as_descriptor, + const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, + const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) + : blob_as_descriptor_(blob_as_descriptor), + out_(out), + data_type_(data_type), + fs_(fs), + pool_(pool) { + metrics_ = std::make_shared<MetricsImpl>(); + tmp_buffer_ = Bytes::AllocateBytes(kTmpBufferSize, pool_.get()); +} + +Result<std::unique_ptr<BlobFormatWriter>> BlobFormatWriter::Create( + bool blob_as_descriptor, const std::shared_ptr<OutputStream>& out, + const std::shared_ptr<arrow::DataType>& data_type, const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<MemoryPool>& pool) { + if 
(out == nullptr) { + return Status::Invalid("blob format writer create failed. out is nullptr"); + } + if (data_type == nullptr) { + return Status::Invalid("blob format writer create failed. data_type is nullptr"); + } + if (pool == nullptr) { + return Status::Invalid("blob format writer create failed. pool is nullptr"); + } + if (data_type->num_fields() != 1) { + return Status::Invalid( + fmt::format("blob data type field number {} is not 1", data_type->num_fields())); + } + if (!BlobUtils::IsBlobField(data_type->field(0))) { + return Status::Invalid( + fmt::format("field {} is not BLOB", data_type->field(0)->ToString())); + } + return std::unique_ptr<BlobFormatWriter>( + new BlobFormatWriter(blob_as_descriptor, out, data_type, fs, pool)); +} + +Status BlobFormatWriter::AddBatch(ArrowArray* batch) { + if (batch == nullptr) { + return Status::Invalid("blob format writer add batch failed. batch is nullptr"); + } + if (batch->length != 1) { + return Status::Invalid("BlobFormatWriter only supports batch with a row count of 1"); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(batch, data_type_)); + + assert(arrow_array->num_fields() == 1); + auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(arrow_array); + auto child_array = struct_array->field(0); + + // Struct-level null is not supported (caller should not pass null struct rows) + if (struct_array->IsNull(0)) { + return Status::Invalid("BlobFormatWriter does not support struct-level null."); + } + // Child-level null: record kNullBinLength, skip data writing (aligned with Java) + if (child_array->IsNull(0)) { + bin_lengths_.push_back(BlobDefs::kNullBinLength); + return Status::OK(); + } + + if (child_array->type_id() != arrow::Type::type::LARGE_BINARY) { + return Status::Invalid("BlobFormatWriter only support large binary type."); + } + + const auto& blob_array = + arrow::internal::checked_cast<const 
arrow::LargeBinaryArray&>(*child_array); + assert(blob_array.length() == 1); + PAIMON_RETURN_NOT_OK(WriteBlob(blob_array.GetView(0))); + + PAIMON_RETURN_NOT_OK(Flush()); + return Status::OK(); +} + +Status BlobFormatWriter::Flush() { + return out_->Flush(); +} + +Status BlobFormatWriter::Finish() { + // index + const auto& index_bytes = DeltaVarintCompressor::Compress(bin_lengths_); + PAIMON_RETURN_NOT_OK(WriteBytes(index_bytes.data(), index_bytes.size())); + // header + PAIMON_UNIQUE_PTR<Bytes> index_length_bytes = + IntegerToLittleEndian<int32_t>(static_cast<int32_t>(index_bytes.size()), pool_); + PAIMON_RETURN_NOT_OK(WriteBytes(index_length_bytes->data(), index_length_bytes->size())); + PAIMON_RETURN_NOT_OK(WriteBytes(reinterpret_cast<const char*>(&BlobDefs::kFileVersion), + sizeof(BlobDefs::kFileVersion))); + + PAIMON_RETURN_NOT_OK(Flush()); + + tmp_buffer_.reset(); + return Status::OK(); +} + +Status BlobFormatWriter::WriteBlob(std::string_view blob_data) { + crc32_ = 0; + PAIMON_ASSIGN_OR_RAISE(int64_t previous_pos, out_->GetPos()); + + // write magic number + static PAIMON_UNIQUE_PTR<Bytes> kMagicNumberBytes = + IntegerToLittleEndian<int32_t>(BlobDefs::kMagicNumber, pool_); + PAIMON_RETURN_NOT_OK(WriteWithCrc32(kMagicNumberBytes->data(), kMagicNumberBytes->size())); Review Comment: `kMagicNumberBytes` is a function-local `static` allocated using the first writer instance's `pool_`. This is not thread-safe for multiple pools/writers and can lead to allocator/lifetime mismatches. Prefer allocating per call (or storing a plain constexpr byte array). ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + 
ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + 
std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test dereferences `dir` without checking, which can crash the test when directory creation fails. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + 
ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + 
CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + 
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + 
ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + 
CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + 
} + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + 
ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = 
arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(3, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); + ArrowArrayRelease(batch1.first.get()); + ArrowSchemaRelease(batch1.second.get()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); + ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch2.first.get()); + ArrowSchemaRelease(batch2.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); + ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch3.first.get()); + ArrowSchemaRelease(batch3.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); + ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); +} + +TEST_F(BlobFileBatchReaderTest, InvalidScenario) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + 
ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/0, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: read batch size '0' should be larger than zero"); + } + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(/*input_stream=*/nullptr, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: input stream is nullptr"); + } + { + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(/*input_stream=*/input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), + "blob file has no self-describing file schema"); + ASSERT_TRUE(reader->GetReaderMetrics()); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), + "target type is nullptr, call SetReadSchema first"); + reader->Close(); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), "blob file batch reader is closed"); + } +} + +TEST_P(BlobFileBatchReaderTest, EmptyFile) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream, + file_system->Create(dir->Str() + "/file.blob", /*overwrite=*/true)); + std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("blob_col"); + auto struct_type = arrow::struct_({blob_field}); + bool blob_as_descriptor = GetParam(); + 
ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlobFormatWriter> writer, + BlobFormatWriter::Create(blob_as_descriptor, output_stream, struct_type, + file_system, pool_)); + + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + ASSERT_OK(output_stream->Flush()); + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + file_system->Open(dir->Str() + "/file.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(0, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch)); +} + +TEST_F(BlobFileBatchReaderTest, SetReadSchemaWithInvalidInputs) { + { + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(/*read_schema=*/nullptr, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "SetReadSchema failed: read schema cannot be nullptr"); + } + { + auto schema = 
arrow::schema({BlobUtils::ToArrowField("my_blob_field", false), + BlobUtils::ToArrowField("my_blob_field_2", false)}); + + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test scope dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + 
ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + 
CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + 
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(3, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); + ArrowArrayRelease(batch1.first.get()); + ArrowSchemaRelease(batch1.second.get()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); + ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch2.first.get()); + ArrowSchemaRelease(batch2.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); + ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch3.first.get()); + ArrowSchemaRelease(batch3.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); + ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); +} + +TEST_F(BlobFileBatchReaderTest, InvalidScenario) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + std::string 
test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/0, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: read batch size '0' should be larger than zero"); + } + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(/*input_stream=*/nullptr, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: input stream is nullptr"); + } + { + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(/*input_stream=*/input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), + "blob file has no self-describing file schema"); + ASSERT_TRUE(reader->GetReaderMetrics()); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), + "target type is nullptr, call SetReadSchema first"); + reader->Close(); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), "blob file batch reader is closed"); + } +} + +TEST_P(BlobFileBatchReaderTest, EmptyFile) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream, + file_system->Create(dir->Str() + "/file.blob", /*overwrite=*/true)); + std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("blob_col"); + auto struct_type = arrow::struct_({blob_field}); + bool blob_as_descriptor = GetParam(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlobFormatWriter> writer, + BlobFormatWriter::Create(blob_as_descriptor, output_stream, 
struct_type, + file_system, pool_)); + + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + ASSERT_OK(output_stream->Flush()); + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + file_system->Open(dir->Str() + "/file.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(0, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch)); +} + +TEST_F(BlobFileBatchReaderTest, SetReadSchemaWithInvalidInputs) { + { + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test scope dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + 
paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + 
private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, 
"data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(3, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); + ArrowArrayRelease(batch1.first.get()); + ArrowSchemaRelease(batch1.second.get()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); + ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch2.first.get()); + ArrowSchemaRelease(batch2.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); + ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + 
ArrowArrayRelease(batch3.first.get()); + ArrowSchemaRelease(batch3.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); + ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); +} + +TEST_F(BlobFileBatchReaderTest, InvalidScenario) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/0, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: read batch size '0' should be larger than zero"); + } + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(/*input_stream=*/nullptr, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: input stream is nullptr"); + } + { + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(/*input_stream=*/input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), + "blob file has no self-describing file schema"); + ASSERT_TRUE(reader->GetReaderMetrics()); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), + "target type is nullptr, call SetReadSchema first"); + reader->Close(); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), "blob file batch reader is closed"); + } +} + +TEST_P(BlobFileBatchReaderTest, EmptyFile) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = 
std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream, + file_system->Create(dir->Str() + "/file.blob", /*overwrite=*/true)); + std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("blob_col"); + auto struct_type = arrow::struct_({blob_field}); + bool blob_as_descriptor = GetParam(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlobFormatWriter> writer, + BlobFormatWriter::Create(blob_as_descriptor, output_stream, struct_type, + file_system, pool_)); + + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + ASSERT_OK(output_stream->Flush()); + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + file_system->Open(dir->Str() + "/file.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(0, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch)); +} + +TEST_F(BlobFileBatchReaderTest, SetReadSchemaWithInvalidInputs) { + { + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto 
reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(/*read_schema=*/nullptr, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "SetReadSchema failed: read schema cannot be nullptr"); + } + { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false), + BlobUtils::ToArrowField("my_blob_field_2", false)}); + + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "read schema field number 2 is not 1"); + } + { + auto blob_field = arrow::field("my_blob_field", arrow::large_binary()); + + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + 
fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "field my_blob_field: large_binary is not BLOB"); + } + { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test scope dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. ########## src/paimon/format/blob/blob_file_batch_reader_test.cpp: ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector<std::string>& original_blob_files, bool blob_as_descriptor, + const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, original_blob_files.size()); + return; + } + + std::shared_ptr<arrow::Array> combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(combined_array); + 
ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast<arrow::LargeBinaryArray>(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr<MemoryPool> pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + 
CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + 
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(3, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); + ArrowArrayRelease(batch1.first.get()); + ArrowSchemaRelease(batch1.second.get()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); + ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch2.first.get()); + ArrowSchemaRelease(batch2.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); + ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch3.first.get()); + ArrowSchemaRelease(batch3.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); + ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); +} + +TEST_F(BlobFileBatchReaderTest, InvalidScenario) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + std::string 
test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/0, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: read batch size '0' should be larger than zero"); + } + { + ASSERT_NOK_WITH_MSG( + BlobFileBatchReader::Create(/*input_stream=*/nullptr, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_), + "blob file batch reader create failed: input stream is nullptr"); + } + { + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(/*input_stream=*/input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), + "blob file has no self-describing file schema"); + ASSERT_TRUE(reader->GetReaderMetrics()); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), + "target type is nullptr, call SetReadSchema first"); + reader->Close(); + ASSERT_NOK_WITH_MSG(reader->NextBatch(), "blob file batch reader is closed"); + } +} + +TEST_P(BlobFileBatchReaderTest, EmptyFile) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto file_system = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream, + file_system->Create(dir->Str() + "/file.blob", /*overwrite=*/true)); + std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("blob_col"); + auto struct_type = arrow::struct_({blob_field}); + bool blob_as_descriptor = GetParam(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<BlobFormatWriter> writer, + BlobFormatWriter::Create(blob_as_descriptor, output_stream, 
struct_type, + file_system, pool_)); + + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + ASSERT_OK(output_stream->Flush()); + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, + file_system->Open(dir->Str() + "/file.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(0, number_of_rows); + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch)); +} + +TEST_F(BlobFileBatchReaderTest, SetReadSchemaWithInvalidInputs) { + { + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(/*read_schema=*/nullptr, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "SetReadSchema failed: read schema cannot be nullptr"); + } + { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false), + BlobUtils::ToArrowField("my_blob_field_2", false)}); + + ::ArrowSchema c_schema; + 
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr<FileSystem> fs = std::make_shared<LocalFileSystem>(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr<InputStream> input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN( + auto reader, + BlobFileBatchReader::Create(input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + ASSERT_NOK_WITH_MSG(reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "read schema field number 2 is not 1"); + } + { + auto blob_field = arrow::field("my_blob_field", arrow::large_binary()); + + auto schema = arrow::schema({blob_field}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = + paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); Review Comment: `UniqueTestDirectory::Create()` can return nullptr; this test scope dereferences `dir` without checking, which can cause a crash in test setup failures. Add `ASSERT_TRUE(dir)` before using it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
