Copilot commented on code in PR #47775: URL: https://github.com/apache/arrow/pull/47775#discussion_r2786965172
########## cpp/src/parquet/file_rewriter.cc: ########## @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file_rewriter.h" + +#include <algorithm> +#include <memory> +#include <numeric> +#include <ranges> +#include <sstream> +#include <unordered_set> +#include <utility> + +#include "arrow/util/logging.h" +#include "parquet/bloom_filter.h" // IWYU pragma: keep +#include "parquet/bloom_filter_reader.h" +#include "parquet/bloom_filter_writer.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/index_location.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/schema.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr<ArrowInputStream> from, + std::shared_ptr<ArrowOutputStream> to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of 
stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + std::shared_ptr<ResizableBuffer> buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW( + auto read_size, + from->Read(std::min(size - bytes_copied, buffer->size()), &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr<RewriterProperties>& default_rewriter_properties() { + static std::shared_ptr<RewriterProperties> default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class ColumnChunkRewriter { + public: + ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::unique_ptr<ColumnChunkMetaData> metadata, + std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader, + int column_ordinal) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + metadata_(std::move(metadata)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + column_ordinal_(column_ordinal) { + if (metadata_ == nullptr) { + throw ParquetException("ColumnChunkMetaData should not be nullptr"); + } + } + + void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. 
+ bool fast_copy = true; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->start_offset(); + int64_t total_uncompressed_size = metadata_->total_uncompressed_size(); + int64_t total_compressed_size = metadata_->total_compressed_size(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->start_offset(), total_compressed_size); + CopyStream(stream, sink_, total_compressed_size, props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift); + + if (page_index_reader_ != nullptr && page_index_builder != nullptr) { + auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_); + auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_); + if (column_index != nullptr) { + page_index_builder->SetColumnIndex(column_ordinal_, column_index); + } + if (offset_index != nullptr) { + page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift); + } + } + + if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) { + auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_); + if (bloom_filter != nullptr) { + bloom_filter_builder->InsertBloomFilter(column_ordinal_, + std::move(bloom_filter)); + } + } + + total_bytes_written += total_uncompressed_size; + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ColumnChunkMetaData> metadata_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + int column_ordinal_; +}; + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::shared_ptr<RowGroupReader> row_group_reader, + 
std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("RowGroupMetaData should not be nullptr"); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + rg_metadata_builder.set_num_rows(metadata_->num_rows()); + + bool fast_copy = metadata_->file_offset() != 0; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift); + + auto column_index = + page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr; + auto offset_index = + page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr; + auto bloom_filter = bloom_filter_reader_ + ? 
bloom_filter_reader_->GetColumnBloomFilter(i) + : nullptr; + + if (column_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetColumnIndex(i, column_index); + } + if (offset_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + if (bloom_filter != nullptr && bloom_filter_builder != nullptr) { + bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter)); + } + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata), + page_index_reader_, bloom_filter_reader_, i); + rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::shared_ptr<RowGroupReader> row_group_reader_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + std::shared_ptr<FileMetaData> source_metadata, + const RewriterProperties& props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props_.reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("FileMetaData should not be nullptr"); + } + + std::vector<int32_t> 
row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector<int32_t> column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + if (page_index_reader_) { + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); + auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_); + auto page_index_reader = page_index_reader_ + ? 
page_index_reader_->RowGroup(current_row_group_index_) + : nullptr; + auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_); + RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader), + std::move(page_index_reader), + std::move(bloom_filter_reader)); + rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + ++current_row_group_index_; + } + + bool HasMoreRowGroup() { + return current_row_group_index_ < metadata_->num_row_groups(); + } + + void Close() { parquet_file_reader_->Close(); } + + const SchemaDescriptor& schema() const { return *metadata_->schema(); } + + std::vector<int64_t> row_group_row_counts() const { + int num_row_groups = metadata_->num_row_groups(); + std::vector<int64_t> row_counts; + row_counts.reserve(num_row_groups); + for (int i = 0; i < num_row_groups; ++i) { + row_counts.emplace_back(metadata_->RowGroup(i)->num_rows()); + } + return row_counts; + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ParquetFileReader> parquet_file_reader_; + std::shared_ptr<PageIndexReader> page_index_reader_; + BloomFilterReader& bloom_filter_reader_; + std::shared_ptr<FileMetaData> metadata_; + int current_row_group_index_{}; +}; + +class ConcatRewriter { + public: + explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters) + : file_rewriters_(std::move(rewriters)) { + if (file_rewriters_.empty()) { + throw ParquetException("At least one SingleFileRewriter is required"); + } + auto& schema = file_rewriters_[0]->schema(); + if (std::ranges::any_of( + file_rewriters_ | std::views::drop(1), + [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) { + throw ParquetException("Input files have different schemas."); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + 
PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + file_rewriters_[current_rewriter_index_]->WriteRowGroupData( + rg_metadata_builder, page_index_builder, bloom_filter_builder, + total_bytes_written); + } + + bool HasMoreRowGroup() { + while (current_rewriter_index_ < file_rewriters_.size() && + !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) { + file_rewriters_[current_rewriter_index_]->Close(); + ARROW_LOG(DEBUG) << "Finished rewriting file index " << current_rewriter_index_; + ++current_rewriter_index_; + } + return current_rewriter_index_ < file_rewriters_.size(); + } + + void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); } + + const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); } + + std::vector<int64_t> row_group_row_counts() const { + std::vector<int64_t> row_counts; + for (auto& rewriter : file_rewriters_) { + auto count = rewriter->row_group_row_counts(); + row_counts.insert(row_counts.end(), count.begin(), count.end()); + } + return row_counts; + } + + private: + std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_; + size_t current_rewriter_index_{}; +}; + +class JoinRewriter { + public: + explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters) + : rewriters_(std::move(rewriters)) { + if (rewriters_.empty()) { + throw ParquetException("At least one ConcatRewriter is required"); + } + auto row_counts = rewriters_[0]->row_group_row_counts(); + for (size_t i = 1; i < rewriters_.size(); ++i) { + if (auto current_row_counts = rewriters_[i]->row_group_row_counts(); + row_counts != current_row_counts) { + auto vecToString = [](const std::vector<int64_t>& v) { + if (v.empty()) { + return std::string("[]"); + } + std::ostringstream oss; + oss << "[" << v[0]; + // TODO(anyone): use std::format and std::views::join_with when C++23 available + for (const auto& val : v | std::views::drop(1)) { + 
oss << ", " << val; + } + oss << "]"; + return oss.str(); + }; + throw ParquetException( + "The number of rows in each block must match! No.0 blocks row counts: ", + vecToString(row_counts), ", No.", i, + " blocks row counts: ", vecToString(current_row_counts)); + } + } + + std::unordered_set<std::string> column_paths; + schema::NodeVector fields; + + for (auto& rewriter : rewriters_) { + const SchemaDescriptor& schema_desc = rewriter->schema(); + + for (int i = 0; i < schema_desc.num_columns(); ++i) { + auto path = schema_desc.Column(i)->path()->ToDotString(); + if (auto [_, inserted] = column_paths.emplace(path); !inserted) { + // TODO(HuaHuaY): support choose one column from columns with same path + throw ParquetException("NotImplemented, files have same column path: ", path); + } + } + + const auto& group_node = schema_desc.group_node(); + for (int i = 0; i < group_node->field_count(); ++i) { + fields.push_back(group_node->field(i)); + } + } + + auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(), + Repetition::REQUIRED, fields); + schema_.Init(new_root); + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + for (auto& rewriter : rewriters_) { + rewriter->WriteRowGroupData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + + bool HasMoreRowGroup() { + return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup); + } + + void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); } + + const SchemaDescriptor& schema() const { return schema_; } + + private: + std::vector<std::unique_ptr<ConcatRewriter>> rewriters_; + SchemaDescriptor schema_; +}; + +// ---------------------------------------------------------------------- +// GeneratedFile + +class GeneratedFile : public ParquetFileRewriter::Contents { + public: + static 
std::unique_ptr<ParquetFileRewriter::Contents> Open( + std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources, + std::shared_ptr<ArrowOutputStream> sink, + std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata, + std::shared_ptr<const KeyValueMetadata> sink_metadata, + std::shared_ptr<RewriterProperties> props) { + if (sources.size() != sources_metadata.size() || + // TODO(anyone): use std::views::zip when C++23 available + std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t i) { + return sources[i].size() != sources_metadata[i].size(); + })) { + throw ParquetException( + "The number of sources and sources_metadata must be the same"); + } + std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile( + std::move(sources), std::move(sink), std::move(sources_metadata), + std::move(sink_metadata), std::move(props))); + return result; + } + + void Close() override { + if (rewriter_) { + rewriter_->Close(); + rewriter_.reset(); + } + } + + void Rewrite() override { + while (rewriter_->HasMoreRowGroup()) { + auto& rg_metadata_builder = *metadata_builder_->AppendRowGroup(); + if (page_index_builder_) { + page_index_builder_->AppendRowGroup(); + } + if (bloom_filter_builder_) { + bloom_filter_builder_->AppendRowGroup(); + } + int64_t total_bytes_written = 0; + rewriter_->WriteRowGroupData(rg_metadata_builder, page_index_builder_.get(), + bloom_filter_builder_.get(), total_bytes_written); + rg_metadata_builder.Finish(total_bytes_written); + } + if (page_index_builder_) { + page_index_builder_->Finish(); + auto [column_index_locations, offset_index_locations] = + page_index_builder_->WriteTo(sink_.get()); + metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex, + column_index_locations); + metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex, + offset_index_locations); + } + if (bloom_filter_builder_) { + auto bloom_filter_locations = bloom_filter_builder_->WriteTo(sink_.get()); + 
metadata_builder_->SetIndexLocations(IndexKind::kBloomFilter, + bloom_filter_locations); + } + + auto file_metadata = metadata_builder_->Finish(sink_metadata_); + WriteFileMetaData(*file_metadata, sink_.get()); + } + + private: + GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources, + std::shared_ptr<ArrowOutputStream> sink, + std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata, + std::shared_ptr<const KeyValueMetadata> sink_metadata, + std::shared_ptr<RewriterProperties> props) + : sink_(std::move(sink)), + props_(std::move(props)), + sink_metadata_(std::move(sink_metadata)) { + std::vector<std::unique_ptr<ConcatRewriter>> rewriters; + rewriters.reserve(sources.size()); + for (size_t i = 0; i < sources.size(); ++i) { + std::vector<std::unique_ptr<SingleFileRewriter>> concat_rewriters; + concat_rewriters.reserve(sources[i].size()); + for (size_t j = 0; j < sources[i].size(); ++j) { + concat_rewriters.emplace_back(std::make_unique<SingleFileRewriter>( + std::move(sources[i][j]), sink_, std::move(sources_metadata[i][j]), *props_)); + } + rewriters.emplace_back( + std::make_unique<ConcatRewriter>(std::move(concat_rewriters))); + } + rewriter_ = std::make_unique<JoinRewriter>(std::move(rewriters)); + + if (props_->writer_properties()->file_encryption_properties() == nullptr) { + // Unencrypted parquet files always start with PAR1 + PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4)); + } else { + throw ParquetException( + "NotImplemented, rewriter does not support to write encrypted files."); + } + + auto new_schema = rewriter_->schema().schema_root(); + new_schema_.Init(new_schema); + metadata_builder_ = + FileMetaDataBuilder::Make(&new_schema_, props_->writer_properties()); + if (props_->writer_properties()->page_index_enabled()) { + page_index_builder_ = PageIndexBuilder::Make(&new_schema_, nullptr); + } + if (props_->writer_properties()->bloom_filter_enabled()) { + bloom_filter_builder_ = + 
BloomFilterBuilder::Make(&new_schema_, props_->writer_properties().get()); + } + } + + std::shared_ptr<ArrowOutputStream> sink_; + std::shared_ptr<RewriterProperties> props_; + std::shared_ptr<const KeyValueMetadata> sink_metadata_; + std::unique_ptr<JoinRewriter> rewriter_; + + SchemaDescriptor new_schema_; + std::unique_ptr<FileMetaDataBuilder> metadata_builder_; + std::unique_ptr<PageIndexBuilder> page_index_builder_; + std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_; +}; + +// ---------------------------------------------------------------------- +// ParquetFilesRewriter public API + +ParquetFileRewriter::ParquetFileRewriter() = default; + +ParquetFileRewriter::~ParquetFileRewriter() { + try { + Close(); + } catch (...) { + } +} + +std::unique_ptr<ParquetFileRewriter> ParquetFileRewriter::Open( + std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources, + std::shared_ptr<ArrowOutputStream> sink, + std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata, + std::shared_ptr<const KeyValueMetadata> sink_metadata, + std::shared_ptr<RewriterProperties> props) { + auto contents = GeneratedFile::Open(std::move(sources), std::move(sink), + std::move(sources_metadata), + std::move(sink_metadata), std::move(props)); + std::unique_ptr<ParquetFileRewriter> result(new ParquetFileRewriter()); + result->Open(std::move(contents)); + return result; +} + +void ParquetFileRewriter::Open(std::unique_ptr<ParquetFileRewriter::Contents> contents) { + contents_ = std::move(contents); +} + +void ParquetFileRewriter::Close() { + if (contents_) { + contents_->Close(); + contents_.reset(); + } +} + +void ParquetFileRewriter::Rewrite() { contents_->Rewrite(); } Review Comment: ParquetFileRewriter::Rewrite() dereferences contents_ unconditionally. Since the default constructor is public, calling Rewrite() before Open() (or after Close()) will crash. 
Consider either making the constructor private (force Open()), or add a null check and throw a ParquetException when contents_ is not set. ```suggestion void ParquetFileRewriter::Rewrite() { if (!contents_) { throw ParquetException( "ParquetFileRewriter::Rewrite() called on an uninitialized or closed file"); } contents_->Rewrite(); } ``` ########## cpp/src/parquet/file_rewriter.cc: ########## @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/file_rewriter.h" + +#include <algorithm> +#include <memory> +#include <numeric> +#include <ranges> +#include <sstream> +#include <unordered_set> +#include <utility> + +#include "arrow/util/logging.h" +#include "parquet/bloom_filter.h" // IWYU pragma: keep +#include "parquet/bloom_filter_reader.h" +#include "parquet/bloom_filter_writer.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/index_location.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/schema.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr<ArrowInputStream> from, + std::shared_ptr<ArrowOutputStream> to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + std::shared_ptr<ResizableBuffer> buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW( + auto read_size, + from->Read(std::min(size - bytes_copied, buffer->size()), &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr<RewriterProperties>& default_rewriter_properties() { + static std::shared_ptr<RewriterProperties> default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class ColumnChunkRewriter { + public: 
+ ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::unique_ptr<ColumnChunkMetaData> metadata, + std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader, + int column_ordinal) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + metadata_(std::move(metadata)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + column_ordinal_(column_ordinal) { + if (metadata_ == nullptr) { + throw ParquetException("ColumnChunkMetaData should not be nullptr"); + } + } + + void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. + bool fast_copy = true; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->start_offset(); + int64_t total_uncompressed_size = metadata_->total_uncompressed_size(); + int64_t total_compressed_size = metadata_->total_compressed_size(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->start_offset(), total_compressed_size); + CopyStream(stream, sink_, total_compressed_size, props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift); + + if (page_index_reader_ != nullptr && page_index_builder != nullptr) { + auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_); + auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_); + if (column_index != nullptr) { + page_index_builder->SetColumnIndex(column_ordinal_, column_index); + } + if (offset_index != nullptr) { + 
page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift); + } + } + + if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) { + auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_); + if (bloom_filter != nullptr) { + bloom_filter_builder->InsertBloomFilter(column_ordinal_, + std::move(bloom_filter)); + } + } + + total_bytes_written += total_uncompressed_size; + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ColumnChunkMetaData> metadata_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + int column_ordinal_; +}; + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::shared_ptr<RowGroupReader> row_group_reader, + std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("RowGroupMetaData should not be nullptr"); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + rg_metadata_builder.set_num_rows(metadata_->num_rows()); + + bool fast_copy = metadata_->file_offset() != 0; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = 
props_.reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift); + + auto column_index = + page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr; + auto offset_index = + page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr; + auto bloom_filter = bloom_filter_reader_ + ? bloom_filter_reader_->GetColumnBloomFilter(i) + : nullptr; + + if (column_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetColumnIndex(i, column_index); + } + if (offset_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + if (bloom_filter != nullptr && bloom_filter_builder != nullptr) { + bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter)); + } + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata), + page_index_reader_, bloom_filter_reader_, i); + rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::shared_ptr<RowGroupReader> row_group_reader_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr<ArrowInputFile> 
source, + std::shared_ptr<ArrowOutputStream> sink, + std::shared_ptr<FileMetaData> source_metadata, + const RewriterProperties& props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props_.reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("FileMetaData should not be nullptr"); + } + + std::vector<int32_t> row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector<int32_t> column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + if (page_index_reader_) { + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); + auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_); + auto page_index_reader = page_index_reader_ + ? 
page_index_reader_->RowGroup(current_row_group_index_) + : nullptr; + auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_); + RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader), + std::move(page_index_reader), + std::move(bloom_filter_reader)); + rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + ++current_row_group_index_; + } + + bool HasMoreRowGroup() { + return current_row_group_index_ < metadata_->num_row_groups(); + } + + void Close() { parquet_file_reader_->Close(); } + + const SchemaDescriptor& schema() const { return *metadata_->schema(); } + + std::vector<int64_t> row_group_row_counts() const { + int num_row_groups = metadata_->num_row_groups(); + std::vector<int64_t> row_counts; + row_counts.reserve(num_row_groups); + for (int i = 0; i < num_row_groups; ++i) { + row_counts.emplace_back(metadata_->RowGroup(i)->num_rows()); + } + return row_counts; + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ParquetFileReader> parquet_file_reader_; + std::shared_ptr<PageIndexReader> page_index_reader_; + BloomFilterReader& bloom_filter_reader_; + std::shared_ptr<FileMetaData> metadata_; + int current_row_group_index_{}; +}; + +class ConcatRewriter { + public: + explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters) + : file_rewriters_(std::move(rewriters)) { + if (file_rewriters_.empty()) { + throw ParquetException("At least one SingleFileRewriter is required"); + } + auto& schema = file_rewriters_[0]->schema(); + if (std::ranges::any_of( + file_rewriters_ | std::views::drop(1), + [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) { + throw ParquetException("Input files have different schemas."); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + 
PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + file_rewriters_[current_rewriter_index_]->WriteRowGroupData( + rg_metadata_builder, page_index_builder, bloom_filter_builder, + total_bytes_written); + } + + bool HasMoreRowGroup() { + while (current_rewriter_index_ < file_rewriters_.size() && + !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) { + file_rewriters_[current_rewriter_index_]->Close(); + ARROW_LOG(DEBUG) << "Finished rewriting file index " << current_rewriter_index_; + ++current_rewriter_index_; + } + return current_rewriter_index_ < file_rewriters_.size(); + } + + void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); } + + const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); } + + std::vector<int64_t> row_group_row_counts() const { + std::vector<int64_t> row_counts; + for (auto& rewriter : file_rewriters_) { + auto count = rewriter->row_group_row_counts(); + row_counts.insert(row_counts.end(), count.begin(), count.end()); + } + return row_counts; + } + + private: + std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_; + size_t current_rewriter_index_{}; +}; + +class JoinRewriter { + public: + explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters) + : rewriters_(std::move(rewriters)) { + if (rewriters_.empty()) { + throw ParquetException("At least one ConcatRewriter is required"); + } + auto row_counts = rewriters_[0]->row_group_row_counts(); + for (size_t i = 1; i < rewriters_.size(); ++i) { + if (auto current_row_counts = rewriters_[i]->row_group_row_counts(); + row_counts != current_row_counts) { + auto vecToString = [](const std::vector<int64_t>& v) { + if (v.empty()) { + return std::string("[]"); + } + std::ostringstream oss; + oss << "[" << v[0]; + // TODO(anyone): use std::format and std::views::join_with when C++23 available + for (const auto& val : v | std::views::drop(1)) { + 
oss << ", " << val; + } + oss << "]"; + return oss.str(); + }; + throw ParquetException( + "The number of rows in each block must match! No.0 blocks row counts: ", + vecToString(row_counts), ", No.", i, + " blocks row counts: ", vecToString(current_row_counts)); + } + } + + std::unordered_set<std::string> column_paths; + schema::NodeVector fields; + + for (auto& rewriter : rewriters_) { + const SchemaDescriptor& schema_desc = rewriter->schema(); + + for (int i = 0; i < schema_desc.num_columns(); ++i) { + auto path = schema_desc.Column(i)->path()->ToDotString(); + if (auto [_, inserted] = column_paths.emplace(path); !inserted) { + // TODO(HuaHuaY): support choose one column from columns with same path + throw ParquetException("NotImplemented, files have same column path: ", path); + } + } + + const auto& group_node = schema_desc.group_node(); + for (int i = 0; i < group_node->field_count(); ++i) { + fields.push_back(group_node->field(i)); + } + } + + auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(), + Repetition::REQUIRED, fields); + schema_.Init(new_root); + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + for (auto& rewriter : rewriters_) { + rewriter->WriteRowGroupData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + + bool HasMoreRowGroup() { + return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup); + } + + void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); } + + const SchemaDescriptor& schema() const { return schema_; } + + private: + std::vector<std::unique_ptr<ConcatRewriter>> rewriters_; + SchemaDescriptor schema_; +}; + +// ---------------------------------------------------------------------- +// GeneratedFile + +class GeneratedFile : public ParquetFileRewriter::Contents { + public: + static 
std::unique_ptr<ParquetFileRewriter::Contents> Open( + std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources, + std::shared_ptr<ArrowOutputStream> sink, + std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata, + std::shared_ptr<const KeyValueMetadata> sink_metadata, + std::shared_ptr<RewriterProperties> props) { + if (sources.size() != sources_metadata.size() || + // TODO(anyone): use std::views::zip when C++23 available + std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t i) { + return sources[i].size() != sources_metadata[i].size(); + })) { + throw ParquetException( + "The number of sources and sources_metadata must be the same"); + } + std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile( + std::move(sources), std::move(sink), std::move(sources_metadata), + std::move(sink_metadata), std::move(props))); + return result; + } + + void Close() override { + if (rewriter_) { + rewriter_->Close(); + rewriter_.reset(); + } + } + + void Rewrite() override { + while (rewriter_->HasMoreRowGroup()) { + auto& rg_metadata_builder = *metadata_builder_->AppendRowGroup(); + if (page_index_builder_) { + page_index_builder_->AppendRowGroup(); + } + if (bloom_filter_builder_) { + bloom_filter_builder_->AppendRowGroup(); + } + int64_t total_bytes_written = 0; + rewriter_->WriteRowGroupData(rg_metadata_builder, page_index_builder_.get(), + bloom_filter_builder_.get(), total_bytes_written); + rg_metadata_builder.Finish(total_bytes_written); + } + if (page_index_builder_) { + page_index_builder_->Finish(); + auto [column_index_locations, offset_index_locations] = + page_index_builder_->WriteTo(sink_.get()); + metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex, + column_index_locations); + metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex, + offset_index_locations); + } + if (bloom_filter_builder_) { + auto bloom_filter_locations = bloom_filter_builder_->WriteTo(sink_.get()); + 
metadata_builder_->SetIndexLocations(IndexKind::kBloomFilter, + bloom_filter_locations); + } + + auto file_metadata = metadata_builder_->Finish(sink_metadata_); + WriteFileMetaData(*file_metadata, sink_.get()); + } + + private: + GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources, + std::shared_ptr<ArrowOutputStream> sink, + std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata, + std::shared_ptr<const KeyValueMetadata> sink_metadata, + std::shared_ptr<RewriterProperties> props) + : sink_(std::move(sink)), + props_(std::move(props)), + sink_metadata_(std::move(sink_metadata)) { + std::vector<std::unique_ptr<ConcatRewriter>> rewriters; + rewriters.reserve(sources.size()); + for (size_t i = 0; i < sources.size(); ++i) { + std::vector<std::unique_ptr<SingleFileRewriter>> concat_rewriters; + concat_rewriters.reserve(sources[i].size()); + for (size_t j = 0; j < sources[i].size(); ++j) { + concat_rewriters.emplace_back(std::make_unique<SingleFileRewriter>( + std::move(sources[i][j]), sink_, std::move(sources_metadata[i][j]), *props_)); + } + rewriters.emplace_back( + std::make_unique<ConcatRewriter>(std::move(concat_rewriters))); + } + rewriter_ = std::make_unique<JoinRewriter>(std::move(rewriters)); + + if (props_->writer_properties()->file_encryption_properties() == nullptr) { + // Unencrypted parquet files always start with PAR1 + PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4)); + } else { Review Comment: GeneratedFile writes the PAR1 magic bytes to sink_ without checking the current sink position. If sink_ isn’t empty / is already advanced, this will create an invalid file. Consider mirroring ParquetFileWriter behavior (check sink_->Tell() == 0 and throw if appending is attempted). ########## cpp/src/parquet/file_rewriter.cc: ########## @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file_rewriter.h" + +#include <algorithm> +#include <memory> +#include <numeric> +#include <ranges> +#include <sstream> +#include <unordered_set> +#include <utility> + +#include "arrow/util/logging.h" +#include "parquet/bloom_filter.h" // IWYU pragma: keep +#include "parquet/bloom_filter_reader.h" +#include "parquet/bloom_filter_writer.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/index_location.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/schema.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr<ArrowInputStream> from, + std::shared_ptr<ArrowOutputStream> to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + 
std::shared_ptr<ResizableBuffer> buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW( + auto read_size, + from->Read(std::min(size - bytes_copied, buffer->size()), &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr<RewriterProperties>& default_rewriter_properties() { + static std::shared_ptr<RewriterProperties> default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class ColumnChunkRewriter { + public: + ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::unique_ptr<ColumnChunkMetaData> metadata, + std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader, + int column_ordinal) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + metadata_(std::move(metadata)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + column_ordinal_(column_ordinal) { + if (metadata_ == nullptr) { + throw ParquetException("ColumnChunkMetaData should not be nullptr"); + } + } + + void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. 
+ bool fast_copy = true; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->start_offset(); + int64_t total_uncompressed_size = metadata_->total_uncompressed_size(); + int64_t total_compressed_size = metadata_->total_compressed_size(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->start_offset(), total_compressed_size); + CopyStream(stream, sink_, total_compressed_size, props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift); + + if (page_index_reader_ != nullptr && page_index_builder != nullptr) { + auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_); + auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_); + if (column_index != nullptr) { + page_index_builder->SetColumnIndex(column_ordinal_, column_index); + } + if (offset_index != nullptr) { + page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift); + } + } + + if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) { + auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_); + if (bloom_filter != nullptr) { + bloom_filter_builder->InsertBloomFilter(column_ordinal_, + std::move(bloom_filter)); + } + } + + total_bytes_written += total_uncompressed_size; + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ColumnChunkMetaData> metadata_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + int column_ordinal_; +}; + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::shared_ptr<RowGroupReader> row_group_reader, + 
std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("RowGroupMetaData should not be nullptr"); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + rg_metadata_builder.set_num_rows(metadata_->num_rows()); + + bool fast_copy = metadata_->file_offset() != 0; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift); + + auto column_index = + page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr; + auto offset_index = + page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr; + auto bloom_filter = bloom_filter_reader_ + ? 
bloom_filter_reader_->GetColumnBloomFilter(i) + : nullptr; + + if (column_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetColumnIndex(i, column_index); + } + if (offset_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + if (bloom_filter != nullptr && bloom_filter_builder != nullptr) { + bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter)); + } + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata), + page_index_reader_, bloom_filter_reader_, i); + rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::shared_ptr<RowGroupReader> row_group_reader_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + std::shared_ptr<FileMetaData> source_metadata, + const RewriterProperties& props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props_.reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("FileMetaData should not be nullptr"); + } + + std::vector<int32_t> 
row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector<int32_t> column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + if (page_index_reader_) { + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); Review Comment: SingleFileRewriter::WriteRowGroupData() creates row_group_metadata but never uses it. This is dead code and may trigger -Wunused-variable warnings in some builds; remove it or use it as intended. ```suggestion ``` ########## cpp/src/parquet/meson.build: ########## @@ -39,6 +39,7 @@ parquet_srcs = files( 'encryption/internal_file_encryptor.cc', 'exception.cc', 'file_reader.cc', + 'file_rewriter.cc', 'file_writer.cc', Review Comment: Meson build adds file_rewriter.cc to parquet_srcs, but the new public header parquet/file_rewriter.h is not added to the Meson install_headers() list. This means Meson installs will ship the implementation but not the API header. Consider adding file_rewriter.h to the installed header set. ########## cpp/src/parquet/metadata.cc: ########## @@ -413,6 +413,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; } + inline int64_t start_offset() const { + return has_dictionary_page() ? 
dictionary_page_offset() : data_page_offset(); Review Comment: ColumnChunkMetaData::start_offset() ignores index_page_offset(). For files that set index_page_offset without a dictionary page, the start of the column chunk may be before data_page_offset(), so using data_page_offset() here can cause the rewriter to skip bytes and compute an incorrect shift. Consider defining start_offset as the minimum of the available offsets (dictionary, index, data) to ensure it always points at the earliest page in the chunk. ```suggestion int64_t offset = data_page_offset(); if (has_dictionary_page()) { offset = std::min(offset, dictionary_page_offset()); } if (has_index_page()) { offset = std::min(offset, index_page_offset()); } return offset; ``` ########## cpp/src/parquet/bloom_filter_writer.cc: ########## @@ -219,6 +222,26 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) { return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get(); } +void BloomFilterBuilderImpl::InsertBloomFilter( + int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) { + auto opts = properties_->bloom_filter_options(schema_->Column(column_ordinal)->path()); + if (!opts.has_value() || bloom_filter == nullptr) { + return; + } + + CheckState(column_ordinal); + Review Comment: BloomFilterBuilderImpl::InsertBloomFilter() queries schema_->Column(column_ordinal) before CheckState(), so an out-of-range ordinal can crash/throw unexpectedly. It also returns early when bloom filter options are unset (or bloom_filter is nullptr), bypassing the documented state checks (e.g., AppendRowGroup not called / builder finished) and masking incorrect usage. Consider calling CheckState() before accessing schema_/options, and either throw on nullptr input or clearly document the no-op behavior. (CreateBloomFilter() has the same pre-CheckState schema_->Column() pattern and should be updated similarly.) 
########## cpp/src/parquet/file_rewriter.cc: ########## @@ -0,0 +1,609 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file_rewriter.h" + +#include <algorithm> +#include <memory> +#include <numeric> +#include <ranges> +#include <sstream> +#include <unordered_set> +#include <utility> + +#include "arrow/util/logging.h" +#include "parquet/bloom_filter.h" // IWYU pragma: keep +#include "parquet/bloom_filter_reader.h" +#include "parquet/bloom_filter_writer.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/index_location.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/schema.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr<ArrowInputStream> from, + std::shared_ptr<ArrowOutputStream> to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of 
stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + std::shared_ptr<ResizableBuffer> buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW( + auto read_size, + from->Read(std::min(size - bytes_copied, buffer->size()), &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr<RewriterProperties>& default_rewriter_properties() { + static std::shared_ptr<RewriterProperties> default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class ColumnChunkRewriter { + public: + ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::unique_ptr<ColumnChunkMetaData> metadata, + std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader, + int column_ordinal) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + metadata_(std::move(metadata)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + column_ordinal_(column_ordinal) { + if (metadata_ == nullptr) { + throw ParquetException("ColumnChunkMetaData should not be nullptr"); + } + } + + void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. 
+ bool fast_copy = true; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->start_offset(); + int64_t total_uncompressed_size = metadata_->total_uncompressed_size(); + int64_t total_compressed_size = metadata_->total_compressed_size(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->start_offset(), total_compressed_size); + CopyStream(stream, sink_, total_compressed_size, props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift); + + if (page_index_reader_ != nullptr && page_index_builder != nullptr) { + auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_); + auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_); + if (column_index != nullptr) { + page_index_builder->SetColumnIndex(column_ordinal_, column_index); + } + if (offset_index != nullptr) { + page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift); + } + } + + if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) { + auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_); + if (bloom_filter != nullptr) { + bloom_filter_builder->InsertBloomFilter(column_ordinal_, + std::move(bloom_filter)); + } + } + + total_bytes_written += total_uncompressed_size; + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ColumnChunkMetaData> metadata_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + int column_ordinal_; +}; + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + const RewriterProperties& props, + std::shared_ptr<RowGroupReader> row_group_reader, + 
std::shared_ptr<RowGroupPageIndexReader> page_index_reader, + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("RowGroupMetaData should not be nullptr"); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + rg_metadata_builder.set_num_rows(metadata_->num_rows()); + + bool fast_copy = metadata_->file_offset() != 0; + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = props_.reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift); + + auto column_index = + page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr; + auto offset_index = + page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr; + auto bloom_filter = bloom_filter_reader_ + ? 
bloom_filter_reader_->GetColumnBloomFilter(i) + : nullptr; + + if (column_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetColumnIndex(i, column_index); + } + if (offset_index != nullptr && page_index_builder != nullptr) { + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + if (bloom_filter != nullptr && bloom_filter_builder != nullptr) { + bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter)); + } + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata), + page_index_reader_, bloom_filter_reader_, i); + rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + } + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::shared_ptr<RowGroupReader> row_group_reader_; + std::shared_ptr<RowGroupPageIndexReader> page_index_reader_; + std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<ArrowOutputStream> sink, + std::shared_ptr<FileMetaData> source_metadata, + const RewriterProperties& props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props_.reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + if (metadata_ == nullptr) { + throw ParquetException("FileMetaData should not be nullptr"); + } + + std::vector<int32_t> 
row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector<int32_t> column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + if (page_index_reader_) { + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); + auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_); + auto page_index_reader = page_index_reader_ + ? 
page_index_reader_->RowGroup(current_row_group_index_) + : nullptr; + auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_); + RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader), + std::move(page_index_reader), + std::move(bloom_filter_reader)); + rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder, + bloom_filter_builder, total_bytes_written); + ++current_row_group_index_; + } + + bool HasMoreRowGroup() { + return current_row_group_index_ < metadata_->num_row_groups(); + } + + void Close() { parquet_file_reader_->Close(); } + + const SchemaDescriptor& schema() const { return *metadata_->schema(); } + + std::vector<int64_t> row_group_row_counts() const { + int num_row_groups = metadata_->num_row_groups(); + std::vector<int64_t> row_counts; + row_counts.reserve(num_row_groups); + for (int i = 0; i < num_row_groups; ++i) { + row_counts.emplace_back(metadata_->RowGroup(i)->num_rows()); + } + return row_counts; + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<ArrowOutputStream> sink_; + const RewriterProperties& props_; + std::unique_ptr<ParquetFileReader> parquet_file_reader_; + std::shared_ptr<PageIndexReader> page_index_reader_; + BloomFilterReader& bloom_filter_reader_; + std::shared_ptr<FileMetaData> metadata_; + int current_row_group_index_{}; +}; + +class ConcatRewriter { + public: + explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters) + : file_rewriters_(std::move(rewriters)) { + if (file_rewriters_.empty()) { + throw ParquetException("At least one SingleFileRewriter is required"); + } + auto& schema = file_rewriters_[0]->schema(); + if (std::ranges::any_of( + file_rewriters_ | std::views::drop(1), + [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) { + throw ParquetException("Input files have different schemas."); + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder, + 
PageIndexBuilder* page_index_builder, + BloomFilterBuilder* bloom_filter_builder, + int64_t& total_bytes_written) { + file_rewriters_[current_rewriter_index_]->WriteRowGroupData( + rg_metadata_builder, page_index_builder, bloom_filter_builder, + total_bytes_written); + } + + bool HasMoreRowGroup() { + while (current_rewriter_index_ < file_rewriters_.size() && + !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) { + file_rewriters_[current_rewriter_index_]->Close(); + ARROW_LOG(DEBUG) << "Finished rewriting file index " << current_rewriter_index_; + ++current_rewriter_index_; + } + return current_rewriter_index_ < file_rewriters_.size(); + } + + void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); } Review Comment: ConcatRewriter::Close() uses std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close), while file_rewriters_ holds std::unique_ptr<SingleFileRewriter>. This does compile: the INVOKE rules used by std::invoke dereference a pointer-like first argument before applying a pointer-to-member, so each unique_ptr is dereferenced automatically. However, that dereference is unconditional — a null element would be undefined behavior — and the pointer-to-member form is less obvious to readers. Consider an explicit lambda that calls rewriter->Close() with a null check. ```suggestion void Close() { std::ranges::for_each( file_rewriters_, [](const std::unique_ptr<SingleFileRewriter>& rewriter) { if (rewriter) { rewriter->Close(); } }); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
