Copilot commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r2786965172


##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(
+        auto read_size,
+        from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
// Rewrites a single column chunk into the output sink.
//
// Currently only the "fast copy" path exists: the compressed bytes are
// copied verbatim and all file offsets in the metadata / indexes are
// shifted by the delta between the old and new chunk positions.
class ColumnChunkRewriter {
 public:
  // `metadata` must be non-null. `page_index_reader` and
  // `bloom_filter_reader` may be null when the source file has no such
  // indexes for this row group.
  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
                      std::shared_ptr<ArrowOutputStream> sink,
                      const RewriterProperties& props,
                      std::unique_ptr<ColumnChunkMetaData> metadata,
                      std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                      std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
                      int column_ordinal)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        metadata_(std::move(metadata)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        column_ordinal_(column_ordinal) {
    if (metadata_ == nullptr) {
      throw ParquetException("ColumnChunkMetaData should not be nullptr");
    }
  }

  // Copies this chunk's bytes to the sink and registers shifted metadata,
  // page-index entries, and bloom filter with the given builders.
  // `total_bytes_written` is increased by the chunk's uncompressed size.
  // NOTE: this consumes `metadata_`; the method is effectively single-shot.
  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
                            PageIndexBuilder* page_index_builder,
                            BloomFilterBuilder* bloom_filter_builder,
                            int64_t& total_bytes_written) {
    // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding,
    // compression, etc.
    bool fast_copy = true;
    if (fast_copy) {
      // `shift` is how far the chunk moves in the new file; it is applied
      // to every stored offset.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->start_offset();
      // Read these before `metadata_` is moved into the builder below.
      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
      int64_t total_compressed_size = metadata_->total_compressed_size();

      // Byte-for-byte copy of the compressed chunk data.
      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->start_offset(), total_compressed_size);
      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);

      // Carry over page indexes; only offset indexes hold absolute file
      // offsets and therefore need the shift.
      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
        auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
        auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
        if (column_index != nullptr) {
          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
        }
        if (offset_index != nullptr) {
          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
        }
      }

      // Carry over the bloom filter unchanged, if present.
      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
        auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
        if (bloom_filter != nullptr) {
          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
                                                  std::move(bloom_filter));
        }
      }

      total_bytes_written += total_uncompressed_size;
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  // Reference member: the properties object must outlive this rewriter.
  const RewriterProperties& props_;
  std::unique_ptr<ColumnChunkMetaData> metadata_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  int column_ordinal_;
};
+
// Rewrites one row group into the output sink, either as a single bulk
// copy of the whole row group or chunk by chunk via ColumnChunkRewriter.
class RowGroupRewriter {
 public:
  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                   std::shared_ptr<ArrowOutputStream> sink,
                   const RewriterProperties& props,
                   std::shared_ptr<RowGroupReader> row_group_reader,
                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                   std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        row_group_reader_(std::move(row_group_reader)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        metadata_(row_group_reader_->metadata()) {
    if (metadata_ == nullptr) {
      throw ParquetException("RowGroupMetaData should not be nullptr");
    }
  }

  // Copies the row group's data to the sink and registers shifted column
  // chunk metadata, page indexes, and bloom filters with the builders.
  // `total_bytes_written` is increased by the row group's uncompressed size.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    rg_metadata_builder.set_num_rows(metadata_->num_rows());

    // A non-zero file_offset() is taken to mean the row group's chunks are
    // laid out contiguously starting at that offset, so the whole group can
    // be copied in one pass. NOTE(review): presumably file_offset()==0 means
    // the (optional) field was absent in the source file — confirm.
    bool fast_copy = metadata_->file_offset() != 0;
    if (fast_copy) {
      // Bulk path: one stream copy, then shift each chunk's metadata by the
      // same delta.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->file_offset();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->file_offset(), metadata_->total_compressed_size());
      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);

        auto column_index =
            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
        auto offset_index =
            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
        auto bloom_filter = bloom_filter_reader_
                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
                                : nullptr;

        if (column_index != nullptr && page_index_builder != nullptr) {
          page_index_builder->SetColumnIndex(i, column_index);
        }
        // Only the offset index stores absolute file offsets, hence the shift.
        if (offset_index != nullptr && page_index_builder != nullptr) {
          page_index_builder->SetOffsetIndex(i, offset_index, shift);
        }
        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
        }
      }

      total_bytes_written += metadata_->total_byte_size();
    } else {
      // Fallback: copy each column chunk independently.
      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
                                     page_index_reader_, bloom_filter_reader_, i);
        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
                                      bloom_filter_builder, total_bytes_written);
      }
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  // Reference member: the properties object must outlive this rewriter.
  const RewriterProperties& props_;
  std::shared_ptr<RowGroupReader> row_group_reader_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  // Borrowed from row_group_reader_, which keeps it alive.
  const RowGroupMetaData* metadata_;
};
+
// Rewrites one source parquet file, row group by row group. Owns the
// ParquetFileReader for that file and exposes its schema and per-row-group
// row counts so the enclosing rewriters can validate compatibility.
class SingleFileRewriter {
 public:
  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
                     std::shared_ptr<ArrowOutputStream> sink,
                     std::shared_ptr<FileMetaData> source_metadata,
                     const RewriterProperties& props)
      // NOTE: members initialize in declaration order, so source_ copies
      // `source` before it is moved into ParquetFileReader::Open below.
      : source_(source),
        sink_(std::move(sink)),
        props_(props),
        parquet_file_reader_(ParquetFileReader::Open(
            std::move(source), props_.reader_properties(), std::move(source_metadata))),
        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
        metadata_(parquet_file_reader_->metadata()) {
    if (metadata_ == nullptr) {
      throw ParquetException("FileMetaData should not be nullptr");
    }

    // Tell the page index reader up front that we will need both column
    // and offset indexes for every row group and column, so it can
    // prefetch/cache them.
    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
    std::vector<int32_t> column_indices(metadata_->num_columns());
    std::iota(column_indices.begin(), column_indices.end(), 0);
    if (page_index_reader_) {
      page_index_reader_->WillNeed(row_group_indices, column_indices,
                                   {/*column_index=*/true, /*offset_index=*/true});
    }
  }

  // Writes the next unprocessed row group to the sink and advances the
  // cursor. Throws if all row groups have already been written.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    if (current_row_group_index_ >= metadata_->num_row_groups()) {
      std::stringstream ss;
      ss << "Trying to read row group " << current_row_group_index_
         << " but file only has " << metadata_->num_row_groups() << " row groups";
      throw ParquetException(ss.str());
    }
    auto row_group_metadata = metadata_->RowGroup(current_row_group_index_);
    auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_);
    auto page_index_reader = page_index_reader_
                                 ? page_index_reader_->RowGroup(current_row_group_index_)
                                 : nullptr;
    auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_);
    RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader),
                              std::move(page_index_reader),
                              std::move(bloom_filter_reader));
    rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder,
                               bloom_filter_builder, total_bytes_written);
    ++current_row_group_index_;
  }

  // True while this file still has row groups left to write.
  bool HasMoreRowGroup() {
    return current_row_group_index_ < metadata_->num_row_groups();
  }

  void Close() { parquet_file_reader_->Close(); }

  const SchemaDescriptor& schema() const { return *metadata_->schema(); }

  // Number of rows in each row group, in file order. Used by JoinRewriter
  // to check that joined inputs line up.
  std::vector<int64_t> row_group_row_counts() const {
    int num_row_groups = metadata_->num_row_groups();
    std::vector<int64_t> row_counts;
    row_counts.reserve(num_row_groups);
    for (int i = 0; i < num_row_groups; ++i) {
      row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
    }
    return row_counts;
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  // Reference member: the properties object must outlive this rewriter.
  const RewriterProperties& props_;
  std::unique_ptr<ParquetFileReader> parquet_file_reader_;
  std::shared_ptr<PageIndexReader> page_index_reader_;
  // Reference member borrowed from parquet_file_reader_, which outlives it.
  BloomFilterReader& bloom_filter_reader_;
  std::shared_ptr<FileMetaData> metadata_;
  // Index of the next row group to write; starts at 0.
  int current_row_group_index_{};
};
+
+class ConcatRewriter {
+ public:
+  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> 
rewriters)
+      : file_rewriters_(std::move(rewriters)) {
+    if (file_rewriters_.empty()) {
+      throw ParquetException("At least one SingleFileRewriter is required");
+    }
+    auto& schema = file_rewriters_[0]->schema();
+    if (std::ranges::any_of(
+            file_rewriters_ | std::views::drop(1),
+            [&schema](auto& rewriter) { return 
!schema.Equals(rewriter->schema()); })) {
+      throw ParquetException("Input files have different schemas.");
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
+        rg_metadata_builder, page_index_builder, bloom_filter_builder,
+        total_bytes_written);
+  }
+
+  bool HasMoreRowGroup() {
+    while (current_rewriter_index_ < file_rewriters_.size() &&
+           !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
+      file_rewriters_[current_rewriter_index_]->Close();
+      ARROW_LOG(DEBUG) << "Finished rewriting file index " << 
current_rewriter_index_;
+      ++current_rewriter_index_;
+    }
+    return current_rewriter_index_ < file_rewriters_.size();
+  }
+
+  void Close() { std::ranges::for_each(file_rewriters_, 
&SingleFileRewriter::Close); }
+
+  const SchemaDescriptor& schema() const { return 
file_rewriters_[0]->schema(); }
+
+  std::vector<int64_t> row_group_row_counts() const {
+    std::vector<int64_t> row_counts;
+    for (auto& rewriter : file_rewriters_) {
+      auto count = rewriter->row_group_row_counts();
+      row_counts.insert(row_counts.end(), count.begin(), count.end());
+    }
+    return row_counts;
+  }
+
+ private:
+  std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
+  size_t current_rewriter_index_{};
+};
+
// Joins inputs horizontally: the i-th output row group contains the
// columns of the i-th row group of every input, side by side. Inputs must
// therefore have identical per-row-group row counts and no duplicate
// column paths.
class JoinRewriter {
 public:
  explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
      : rewriters_(std::move(rewriters)) {
    if (rewriters_.empty()) {
      throw ParquetException("At least one ConcatRewriter is required");
    }
    // Validate that every input exposes the same sequence of row-group row
    // counts as the first one.
    auto row_counts = rewriters_[0]->row_group_row_counts();
    for (size_t i = 1; i < rewriters_.size(); ++i) {
      if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
          row_counts != current_row_counts) {
        // Renders a vector as "[a, b, c]" for the error message.
        auto vecToString = [](const std::vector<int64_t>& v) {
          if (v.empty()) {
            return std::string("[]");
          }
          std::ostringstream oss;
          oss << "[" << v[0];
          // TODO(anyone): use std::format and std::views::join_with when C++23 available
          for (const auto& val : v | std::views::drop(1)) {
            oss << ", " << val;
          }
          oss << "]";
          return oss.str();
        };
        throw ParquetException(
            "The number of rows in each block must match! No.0 blocks row counts: ",
            vecToString(row_counts), ", No.", i,
            " blocks row counts: ", vecToString(current_row_counts));
      }
    }

    // Build the joined schema: concatenate the top-level fields of every
    // input, rejecting duplicate leaf column paths.
    std::unordered_set<std::string> column_paths;
    schema::NodeVector fields;

    for (auto& rewriter : rewriters_) {
      const SchemaDescriptor& schema_desc = rewriter->schema();

      for (int i = 0; i < schema_desc.num_columns(); ++i) {
        auto path = schema_desc.Column(i)->path()->ToDotString();
        if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
          // TODO(HuaHuaY): support choose one column from columns with same path
          throw ParquetException("NotImplemented, files have same column path: ", path);
        }
      }

      const auto& group_node = schema_desc.group_node();
      for (int i = 0; i < group_node->field_count(); ++i) {
        fields.push_back(group_node->field(i));
      }
    }

    // The joined root node keeps the first input's schema name.
    auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(),
                                            Repetition::REQUIRED, fields);
    schema_.Init(new_root);
  }

  // Writes one output row group by emitting the matching row group of
  // every joined input into the same metadata builder.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    for (auto& rewriter : rewriters_) {
      rewriter->WriteRowGroupData(rg_metadata_builder, page_index_builder,
                                  bloom_filter_builder, total_bytes_written);
    }
  }

  // Row counts were validated to match, so all inputs are exhausted
  // together; all_of keeps ConcatRewriter's skip/close side effect running
  // on each input.
  bool HasMoreRowGroup() {
    return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup);
  }

  void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); }

  // Schema of the joined output file.
  const SchemaDescriptor& schema() const { return schema_; }

 private:
  std::vector<std::unique_ptr<ConcatRewriter>> rewriters_;
  SchemaDescriptor schema_;
};
+
+// ----------------------------------------------------------------------
+// GeneratedFile
+
// Contents implementation that drives the whole rewrite: it wires the
// sources into SingleFileRewriter -> ConcatRewriter -> JoinRewriter,
// writes the PAR1 magic, copies all row groups, then writes the page
// indexes, bloom filters, and footer.
class GeneratedFile : public ParquetFileRewriter::Contents {
 public:
  // Validates that `sources` and `sources_metadata` have matching shapes
  // (same outer size and same size for each inner vector) and constructs
  // the rewriter. Inner vectors are concatenated vertically; outer entries
  // are joined horizontally. Entries in `sources_metadata` may be null when
  // the metadata should be read from the file itself.
  static std::unique_ptr<ParquetFileRewriter::Contents> Open(
      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
      std::shared_ptr<ArrowOutputStream> sink,
      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
      std::shared_ptr<const KeyValueMetadata> sink_metadata,
      std::shared_ptr<RewriterProperties> props) {
    if (sources.size() != sources_metadata.size() ||
        // TODO(anyone): use std::views::zip when C++23 available
        std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t i) {
          return sources[i].size() != sources_metadata[i].size();
        })) {
      throw ParquetException(
          "The number of sources and sources_metadata must be the same");
    }
    std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile(
        std::move(sources), std::move(sink), std::move(sources_metadata),
        std::move(sink_metadata), std::move(props)));
    return result;
  }

  // Closes all underlying readers; safe to call more than once.
  void Close() override {
    if (rewriter_) {
      rewriter_->Close();
      rewriter_.reset();
    }
  }

  // Copies every row group, then finalizes indexes and the file footer.
  void Rewrite() override {
    while (rewriter_->HasMoreRowGroup()) {
      auto& rg_metadata_builder = *metadata_builder_->AppendRowGroup();
      if (page_index_builder_) {
        page_index_builder_->AppendRowGroup();
      }
      if (bloom_filter_builder_) {
        bloom_filter_builder_->AppendRowGroup();
      }
      int64_t total_bytes_written = 0;
      rewriter_->WriteRowGroupData(rg_metadata_builder, page_index_builder_.get(),
                                   bloom_filter_builder_.get(), total_bytes_written);
      rg_metadata_builder.Finish(total_bytes_written);
    }
    // Indexes are serialized after all row group data, and their locations
    // are recorded in the footer metadata.
    if (page_index_builder_) {
      page_index_builder_->Finish();
      auto [column_index_locations, offset_index_locations] =
          page_index_builder_->WriteTo(sink_.get());
      metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex,
                                           column_index_locations);
      metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex,
                                           offset_index_locations);
    }
    if (bloom_filter_builder_) {
      auto bloom_filter_locations = bloom_filter_builder_->WriteTo(sink_.get());
      metadata_builder_->SetIndexLocations(IndexKind::kBloomFilter,
                                           bloom_filter_locations);
    }

    auto file_metadata = metadata_builder_->Finish(sink_metadata_);
    WriteFileMetaData(*file_metadata, sink_.get());
  }

 private:
  // Builds the rewriter pipeline, writes the leading PAR1 magic, and
  // prepares the metadata / page-index / bloom-filter builders.
  // Throws for encrypted output, which is not supported yet.
  GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
                std::shared_ptr<ArrowOutputStream> sink,
                std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
                std::shared_ptr<const KeyValueMetadata> sink_metadata,
                std::shared_ptr<RewriterProperties> props)
      : sink_(std::move(sink)),
        props_(std::move(props)),
        sink_metadata_(std::move(sink_metadata)) {
    // One ConcatRewriter per outer entry; JoinRewriter joins them.
    std::vector<std::unique_ptr<ConcatRewriter>> rewriters;
    rewriters.reserve(sources.size());
    for (size_t i = 0; i < sources.size(); ++i) {
      std::vector<std::unique_ptr<SingleFileRewriter>> concat_rewriters;
      concat_rewriters.reserve(sources[i].size());
      for (size_t j = 0; j < sources[i].size(); ++j) {
        concat_rewriters.emplace_back(std::make_unique<SingleFileRewriter>(
            std::move(sources[i][j]), sink_, std::move(sources_metadata[i][j]), *props_));
      }
      rewriters.emplace_back(
          std::make_unique<ConcatRewriter>(std::move(concat_rewriters)));
    }
    rewriter_ = std::make_unique<JoinRewriter>(std::move(rewriters));

    if (props_->writer_properties()->file_encryption_properties() == nullptr) {
      // Unencrypted parquet files always start with PAR1
      PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
    } else {
      throw ParquetException(
          "NotImplemented, rewriter does not support to write encrypted files.");
    }

    // The output schema is the (possibly joined) schema computed by the
    // rewriter pipeline.
    auto new_schema = rewriter_->schema().schema_root();
    new_schema_.Init(new_schema);
    metadata_builder_ =
        FileMetaDataBuilder::Make(&new_schema_, props_->writer_properties());
    if (props_->writer_properties()->page_index_enabled()) {
      page_index_builder_ = PageIndexBuilder::Make(&new_schema_, nullptr);
    }
    if (props_->writer_properties()->bloom_filter_enabled()) {
      bloom_filter_builder_ =
          BloomFilterBuilder::Make(&new_schema_, props_->writer_properties().get());
    }
  }

  std::shared_ptr<ArrowOutputStream> sink_;
  std::shared_ptr<RewriterProperties> props_;
  std::shared_ptr<const KeyValueMetadata> sink_metadata_;
  std::unique_ptr<JoinRewriter> rewriter_;

  SchemaDescriptor new_schema_;
  std::unique_ptr<FileMetaDataBuilder> metadata_builder_;
  std::unique_ptr<PageIndexBuilder> page_index_builder_;
  std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
};
+
+// ----------------------------------------------------------------------
+// ParquetFilesRewriter public API
+
ParquetFileRewriter::ParquetFileRewriter() = default;

// Best-effort close on destruction; exceptions are swallowed because a
// destructor must not throw.
ParquetFileRewriter::~ParquetFileRewriter() {
  try {
    Close();
  } catch (...) {
  }
}
+
+std::unique_ptr<ParquetFileRewriter> ParquetFileRewriter::Open(
+    std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+    std::shared_ptr<ArrowOutputStream> sink,
+    std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+    std::shared_ptr<const KeyValueMetadata> sink_metadata,
+    std::shared_ptr<RewriterProperties> props) {
+  auto contents = GeneratedFile::Open(std::move(sources), std::move(sink),
+                                      std::move(sources_metadata),
+                                      std::move(sink_metadata), 
std::move(props));
+  std::unique_ptr<ParquetFileRewriter> result(new ParquetFileRewriter());
+  result->Open(std::move(contents));
+  return result;
+}
+
// Takes ownership of an already-constructed Contents implementation.
void ParquetFileRewriter::Open(std::unique_ptr<ParquetFileRewriter::Contents> contents) {
  contents_ = std::move(contents);
}
+
+void ParquetFileRewriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+void ParquetFileRewriter::Rewrite() { contents_->Rewrite(); }

Review Comment:
   ParquetFileRewriter::Rewrite() dereferences contents_ unconditionally. Since 
the default constructor is public, calling Rewrite() before Open() (or after 
Close()) will crash. Consider either making the constructor private (force 
Open()), or add a null check and throw a ParquetException when contents_ is not 
set.
   ```suggestion
   void ParquetFileRewriter::Rewrite() {
     if (!contents_) {
       throw ParquetException(
           "ParquetFileRewriter::Rewrite() called on an uninitialized or closed 
file");
     }
     contents_->Rewrite();
   }
   ```



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(
+        auto read_size,
+        from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
// Copies a single column chunk from a source file into the sink, carrying its
// existing metadata, page index entries and bloom filter along with it.
class ColumnChunkRewriter {
 public:
  // `metadata` must be non-null. `page_index_reader` / `bloom_filter_reader`
  // may be null when the source file lacks those structures.
  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
                      std::shared_ptr<ArrowOutputStream> sink,
                      const RewriterProperties& props,
                      std::unique_ptr<ColumnChunkMetaData> metadata,
                      std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                      std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
                      int column_ordinal)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        metadata_(std::move(metadata)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        column_ordinal_(column_ordinal) {
    if (metadata_ == nullptr) {
      throw ParquetException("ColumnChunkMetaData should not be nullptr");
    }
  }

  // Byte-copies the chunk's compressed data into the sink and re-registers
  // its metadata / page index / bloom filter, with offsets shifted to the new
  // location. `total_bytes_written` is incremented by the chunk's
  // uncompressed size. Consumes `metadata_`, so call at most once.
  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
                            PageIndexBuilder* page_index_builder,
                            BloomFilterBuilder* bloom_filter_builder,
                            int64_t& total_bytes_written) {
    // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding,
    // compression, etc.
    bool fast_copy = true;
    if (fast_copy) {
      // `shift` is the delta between where the chunk lands in the sink and
      // where it lived in the source; every stored offset moves by it.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->start_offset();
      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
      int64_t total_compressed_size = metadata_->total_compressed_size();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->start_offset(), total_compressed_size);
      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);

      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
        auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
        auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
        if (column_index != nullptr) {
          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
        }
        if (offset_index != nullptr) {
          // The offset index stores absolute file offsets, so it needs the
          // shift; the column index does not.
          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
        }
      }

      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
        auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
        if (bloom_filter != nullptr) {
          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
                                                  std::move(bloom_filter));
        }
      }

      total_bytes_written += total_uncompressed_size;
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::unique_ptr<ColumnChunkMetaData> metadata_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  int column_ordinal_;  // column position within the row group
};
+
// Copies one row group from a source file to the sink. When the row group's
// file_offset is recorded (non-zero), the whole group is byte-copied in one
// stream operation; otherwise it falls back to per-column-chunk copies via
// ColumnChunkRewriter.
class RowGroupRewriter {
 public:
  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                   std::shared_ptr<ArrowOutputStream> sink,
                   const RewriterProperties& props,
                   std::shared_ptr<RowGroupReader> row_group_reader,
                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                   std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        row_group_reader_(std::move(row_group_reader)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        metadata_(row_group_reader_->metadata()) {
    if (metadata_ == nullptr) {
      throw ParquetException("RowGroupMetaData should not be nullptr");
    }
  }

  // Writes this row group's data and registers shifted metadata, page index
  // entries and bloom filters with the given builders. The pointer-typed
  // builders may be null when the corresponding structure is not rewritten.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    rg_metadata_builder.set_num_rows(metadata_->num_rows());

    // file_offset == 0 means the source did not record a row-group offset,
    // so a whole-group byte copy is not possible.
    bool fast_copy = metadata_->file_offset() != 0;
    if (fast_copy) {
      // All copied offsets move by the same source-to-sink delta.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->file_offset();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->file_offset(), metadata_->total_compressed_size());
      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);

        auto column_index =
            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
        auto offset_index =
            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
        auto bloom_filter = bloom_filter_reader_
                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
                                : nullptr;

        if (column_index != nullptr && page_index_builder != nullptr) {
          page_index_builder->SetColumnIndex(i, column_index);
        }
        if (offset_index != nullptr && page_index_builder != nullptr) {
          // Offset indexes store absolute file offsets and must be shifted.
          page_index_builder->SetOffsetIndex(i, offset_index, shift);
        }
        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
        }
      }

      total_bytes_written += metadata_->total_byte_size();
    } else {
      // Slow path: copy each column chunk individually.
      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
                                     page_index_reader_, bloom_filter_reader_, i);
        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
                                      bloom_filter_builder, total_bytes_written);
      }
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::shared_ptr<RowGroupReader> row_group_reader_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  const RowGroupMetaData* metadata_;  // borrowed from row_group_reader_
};
+
+class SingleFileRewriter {
+ public:
+  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+                     std::shared_ptr<ArrowOutputStream> sink,
+                     std::shared_ptr<FileMetaData> source_metadata,
+                     const RewriterProperties& props)
+      : source_(source),
+        sink_(std::move(sink)),
+        props_(props),
+        parquet_file_reader_(ParquetFileReader::Open(
+            std::move(source), props_.reader_properties(), 
std::move(source_metadata))),
+        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+        metadata_(parquet_file_reader_->metadata()) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("FileMetaData should not be nullptr");
+    }
+
+    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+    std::vector<int32_t> column_indices(metadata_->num_columns());
+    std::iota(column_indices.begin(), column_indices.end(), 0);
+    if (page_index_reader_) {
+      page_index_reader_->WillNeed(row_group_indices, column_indices,
+                                   {/*column_index=*/true, 
/*offset_index=*/true});
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    if (current_row_group_index_ >= metadata_->num_row_groups()) {
+      std::stringstream ss;
+      ss << "Trying to read row group " << current_row_group_index_
+         << " but file only has " << metadata_->num_row_groups() << " row 
groups";
+      throw ParquetException(ss.str());
+    }
+    auto row_group_metadata = metadata_->RowGroup(current_row_group_index_);
+    auto row_group_reader = 
parquet_file_reader_->RowGroup(current_row_group_index_);
+    auto page_index_reader = page_index_reader_
+                                 ? 
page_index_reader_->RowGroup(current_row_group_index_)
+                                 : nullptr;
+    auto bloom_filter_reader = 
bloom_filter_reader_.RowGroup(current_row_group_index_);
+    RowGroupRewriter rewriter(source_, sink_, props_, 
std::move(row_group_reader),
+                              std::move(page_index_reader),
+                              std::move(bloom_filter_reader));
+    rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder,
+                               bloom_filter_builder, total_bytes_written);
+    ++current_row_group_index_;
+  }
+
+  bool HasMoreRowGroup() {
+    return current_row_group_index_ < metadata_->num_row_groups();
+  }
+
+  void Close() { parquet_file_reader_->Close(); }
+
+  const SchemaDescriptor& schema() const { return *metadata_->schema(); }
+
+  std::vector<int64_t> row_group_row_counts() const {
+    int num_row_groups = metadata_->num_row_groups();
+    std::vector<int64_t> row_counts;
+    row_counts.reserve(num_row_groups);
+    for (int i = 0; i < num_row_groups; ++i) {
+      row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
+    }
+    return row_counts;
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::unique_ptr<ParquetFileReader> parquet_file_reader_;
+  std::shared_ptr<PageIndexReader> page_index_reader_;
+  BloomFilterReader& bloom_filter_reader_;
+  std::shared_ptr<FileMetaData> metadata_;
+  int current_row_group_index_{};
+};
+
// Concatenates files that share one schema: their row groups are rewritten
// sequentially, so the output reads like the inputs appended end to end.
class ConcatRewriter {
 public:
  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters)
      : file_rewriters_(std::move(rewriters)) {
    if (file_rewriters_.empty()) {
      throw ParquetException("At least one SingleFileRewriter is required");
    }
    // Every input must match the first file's schema exactly.
    auto& schema = file_rewriters_[0]->schema();
    if (std::ranges::any_of(
            file_rewriters_ | std::views::drop(1),
            [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) {
      throw ParquetException("Input files have different schemas.");
    }
  }

  // Copies the next row group of the current file. Callers are expected to
  // check HasMoreRowGroup() first so the current index is valid.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
        rg_metadata_builder, page_index_builder, bloom_filter_builder,
        total_bytes_written);
  }

  // Advances past fully-consumed files (closing each one as it finishes) and
  // reports whether any row groups remain.
  bool HasMoreRowGroup() {
    while (current_rewriter_index_ < file_rewriters_.size() &&
           !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
      file_rewriters_[current_rewriter_index_]->Close();
      ARROW_LOG(DEBUG) << "Finished rewriting file index " << current_rewriter_index_;
      ++current_rewriter_index_;
    }
    return current_rewriter_index_ < file_rewriters_.size();
  }

  // Closes all files, including ones HasMoreRowGroup() already closed.
  // NOTE(review): assumes SingleFileRewriter::Close tolerates double close —
  // confirm against ParquetFileReader::Close.
  void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); }

  // Schema shared by all concatenated inputs.
  const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); }

  // Row counts of every row group across all files, in emission order.
  std::vector<int64_t> row_group_row_counts() const {
    std::vector<int64_t> row_counts;
    for (auto& rewriter : file_rewriters_) {
      auto count = rewriter->row_group_row_counts();
      row_counts.insert(row_counts.end(), count.begin(), count.end());
    }
    return row_counts;
  }

 private:
  std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
  size_t current_rewriter_index_{};  // first file that may still have data
};
+
// Joins several schema-disjoint inputs side by side: each "block" (a
// ConcatRewriter) contributes its columns to every output row group, so all
// blocks must expose identical per-row-group row counts.
class JoinRewriter {
 public:
  explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
      : rewriters_(std::move(rewriters)) {
    if (rewriters_.empty()) {
      throw ParquetException("At least one ConcatRewriter is required");
    }
    // Row counts must line up across all blocks, otherwise the joined rows
    // would not correspond to each other.
    auto row_counts = rewriters_[0]->row_group_row_counts();
    for (size_t i = 1; i < rewriters_.size(); ++i) {
      if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
          row_counts != current_row_counts) {
        // Renders a vector as "[a, b, c]" for the error message.
        auto vecToString = [](const std::vector<int64_t>& v) {
          if (v.empty()) {
            return std::string("[]");
          }
          std::ostringstream oss;
          oss << "[" << v[0];
          // TODO(anyone): use std::format and std::views::join_with when C++23 available
          for (const auto& val : v | std::views::drop(1)) {
            oss << ", " << val;
          }
          oss << "]";
          return oss.str();
        };
        throw ParquetException(
            "The number of rows in each block must match! No.0 blocks row counts: ",
            vecToString(row_counts), ", No.", i,
            " blocks row counts: ", vecToString(current_row_counts));
      }
    }

    // Build the joined schema by concatenating every block's top-level
    // fields, rejecting duplicate leaf column paths.
    std::unordered_set<std::string> column_paths;
    schema::NodeVector fields;

    for (auto& rewriter : rewriters_) {
      const SchemaDescriptor& schema_desc = rewriter->schema();

      for (int i = 0; i < schema_desc.num_columns(); ++i) {
        auto path = schema_desc.Column(i)->path()->ToDotString();
        if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
          // TODO(HuaHuaY): support choose one column from columns with same path
          throw ParquetException("NotImplemented, files have same column path: ", path);
        }
      }

      const auto& group_node = schema_desc.group_node();
      for (int i = 0; i < group_node->field_count(); ++i) {
        fields.push_back(group_node->field(i));
      }
    }

    auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(),
                                            Repetition::REQUIRED, fields);
    schema_.Init(new_root);
  }

  // Emits one output row group by letting each block append its columns.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    for (auto& rewriter : rewriters_) {
      rewriter->WriteRowGroupData(rg_metadata_builder, page_index_builder,
                                  bloom_filter_builder, total_bytes_written);
    }
  }

  // True only while every block still has a row group; row counts were
  // validated to match, so blocks are expected to exhaust together.
  bool HasMoreRowGroup() {
    return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup);
  }

  void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); }

  // The merged output schema.
  const SchemaDescriptor& schema() const { return schema_; }

 private:
  std::vector<std::unique_ptr<ConcatRewriter>> rewriters_;
  SchemaDescriptor schema_;
};
+
+// ----------------------------------------------------------------------
+// GeneratedFile
+
+class GeneratedFile : public ParquetFileRewriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileRewriter::Contents> Open(
+      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+      std::shared_ptr<ArrowOutputStream> sink,
+      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+      std::shared_ptr<const KeyValueMetadata> sink_metadata,
+      std::shared_ptr<RewriterProperties> props) {
+    if (sources.size() != sources_metadata.size() ||
+        // TODO(anyone): use std::views::zip when C++23 available
+        std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t 
i) {
+          return sources[i].size() != sources_metadata[i].size();
+        })) {
+      throw ParquetException(
+          "The number of sources and sources_metadata must be the same");
+    }
+    std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile(
+        std::move(sources), std::move(sink), std::move(sources_metadata),
+        std::move(sink_metadata), std::move(props)));
+    return result;
+  }
+
+  void Close() override {
+    if (rewriter_) {
+      rewriter_->Close();
+      rewriter_.reset();
+    }
+  }
+
+  void Rewrite() override {
+    while (rewriter_->HasMoreRowGroup()) {
+      auto& rg_metadata_builder = *metadata_builder_->AppendRowGroup();
+      if (page_index_builder_) {
+        page_index_builder_->AppendRowGroup();
+      }
+      if (bloom_filter_builder_) {
+        bloom_filter_builder_->AppendRowGroup();
+      }
+      int64_t total_bytes_written = 0;
+      rewriter_->WriteRowGroupData(rg_metadata_builder, 
page_index_builder_.get(),
+                                   bloom_filter_builder_.get(), 
total_bytes_written);
+      rg_metadata_builder.Finish(total_bytes_written);
+    }
+    if (page_index_builder_) {
+      page_index_builder_->Finish();
+      auto [column_index_locations, offset_index_locations] =
+          page_index_builder_->WriteTo(sink_.get());
+      metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex,
+                                           column_index_locations);
+      metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex,
+                                           offset_index_locations);
+    }
+    if (bloom_filter_builder_) {
+      auto bloom_filter_locations = 
bloom_filter_builder_->WriteTo(sink_.get());
+      metadata_builder_->SetIndexLocations(IndexKind::kBloomFilter,
+                                           bloom_filter_locations);
+    }
+
+    auto file_metadata = metadata_builder_->Finish(sink_metadata_);
+    WriteFileMetaData(*file_metadata, sink_.get());
+  }
+
+ private:
+  GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> 
sources,
+                std::shared_ptr<ArrowOutputStream> sink,
+                std::vector<std::vector<std::shared_ptr<FileMetaData>>> 
sources_metadata,
+                std::shared_ptr<const KeyValueMetadata> sink_metadata,
+                std::shared_ptr<RewriterProperties> props)
+      : sink_(std::move(sink)),
+        props_(std::move(props)),
+        sink_metadata_(std::move(sink_metadata)) {
+    std::vector<std::unique_ptr<ConcatRewriter>> rewriters;
+    rewriters.reserve(sources.size());
+    for (size_t i = 0; i < sources.size(); ++i) {
+      std::vector<std::unique_ptr<SingleFileRewriter>> concat_rewriters;
+      concat_rewriters.reserve(sources[i].size());
+      for (size_t j = 0; j < sources[i].size(); ++j) {
+        concat_rewriters.emplace_back(std::make_unique<SingleFileRewriter>(
+            std::move(sources[i][j]), sink_, 
std::move(sources_metadata[i][j]), *props_));
+      }
+      rewriters.emplace_back(
+          std::make_unique<ConcatRewriter>(std::move(concat_rewriters)));
+    }
+    rewriter_ = std::make_unique<JoinRewriter>(std::move(rewriters));
+
+    if (props_->writer_properties()->file_encryption_properties() == nullptr) {
+      // Unencrypted parquet files always start with PAR1
+      PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
+    } else {

Review Comment:
   GeneratedFile writes the PAR1 magic bytes to sink_ without checking the 
current sink position. If sink_ isn’t empty / is already advanced, this will 
create an invalid file. Consider mirroring ParquetFileWriter behavior (check 
sink_->Tell() == 0 and throw if appending is attempted).



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(
+        auto read_size,
+        from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
// Copies a single column chunk from a source file into the sink, carrying its
// existing metadata, page index entries and bloom filter along with it.
class ColumnChunkRewriter {
 public:
  // `metadata` must be non-null. `page_index_reader` / `bloom_filter_reader`
  // may be null when the source file lacks those structures.
  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
                      std::shared_ptr<ArrowOutputStream> sink,
                      const RewriterProperties& props,
                      std::unique_ptr<ColumnChunkMetaData> metadata,
                      std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                      std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
                      int column_ordinal)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        metadata_(std::move(metadata)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        column_ordinal_(column_ordinal) {
    if (metadata_ == nullptr) {
      throw ParquetException("ColumnChunkMetaData should not be nullptr");
    }
  }

  // Byte-copies the chunk's compressed data into the sink and re-registers
  // its metadata / page index / bloom filter, with offsets shifted to the new
  // location. `total_bytes_written` is incremented by the chunk's
  // uncompressed size. Consumes `metadata_`, so call at most once.
  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
                            PageIndexBuilder* page_index_builder,
                            BloomFilterBuilder* bloom_filter_builder,
                            int64_t& total_bytes_written) {
    // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding,
    // compression, etc.
    bool fast_copy = true;
    if (fast_copy) {
      // `shift` is the delta between where the chunk lands in the sink and
      // where it lived in the source; every stored offset moves by it.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->start_offset();
      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
      int64_t total_compressed_size = metadata_->total_compressed_size();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->start_offset(), total_compressed_size);
      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);

      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
        auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
        auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
        if (column_index != nullptr) {
          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
        }
        if (offset_index != nullptr) {
          // The offset index stores absolute file offsets, so it needs the
          // shift; the column index does not.
          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
        }
      }

      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
        auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
        if (bloom_filter != nullptr) {
          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
                                                  std::move(bloom_filter));
        }
      }

      total_bytes_written += total_uncompressed_size;
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::unique_ptr<ColumnChunkMetaData> metadata_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  int column_ordinal_;  // column position within the row group
};
+
// Copies one row group from a source file to the sink. When the row group's
// file_offset is recorded (non-zero), the whole group is byte-copied in one
// stream operation; otherwise it falls back to per-column-chunk copies via
// ColumnChunkRewriter.
class RowGroupRewriter {
 public:
  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                   std::shared_ptr<ArrowOutputStream> sink,
                   const RewriterProperties& props,
                   std::shared_ptr<RowGroupReader> row_group_reader,
                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                   std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        row_group_reader_(std::move(row_group_reader)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        metadata_(row_group_reader_->metadata()) {
    if (metadata_ == nullptr) {
      throw ParquetException("RowGroupMetaData should not be nullptr");
    }
  }

  // Writes this row group's data and registers shifted metadata, page index
  // entries and bloom filters with the given builders. The pointer-typed
  // builders may be null when the corresponding structure is not rewritten.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    rg_metadata_builder.set_num_rows(metadata_->num_rows());

    // file_offset == 0 means the source did not record a row-group offset,
    // so a whole-group byte copy is not possible.
    bool fast_copy = metadata_->file_offset() != 0;
    if (fast_copy) {
      // All copied offsets move by the same source-to-sink delta.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->file_offset();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->file_offset(), metadata_->total_compressed_size());
      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);

        auto column_index =
            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
        auto offset_index =
            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
        auto bloom_filter = bloom_filter_reader_
                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
                                : nullptr;

        if (column_index != nullptr && page_index_builder != nullptr) {
          page_index_builder->SetColumnIndex(i, column_index);
        }
        if (offset_index != nullptr && page_index_builder != nullptr) {
          // Offset indexes store absolute file offsets and must be shifted.
          page_index_builder->SetOffsetIndex(i, offset_index, shift);
        }
        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
        }
      }

      total_bytes_written += metadata_->total_byte_size();
    } else {
      // Slow path: copy each column chunk individually.
      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
                                     page_index_reader_, bloom_filter_reader_, i);
        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
                                      bloom_filter_builder, total_bytes_written);
      }
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::shared_ptr<RowGroupReader> row_group_reader_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  const RowGroupMetaData* metadata_;  // borrowed from row_group_reader_
};
+
+class SingleFileRewriter {
+ public:
+  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+                     std::shared_ptr<ArrowOutputStream> sink,
+                     std::shared_ptr<FileMetaData> source_metadata,
+                     const RewriterProperties& props)
+      : source_(source),
+        sink_(std::move(sink)),
+        props_(props),
+        parquet_file_reader_(ParquetFileReader::Open(
+            std::move(source), props_.reader_properties(), 
std::move(source_metadata))),
+        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+        metadata_(parquet_file_reader_->metadata()) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("FileMetaData should not be nullptr");
+    }
+
+    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+    std::vector<int32_t> column_indices(metadata_->num_columns());
+    std::iota(column_indices.begin(), column_indices.end(), 0);
+    if (page_index_reader_) {
+      page_index_reader_->WillNeed(row_group_indices, column_indices,
+                                   {/*column_index=*/true, 
/*offset_index=*/true});
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    if (current_row_group_index_ >= metadata_->num_row_groups()) {
+      std::stringstream ss;
+      ss << "Trying to read row group " << current_row_group_index_
+         << " but file only has " << metadata_->num_row_groups() << " row 
groups";
+      throw ParquetException(ss.str());
+    }
+    auto row_group_metadata = metadata_->RowGroup(current_row_group_index_);

Review Comment:
   SingleFileRewriter::WriteRowGroupData() creates row_group_metadata but never 
uses it. This is dead code and may trigger -Wunused-variable warnings in some 
builds; remove it or use it as intended.
   ```suggestion
   
   ```



##########
cpp/src/parquet/meson.build:
##########
@@ -39,6 +39,7 @@ parquet_srcs = files(
     'encryption/internal_file_encryptor.cc',
     'exception.cc',
     'file_reader.cc',
+    'file_rewriter.cc',
     'file_writer.cc',

Review Comment:
   Meson build adds file_rewriter.cc to parquet_srcs, but the new public header 
parquet/file_rewriter.h is not added to the Meson install_headers() list. This 
means Meson installs will ship the implementation but not the API header. 
Consider adding file_rewriter.h to the installed header set.



##########
cpp/src/parquet/metadata.cc:
##########
@@ -413,6 +413,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
 
   inline int64_t data_page_offset() const { return 
column_metadata_->data_page_offset; }
 
+  inline int64_t start_offset() const {
+    return has_dictionary_page() ? dictionary_page_offset() : 
data_page_offset();

Review Comment:
   ColumnChunkMetaData::start_offset() ignores index_page_offset(). For files 
that set index_page_offset without a dictionary page, the start of the column 
chunk may be before data_page_offset(), so using data_page_offset() here can 
cause the rewriter to skip bytes and compute an incorrect shift. Consider 
defining start_offset as the minimum of the available offsets (dictionary, 
index, data) to ensure it always points at the earliest page in the chunk.
   ```suggestion
       int64_t offset = data_page_offset();
       if (has_dictionary_page()) {
         offset = std::min(offset, dictionary_page_offset());
       }
       if (has_index_page()) {
         offset = std::min(offset, index_page_offset());
       }
       return offset;
   ```



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -219,6 +222,26 @@ BloomFilter* 
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
   return curr_rg_bfs.emplace(column_ordinal, 
std::move(bf)).first->second.get();
 }
 
+void BloomFilterBuilderImpl::InsertBloomFilter(
+    int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) {
+  auto opts = 
properties_->bloom_filter_options(schema_->Column(column_ordinal)->path());
+  if (!opts.has_value() || bloom_filter == nullptr) {
+    return;
+  }
+
+  CheckState(column_ordinal);
+

Review Comment:
   BloomFilterBuilderImpl::InsertBloomFilter() queries 
schema_->Column(column_ordinal) before CheckState(), so an out-of-range ordinal 
can crash/throw unexpectedly. It also returns early when bloom filter options 
are unset (or bloom_filter is nullptr), bypassing the documented state checks 
(e.g., AppendRowGroup not called / builder finished) and masking incorrect 
usage. Consider calling CheckState() before accessing schema_/options, and 
either throw on nullptr input or clearly document the no-op behavior. 
(CreateBloomFilter() has the same pre-CheckState schema_->Column() pattern and 
should be updated similarly.)



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(
+        auto read_size,
+        from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
/// Rewrites a single column chunk from `source` into `sink`.
///
/// Currently only the byte-for-byte "fast copy" path is implemented: the
/// chunk's compressed pages are streamed unchanged and its metadata, page
/// index, and bloom filter are re-registered at the new file offset.
class ColumnChunkRewriter {
 public:
  /// \param source input file the chunk's pages are read from
  /// \param sink output stream the pages are appended to
  /// \param props rewriter properties (reader options, memory pool)
  /// \param metadata metadata of the chunk being copied; must not be null
  /// \param page_index_reader per-row-group page index reader; may be null
  /// \param bloom_filter_reader per-row-group bloom filter reader; may be null
  /// \param column_ordinal leaf column index of this chunk in its row group
  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
                      std::shared_ptr<ArrowOutputStream> sink,
                      const RewriterProperties& props,
                      std::unique_ptr<ColumnChunkMetaData> metadata,
                      std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                      std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
                      int column_ordinal)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        metadata_(std::move(metadata)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        column_ordinal_(column_ordinal) {
    if (metadata_ == nullptr) {
      throw ParquetException("ColumnChunkMetaData should not be nullptr");
    }
  }

  /// Copy this chunk's pages to the sink and record the relocated metadata.
  ///
  /// Consumes `metadata_` (handed to `rg_metadata_builder`), so this is a
  /// one-shot operation. `total_bytes_written` is incremented by the chunk's
  /// uncompressed size.
  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
                            PageIndexBuilder* page_index_builder,
                            BloomFilterBuilder* bloom_filter_builder,
                            int64_t& total_bytes_written) {
    // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding,
    // compression, etc.
    bool fast_copy = true;
    if (fast_copy) {
      // `shift` is how far the chunk moves in the output relative to its
      // position in the source file; offsets in metadata and the offset
      // index are adjusted by this delta.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->start_offset();
      // Cache sizes now: metadata_ is moved into the builder below.
      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
      int64_t total_compressed_size = metadata_->total_compressed_size();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->start_offset(), total_compressed_size);
      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);

      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
        auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
        auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
        if (column_index != nullptr) {
          // Column index values are position-independent; copied as-is.
          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
        }
        if (offset_index != nullptr) {
          // Offset index holds absolute page offsets; shift relocates them.
          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
        }
      }

      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
        auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
        if (bloom_filter != nullptr) {
          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
                                                  std::move(bloom_filter));
        }
      }

      total_bytes_written += total_uncompressed_size;
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  // Consumed by WriteColumnChunkData(); null afterwards.
  std::unique_ptr<ColumnChunkMetaData> metadata_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  int column_ordinal_;
};
+
+class RowGroupRewriter {
+ public:
+  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
+                   std::shared_ptr<ArrowOutputStream> sink,
+                   const RewriterProperties& props,
+                   std::shared_ptr<RowGroupReader> row_group_reader,
+                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+                   std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        row_group_reader_(std::move(row_group_reader)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        metadata_(row_group_reader_->metadata()) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("RowGroupMetaData should not be nullptr");
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    rg_metadata_builder.set_num_rows(metadata_->num_rows());
+
+    bool fast_copy = metadata_->file_offset() != 0;
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->file_offset();
+
+      auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->file_offset(), 
metadata_->total_compressed_size());
+      CopyStream(stream, sink_, metadata_->total_compressed_size(), 
props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);
+
+        auto column_index =
+            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : 
nullptr;
+        auto offset_index =
+            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : 
nullptr;
+        auto bloom_filter = bloom_filter_reader_
+                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
+                                : nullptr;
+
+        if (column_index != nullptr && page_index_builder != nullptr) {
+          page_index_builder->SetColumnIndex(i, column_index);
+        }
+        if (offset_index != nullptr && page_index_builder != nullptr) {
+          page_index_builder->SetOffsetIndex(i, offset_index, shift);
+        }
+        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
+          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
+        }
+      }
+
+      total_bytes_written += metadata_->total_byte_size();
+    } else {
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+        ColumnChunkRewriter rewriter(source_, sink_, props_, 
std::move(cc_metadata),
+                                     page_index_reader_, bloom_filter_reader_, 
i);
+        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
+                                      bloom_filter_builder, 
total_bytes_written);
+      }
+    }
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::shared_ptr<RowGroupReader> row_group_reader_;
+  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+  const RowGroupMetaData* metadata_;
+};
+
+class SingleFileRewriter {
+ public:
+  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+                     std::shared_ptr<ArrowOutputStream> sink,
+                     std::shared_ptr<FileMetaData> source_metadata,
+                     const RewriterProperties& props)
+      : source_(source),
+        sink_(std::move(sink)),
+        props_(props),
+        parquet_file_reader_(ParquetFileReader::Open(
+            std::move(source), props_.reader_properties(), 
std::move(source_metadata))),
+        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+        metadata_(parquet_file_reader_->metadata()) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("FileMetaData should not be nullptr");
+    }
+
+    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+    std::vector<int32_t> column_indices(metadata_->num_columns());
+    std::iota(column_indices.begin(), column_indices.end(), 0);
+    if (page_index_reader_) {
+      page_index_reader_->WillNeed(row_group_indices, column_indices,
+                                   {/*column_index=*/true, 
/*offset_index=*/true});
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    if (current_row_group_index_ >= metadata_->num_row_groups()) {
+      std::stringstream ss;
+      ss << "Trying to read row group " << current_row_group_index_
+         << " but file only has " << metadata_->num_row_groups() << " row 
groups";
+      throw ParquetException(ss.str());
+    }
+    auto row_group_metadata = metadata_->RowGroup(current_row_group_index_);
+    auto row_group_reader = 
parquet_file_reader_->RowGroup(current_row_group_index_);
+    auto page_index_reader = page_index_reader_
+                                 ? 
page_index_reader_->RowGroup(current_row_group_index_)
+                                 : nullptr;
+    auto bloom_filter_reader = 
bloom_filter_reader_.RowGroup(current_row_group_index_);
+    RowGroupRewriter rewriter(source_, sink_, props_, 
std::move(row_group_reader),
+                              std::move(page_index_reader),
+                              std::move(bloom_filter_reader));
+    rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder,
+                               bloom_filter_builder, total_bytes_written);
+    ++current_row_group_index_;
+  }
+
+  bool HasMoreRowGroup() {
+    return current_row_group_index_ < metadata_->num_row_groups();
+  }
+
+  void Close() { parquet_file_reader_->Close(); }
+
+  const SchemaDescriptor& schema() const { return *metadata_->schema(); }
+
+  std::vector<int64_t> row_group_row_counts() const {
+    int num_row_groups = metadata_->num_row_groups();
+    std::vector<int64_t> row_counts;
+    row_counts.reserve(num_row_groups);
+    for (int i = 0; i < num_row_groups; ++i) {
+      row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
+    }
+    return row_counts;
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::unique_ptr<ParquetFileReader> parquet_file_reader_;
+  std::shared_ptr<PageIndexReader> page_index_reader_;
+  BloomFilterReader& bloom_filter_reader_;
+  std::shared_ptr<FileMetaData> metadata_;
+  int current_row_group_index_{};
+};
+
+class ConcatRewriter {
+ public:
+  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> 
rewriters)
+      : file_rewriters_(std::move(rewriters)) {
+    if (file_rewriters_.empty()) {
+      throw ParquetException("At least one SingleFileRewriter is required");
+    }
+    auto& schema = file_rewriters_[0]->schema();
+    if (std::ranges::any_of(
+            file_rewriters_ | std::views::drop(1),
+            [&schema](auto& rewriter) { return 
!schema.Equals(rewriter->schema()); })) {
+      throw ParquetException("Input files have different schemas.");
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
+        rg_metadata_builder, page_index_builder, bloom_filter_builder,
+        total_bytes_written);
+  }
+
+  bool HasMoreRowGroup() {
+    while (current_rewriter_index_ < file_rewriters_.size() &&
+           !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
+      file_rewriters_[current_rewriter_index_]->Close();
+      ARROW_LOG(DEBUG) << "Finished rewriting file index " << 
current_rewriter_index_;
+      ++current_rewriter_index_;
+    }
+    return current_rewriter_index_ < file_rewriters_.size();
+  }
+
+  void Close() { std::ranges::for_each(file_rewriters_, 
&SingleFileRewriter::Close); }

Review Comment:
   ConcatRewriter::Close() uses std::ranges::for_each(file_rewriters_, 
&SingleFileRewriter::Close) while file_rewriters_ holds 
std::unique_ptr<SingleFileRewriter>. Note this does compile: per the INVOKE 
rules ([func.require]), std::invoke applies a pointer-to-member through any 
non-class argument by dereferencing it, i.e. ((*ptr).*f)(), which works for 
unique_ptr. Still, an explicit lambda calling rewriter->Close() is easier to 
read and lets you null-check entries if the vector can ever hold empty 
pointers.
   ```suggestion
     void Close() {
       std::ranges::for_each(
           file_rewriters_,
           [](const std::unique_ptr<SingleFileRewriter>& rewriter) {
             if (rewriter) {
               rewriter->Close();
             }
           });
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to