Copilot commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r2782481214


##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -92,6 +92,18 @@ class PARQUET_EXPORT BloomFilterBuilder {
   ///   - `WriteTo()` has been called
   virtual BloomFilter* CreateBloomFilter(int32_t column_ordinal) = 0;
 
+  /// \brief Insert a BloomFilter of the column ordinal of the current row 
group.
+  ///
+  /// \param column_ordinal Column ordinal for the bloom filter.
+  /// \param bloom_filter The bloom filter to insert.
+  /// \throws ParquetException if any condition is violated:
+  ///   - `AppendRowGroup()` has not been called yet
+  ///   - The column ordinal is out of bound
+  ///   - Bloom filter already exists for the column
+  ///   - `WriteTo()` has been called
+  virtual void InsertBloomFilter(int32_t column_ordinal,
+                                 std::unique_ptr<BloomFilter> bloom_filter) = 
0;

Review Comment:
   BloomFilterBuilder is a public exported interface; adding a new pure-virtual 
method (InsertBloomFilter) is an ABI- and source-breaking change for any 
external implementations. If interface stability matters, consider providing a 
non-pure default implementation (that throws) or introducing a new interface, 
rather than extending the existing vtable.



##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -172,6 +172,7 @@ set(PARQUET_SRCS
     encryption/internal_file_encryptor.cc
     exception.cc
     file_reader.cc
+    file_rewriter.cc
     file_writer.cc

Review Comment:
   The Parquet CMake build is updated to compile file_rewriter.cc and add a new 
test, but other build definitions in this repo (e.g., Meson) also enumerate 
sources/tests. Please ensure the corresponding Meson build files are updated so 
non-CMake builds don’t break.



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, 
&buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
+class ColumnChunkRewriter {
+ public:
+  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+                      std::shared_ptr<ArrowOutputStream> sink,
+                      const RewriterProperties& props,
+                      std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> 
page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader,
+                      int column_ordinal)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        metadata_(std::move(metadata)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        column_ordinal_(column_ordinal) {}
+
+  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                            PageIndexBuilder* page_index_builder,
+                            BloomFilterBuilder* bloom_filter_builder,
+                            int64_t& total_bytes_written) {
+    // TODO(HuaHuaY): add else branch to rewrite column chunk with new 
encoding,
+    // compression, etc.
+    bool fast_copy = true;
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->start_offset();
+      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
+      int64_t total_compressed_size = metadata_->total_compressed_size();
+
+      auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->start_offset(), total_compressed_size);
+      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        auto column_index = 
page_index_reader_->GetColumnIndex(column_ordinal_);
+        auto offset_index = 
page_index_reader_->GetOffsetIndex(column_ordinal_);
+        if (column_index != nullptr) {
+          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
+        }
+        if (offset_index != nullptr) {
+          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, 
shift);
+        }
+      }
+
+      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
+        auto bloom_filter = 
bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
+        if (bloom_filter != nullptr) {
+          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
+                                                  std::move(bloom_filter));
+        }
+      }
+
+      total_bytes_written += total_uncompressed_size;
+    }
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::unique_ptr<ColumnChunkMetaData> metadata_;
+  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+  int column_ordinal_;
+};
+
+class RowGroupRewriter {
+ public:
+  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
+                   std::shared_ptr<ArrowOutputStream> sink,
+                   const RewriterProperties& props,
+                   std::shared_ptr<RowGroupReader> row_group_reader,
+                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+                   std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        row_group_reader_(std::move(row_group_reader)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        metadata_(row_group_reader_->metadata()) {}
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    rg_metadata_builder.set_num_rows(metadata_->num_rows());
+
+    bool fast_copy = metadata_->file_offset() != 0;
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->file_offset();
+
+      auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->file_offset(), 
metadata_->total_compressed_size());
+      CopyStream(stream, sink_, metadata_->total_compressed_size(), 
props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);
+
+        auto column_index =
+            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : 
nullptr;
+        auto offset_index =
+            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : 
nullptr;
+        auto bloom_filter = bloom_filter_reader_
+                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
+                                : nullptr;
+
+        if (column_index != nullptr && page_index_builder != nullptr) {
+          page_index_builder->SetColumnIndex(i, column_index);
+        }
+        if (offset_index != nullptr && page_index_builder != nullptr) {
+          page_index_builder->SetOffsetIndex(i, offset_index, shift);
+        }
+        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
+          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
+        }
+      }
+
+      total_bytes_written += metadata_->total_byte_size();
+    } else {
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+        ColumnChunkRewriter rewriter(source_, sink_, props_, 
std::move(cc_metadata),
+                                     page_index_reader_, bloom_filter_reader_, 
i);
+        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
+                                      bloom_filter_builder, 
total_bytes_written);
+      }
+    }
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::shared_ptr<RowGroupReader> row_group_reader_;
+  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+  const RowGroupMetaData* metadata_;
+};
+
+class SingleFileRewriter {
+ public:
+  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+                     std::shared_ptr<ArrowOutputStream> sink,
+                     std::shared_ptr<FileMetaData> source_metadata,
+                     const RewriterProperties& props)
+      : source_(source),
+        sink_(std::move(sink)),
+        props_(props),
+        parquet_file_reader_(ParquetFileReader::Open(
+            std::move(source), props_.reader_properties(), 
std::move(source_metadata))),
+        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+        metadata_(parquet_file_reader_->metadata()) {
+    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+    std::vector<int32_t> column_indices(metadata_->num_columns());
+    std::iota(column_indices.begin(), column_indices.end(), 0);
+    page_index_reader_->WillNeed(row_group_indices, column_indices,
+                                 {/*column_index=*/true, 
/*offset_index=*/true});

Review Comment:
   SingleFileRewriter unconditionally dereferences page_index_reader_ (WillNeed 
and RowGroup). ParquetFileReader::GetPageIndexReader() explicitly may return 
nullptr when the file has no page index, so this can crash (e.g., when 
rewriting files written with page indexes disabled). Guard these calls when 
page_index_reader_ is null, and only prefetch/consume page indexes when the 
output writer_properties() has page_index_enabled().



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1924,6 +1934,23 @@ class 
RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
     return column_builder_ptr;
   }
 
+  void NextColumnChunk(std::unique_ptr<ColumnChunkMetaData> cc_metadata, 
int64_t shift) {
+    auto* column_chunk = &row_group_->columns[next_column_++];
+    column_chunk->__set_file_offset(0);
+    column_chunk->__isset.meta_data = true;
+    column_chunk->meta_data =
+        *static_cast<const format::ColumnMetaData*>(cc_metadata->to_thrift());
+    column_chunk->meta_data.__set_dictionary_page_offset(
+        column_chunk->meta_data.dictionary_page_offset + shift);
+    column_chunk->meta_data.__set_data_page_offset(
+        column_chunk->meta_data.data_page_offset + shift);
+    column_chunk->meta_data.__set_index_page_offset(
+        column_chunk->meta_data.index_page_offset + shift);
+    column_chunk->meta_data.__set_bloom_filter_offset(
+        column_chunk->meta_data.bloom_filter_offset + shift);

Review Comment:
   RowGroupMetaDataBuilderImpl::NextColumnChunk unconditionally calls 
__set_dictionary_page_offset / __set_index_page_offset / 
__set_bloom_filter_offset and shifts them. These fields are optional; calling 
__set_* will mark them present even when the source column chunk didn’t have 
them, producing invalid metadata. Additionally, bloom_filter_offset is later 
recomputed by FileMetaDataBuilder::SetIndexLocations when bloom filters are 
written, so copying/shifting the input bloom_filter_offset can leave incorrect 
offsets when bloom filter writing is disabled (or temporarily wrong values 
before override). Only shift optional page offsets when they were set in the 
source metadata, and do not propagate bloom_filter_offset here—leave it unset 
and let SetIndexLocations set it when/if bloom filters are actually written.
   ```suggestion
       auto& meta_data = column_chunk->meta_data;
       if (meta_data.__isset.dictionary_page_offset) {
         
meta_data.__set_dictionary_page_offset(meta_data.dictionary_page_offset + 
shift);
       }
       meta_data.__set_data_page_offset(meta_data.data_page_offset + shift);
       if (meta_data.__isset.index_page_offset) {
         meta_data.__set_index_page_offset(meta_data.index_page_offset + shift);
       }
       // Do not propagate bloom filter offsets here; they are set later by
       // FileMetaDataBuilder::SetIndexLocations when/if bloom filters are 
written.
       meta_data.__isset.bloom_filter_offset = false;
   ```



##########
cpp/src/parquet/arrow/test_util.h:
##########
@@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, 
Array* result) {
   EXPECT_TRUE(result->Equals(*expected_array));
 }
 
+void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
+               const std::shared_ptr<::arrow::Table>& table,
+               std::shared_ptr<Buffer>& buffer) {
+  // Get schema from table.
+  auto schema = table->schema();
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, 
&parquet_schema));
+  auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Write table to buffer.
+  auto sink = CreateOutputStream();
+  auto pool = ::arrow::default_memory_pool();
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, 
arrow_writer_properties,
+                             &arrow_writer));
+  ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(buffer, sink->Finish());
+}

Review Comment:
   WriteFile is defined as a non-inline function in a header. Since 
parquet/arrow/test_util.h is included by multiple test translation units, this 
will cause multiple-definition linker errors. Mark it inline (or move the 
implementation to a .cc) to avoid ODR violations.



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -219,6 +222,21 @@ BloomFilter* 
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
   return curr_rg_bfs.emplace(column_ordinal, 
std::move(bf)).first->second.get();
 }
 
+void BloomFilterBuilderImpl::InsertBloomFilter(
+    int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) {
+  CheckState(column_ordinal);
+
+  auto& curr_rg_bfs = *bloom_filters_.rbegin();
+  if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) {
+    std::stringstream ss;
+    ss << "Bloom filter already exists for column: " << column_ordinal
+       << ", row group: " << (bloom_filters_.size() - 1);
+    throw ParquetException(ss.str());
+  }
+
+  curr_rg_bfs.emplace(column_ordinal, std::move(bloom_filter));
+}

Review Comment:
   InsertBloomFilter allows inserting a null bloom_filter (unique_ptr), but 
WriteTo() later unconditionally dereferences the stored filter. Validate 
bloom_filter is non-null (and consider rejecting unsupported option cases 
similarly to CreateBloomFilter()) to avoid a nullptr dereference at write time.



##########
cpp/src/parquet/properties.h:
##########
@@ -77,7 +77,7 @@ class PARQUET_EXPORT ReaderProperties {
   MemoryPool* memory_pool() const { return pool_; }
 
   std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> 
source,
-                                              int64_t start, int64_t 
num_bytes);
+                                              int64_t start, int64_t 
num_bytes) const;
 

Review Comment:
   Changing ReaderProperties::GetStream to be const changes the mangled symbol 
and will break ABI for downstream code compiled against earlier versions of the 
library. If ABI compatibility matters here, consider keeping the old non-const 
overload (forwarding to the const implementation) instead of changing the 
existing signature in-place.
   ```suggestion
   
     // Non-const overload kept for ABI compatibility. It forwards to the const
     // implementation introduced in a later version.
     std::shared_ptr<ArrowInputStream> 
GetStream(std::shared_ptr<ArrowInputFile> source,
                                                 int64_t start, int64_t 
num_bytes) {
       return static_cast<const ReaderProperties*>(this)->GetStream(source, 
start, num_bytes);
     }
   ```



##########
cpp/src/parquet/metadata.h:
##########
@@ -160,6 +161,8 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::optional<IndexLocation> GetOffsetIndexLocation() const;
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
+  const void* to_thrift() const;
+

Review Comment:
   ColumnChunkMetaData::to_thrift() exposes an untyped pointer to an internal 
Thrift struct as part of the public metadata API, which couples consumers to 
internal representation and is easy to misuse/UB (wrong cast / lifetime 
assumptions). Prefer an internal-only accessor, or return a typed 
reference/pointer to the concrete thrift type in an internal header, or provide 
a dedicated cloning/copy helper on the builder to avoid exposing raw thrift at 
all.



##########
cpp/src/parquet/page_index.h:
##########
@@ -370,6 +371,12 @@ class PARQUET_EXPORT PageIndexBuilder {
   /// the PageIndexBuilder.
   virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0;
 
+  virtual void SetColumnIndex(int32_t i,
+                              const std::shared_ptr<ColumnIndex>& 
column_index) = 0;
+
+  virtual void SetOffsetIndex(int32_t i, const std::shared_ptr<OffsetIndex>& 
offset_index,
+                              int64_t shift) = 0;
+

Review Comment:
   PageIndexBuilder is a public exported interface; adding new pure-virtual 
methods (SetColumnIndex / SetOffsetIndex) is an ABI & source-breaking change 
for any external implementations of PageIndexBuilder. If preserving interface 
stability is important, consider providing non-pure default implementations 
(that throw) or adding these APIs via a separate interface/extension point 
instead of changing the vtable.



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, 
&buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
+class ColumnChunkRewriter {
+ public:
+  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+                      std::shared_ptr<ArrowOutputStream> sink,
+                      const RewriterProperties& props,
+                      std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> 
page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader,
+                      int column_ordinal)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        metadata_(std::move(metadata)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        column_ordinal_(column_ordinal) {}
+
+  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                            PageIndexBuilder* page_index_builder,
+                            BloomFilterBuilder* bloom_filter_builder,
+                            int64_t& total_bytes_written) {
+    // TODO(HuaHuaY): add else branch to rewrite column chunk with new 
encoding,
+    // compression, etc.
+    bool fast_copy = true;
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->start_offset();
+      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
+      int64_t total_compressed_size = metadata_->total_compressed_size();
+
+      auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->start_offset(), total_compressed_size);
+      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        auto column_index = 
page_index_reader_->GetColumnIndex(column_ordinal_);
+        auto offset_index = 
page_index_reader_->GetOffsetIndex(column_ordinal_);
+        if (column_index != nullptr) {
+          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
+        }
+        if (offset_index != nullptr) {
+          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, 
shift);
+        }
+      }
+
+      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
+        auto bloom_filter = 
bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
+        if (bloom_filter != nullptr) {
+          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
+                                                  std::move(bloom_filter));
+        }
+      }

Review Comment:
   The new rewriter code path copies page indexes 
(SetColumnIndex/SetOffsetIndex) and bloom filters (InsertBloomFilter) when 
enabled, but the added Arrow-level roundtrip tests only validate the decoded 
table contents. Consider extending tests to assert that rewritten files 
actually contain the expected page index / bloom filter structures (e.g., via 
ParquetFileReader::GetPageIndexReader / GetBloomFilterReader and checking 
non-null indexes/locations), so regressions in metadata copying/offset shifting 
are caught.



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, 
&buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;

Review Comment:
   CopyStream’s non-zero-copy path calls InputStream::Read(size - bytes_copied, 
&buffer). For Arrow’s ResizableBuffer overload this can resize the buffer to 
the full remaining size, potentially allocating very large buffers for big 
column chunks/row groups. Read in fixed-size chunks (e.g., min(remaining, 
buffer->size()) and use the pointer-based Read overload) to keep memory bounded.



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, 
&buffer));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
+class ColumnChunkRewriter {
+ public:
+  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+                      std::shared_ptr<ArrowOutputStream> sink,
+                      const RewriterProperties& props,
+                      std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> 
page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader,
+                      int column_ordinal)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        metadata_(std::move(metadata)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        column_ordinal_(column_ordinal) {}
+
+  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                            PageIndexBuilder* page_index_builder,
+                            BloomFilterBuilder* bloom_filter_builder,
+                            int64_t& total_bytes_written) {
+    // TODO(HuaHuaY): add else branch to rewrite column chunk with new 
encoding,
+    // compression, etc.
+    bool fast_copy = true;
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->start_offset();
+      int64_t total_uncompressed_size = metadata_->total_uncompressed_size();
+      int64_t total_compressed_size = metadata_->total_compressed_size();
+
+      auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->start_offset(), total_compressed_size);
+      CopyStream(stream, sink_, total_compressed_size, props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        auto column_index = 
page_index_reader_->GetColumnIndex(column_ordinal_);
+        auto offset_index = 
page_index_reader_->GetOffsetIndex(column_ordinal_);
+        if (column_index != nullptr) {
+          page_index_builder->SetColumnIndex(column_ordinal_, column_index);
+        }
+        if (offset_index != nullptr) {
+          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, 
shift);
+        }
+      }
+
+      if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
+        auto bloom_filter = 
bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
+        if (bloom_filter != nullptr) {
+          bloom_filter_builder->InsertBloomFilter(column_ordinal_,
+                                                  std::move(bloom_filter));
+        }
+      }
+
+      total_bytes_written += total_uncompressed_size;
+    }
+  }
+
+ private:
+  std::shared_ptr<ArrowInputFile> source_;
+  std::shared_ptr<ArrowOutputStream> sink_;
+  const RewriterProperties& props_;
+  std::unique_ptr<ColumnChunkMetaData> metadata_;
+  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+  int column_ordinal_;
+};
+
// Rewrites one row group from a source file into the sink. When the row
// group's absolute file offset is recorded in its metadata, the whole
// serialized row group is copied in a single pass; otherwise each column
// chunk is copied individually via ColumnChunkRewriter.
class RowGroupRewriter {
 public:
  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
                   std::shared_ptr<ArrowOutputStream> sink,
                   const RewriterProperties& props,
                   std::shared_ptr<RowGroupReader> row_group_reader,
                   std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
                   std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
      : source_(std::move(source)),
        sink_(std::move(sink)),
        props_(props),
        row_group_reader_(std::move(row_group_reader)),
        page_index_reader_(std::move(page_index_reader)),
        bloom_filter_reader_(std::move(bloom_filter_reader)),
        // Safe: metadata_ is declared (and thus initialized) after
        // row_group_reader_, so the move above has already completed.
        metadata_(row_group_reader_->metadata()) {}

  // Copies this row group's data into the sink and records the relocated
  // column chunk metadata plus (when readers and builders are both present)
  // its page indexes and bloom filters.
  //
  // \param rg_metadata_builder receives num_rows and shifted chunk metadata.
  // \param page_index_builder optional sink for column/offset indexes.
  // \param bloom_filter_builder optional sink for existing bloom filters.
  // \param total_bytes_written incremented by the row group's uncompressed
  //        byte size (total_byte_size in the fast path).
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    rg_metadata_builder.set_num_rows(metadata_->num_rows());

    // A zero file_offset means the source metadata does not record where the
    // row group starts, so a single-pass copy of the whole group is not
    // possible and we fall back to per-column-chunk copying.
    bool fast_copy = metadata_->file_offset() != 0;
    if (fast_copy) {
      // All copied structures move by the same distance: new position of the
      // row group in the sink minus its old position in the source.
      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
      int64_t shift = sink_offset - metadata_->file_offset();

      auto stream = props_.reader_properties().GetStream(
          source_, metadata_->file_offset(), metadata_->total_compressed_size());
      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
      PARQUET_THROW_NOT_OK(stream->Close());

      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);

        auto column_index =
            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
        auto offset_index =
            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
        auto bloom_filter = bloom_filter_reader_
                                ? bloom_filter_reader_->GetColumnBloomFilter(i)
                                : nullptr;

        if (column_index != nullptr && page_index_builder != nullptr) {
          page_index_builder->SetColumnIndex(i, column_index);
        }
        if (offset_index != nullptr && page_index_builder != nullptr) {
          // Offset indexes hold absolute positions, hence the shift.
          page_index_builder->SetOffsetIndex(i, offset_index, shift);
        }
        if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
          bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
        }
      }

      total_bytes_written += metadata_->total_byte_size();
    } else {
      // Slow path: copy chunk by chunk; each ColumnChunkRewriter updates the
      // builders and the byte counter itself.
      for (int i = 0; i < metadata_->num_columns(); ++i) {
        auto cc_metadata = metadata_->ColumnChunk(i);
        ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
                                     page_index_reader_, bloom_filter_reader_, i);
        rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
                                      bloom_filter_builder, total_bytes_written);
      }
    }
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::shared_ptr<RowGroupReader> row_group_reader_;
  std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
  std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
  const RowGroupMetaData* metadata_;  // owned by row_group_reader_
};
+
// Rewrites one complete source Parquet file into the sink, one row group at
// a time, reusing the file's existing page indexes and bloom filters.
class SingleFileRewriter {
 public:
  // \param props must outlive this rewriter (held by reference).
  SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
                     std::shared_ptr<ArrowOutputStream> sink,
                     std::shared_ptr<FileMetaData> source_metadata,
                     const RewriterProperties& props)
      // NOTE: members initialize in declaration order, so source_ takes a
      // copy of `source` before std::move(source) hands it to the reader.
      : source_(source),
        sink_(std::move(sink)),
        props_(props),
        parquet_file_reader_(ParquetFileReader::Open(
            std::move(source), props_.reader_properties(), std::move(source_metadata))),
        page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
        // Reference member; the referenced reader is owned by
        // parquet_file_reader_ and stays valid for this object's lifetime.
        bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
        metadata_(parquet_file_reader_->metadata()) {
    // Prefetch column and offset indexes for every (row group, column) pair
    // so the per-row-group reads below do not trigger piecemeal I/O.
    // NOTE(review): assumes GetPageIndexReader() never returns nullptr --
    // confirm before relying on it here.
    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
    std::vector<int32_t> column_indices(metadata_->num_columns());
    std::iota(column_indices.begin(), column_indices.end(), 0);
    page_index_reader_->WillNeed(row_group_indices, column_indices,
                                 {/*column_index=*/true, /*offset_index=*/true});
  }

  // Rewrites the next unwritten row group into the sink.
  // \throws ParquetException if every row group has already been written.
  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         BloomFilterBuilder* bloom_filter_builder,
                         int64_t& total_bytes_written) {
    if (current_row_group_index_ >= metadata_->num_row_groups()) {
      std::stringstream ss;
      ss << "Trying to read row group " << current_row_group_index_
         << " but file only has " << metadata_->num_row_groups() << " row groups";
      throw ParquetException(ss.str());
    }
    // NOTE(review): row_group_metadata is currently unused; the metadata is
    // re-fetched inside RowGroupRewriter via the row group reader.
    auto row_group_metadata = metadata_->RowGroup(current_row_group_index_);
    auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_);
    auto page_index_reader = page_index_reader_->RowGroup(current_row_group_index_);
    auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_);
    RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader),
                              std::move(page_index_reader),
                              std::move(bloom_filter_reader));
    rewriter.WriteRowGroupData(rg_metadata_builder, page_index_builder,
                               bloom_filter_builder, total_bytes_written);
    ++current_row_group_index_;
  }

  // True while at least one row group remains to be rewritten.
  bool HasMoreRowGroup() {
    return current_row_group_index_ < metadata_->num_row_groups();
  }

  // Closes the underlying file reader.
  void Close() { parquet_file_reader_->Close(); }

  // Schema of the source file.
  const SchemaDescriptor& schema() const { return *metadata_->schema(); }

  // Number of rows in each row group, in file order.
  std::vector<int64_t> row_group_row_counts() const {
    int num_row_groups = metadata_->num_row_groups();
    std::vector<int64_t> row_counts;
    row_counts.reserve(num_row_groups);
    for (int i = 0; i < num_row_groups; ++i) {
      row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
    }
    return row_counts;
  }

 private:
  std::shared_ptr<ArrowInputFile> source_;
  std::shared_ptr<ArrowOutputStream> sink_;
  const RewriterProperties& props_;
  std::unique_ptr<ParquetFileReader> parquet_file_reader_;
  std::shared_ptr<PageIndexReader> page_index_reader_;
  BloomFilterReader& bloom_filter_reader_;  // owned by parquet_file_reader_
  std::shared_ptr<FileMetaData> metadata_;
  int current_row_group_index_{};  // next row group to rewrite
};
+
+class ConcatRewriter {
+ public:
+  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> 
rewriters)
+      : file_rewriters_(std::move(rewriters)) {
+    auto& schema = file_rewriters_[0]->schema();
+    if (std::ranges::any_of(
+            file_rewriters_ | std::views::drop(1),
+            [&schema](auto& rewriter) { return 
!schema.Equals(rewriter->schema()); })) {
+      throw ParquetException("Input files have different schemas.");
+    }
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
+        rg_metadata_builder, page_index_builder, bloom_filter_builder,
+        total_bytes_written);
+  }
+
+  bool HasMoreRowGroup() {
+    while (current_rewriter_index_ < file_rewriters_.size() &&
+           !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
+      file_rewriters_[current_rewriter_index_]->Close();
+      ARROW_LOG(DEBUG) << "Finished rewriting file index " << 
current_rewriter_index_;
+      ++current_rewriter_index_;
+    }
+    return current_rewriter_index_ < file_rewriters_.size();
+  }
+
+  void Close() { std::ranges::for_each(file_rewriters_, 
&SingleFileRewriter::Close); }
+
+  const SchemaDescriptor& schema() const { return 
file_rewriters_[0]->schema(); }
+
+  std::vector<int64_t> row_group_row_counts() const {
+    std::vector<int64_t> row_counts;
+    for (auto& rewriter : file_rewriters_) {
+      auto count = rewriter->row_group_row_counts();
+      row_counts.insert(row_counts.end(), count.begin(), count.end());
+    }
+    return row_counts;
+  }
+
+ private:
+  std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
+  size_t current_rewriter_index_{};
+};
+
+class JoinRewriter {
+ public:
+  explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
+      : rewriters_(std::move(rewriters)) {
+    auto row_counts = rewriters_[0]->row_group_row_counts();
+    for (size_t i = 1; i < rewriters_.size(); ++i) {
+      if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
+          row_counts != current_row_counts) {
+        auto vecToString = [](const std::vector<int64_t>& v) {
+          if (v.empty()) {
+            return std::string("[]");
+          }
+          std::ostringstream oss;
+          oss << "[" << v[0];
+          // TODO(anyone): use std::format and std::views::join_with when 
C++23 available
+          for (const auto& val : v | std::views::drop(1)) {
+            oss << ", " << val;
+          }
+          oss << "]";
+          return oss.str();
+        };
+        throw ParquetException(
+            "The number of rows in each block must match! No.0 blocks row 
counts: ",
+            vecToString(row_counts), ", No.", i,
+            " blocks row counts: ", vecToString(current_row_counts));
+      }
+    }
+
+    std::unordered_set<std::string> column_paths;
+    schema::NodeVector fields;
+
+    for (auto& rewriter : rewriters_) {
+      const SchemaDescriptor& schema_desc = rewriter->schema();
+
+      for (int i = 0; i < schema_desc.num_columns(); ++i) {
+        auto path = schema_desc.Column(i)->path()->ToDotString();
+        if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
+          // TODO(HuaHuaY): support choose one column from columns with same 
path
+          throw ParquetException("NotImplemented, files have same column path: 
", path);
+        }
+      }
+
+      const auto& group_node = schema_desc.group_node();
+      for (int i = 0; i < group_node->field_count(); ++i) {
+        fields.push_back(group_node->field(i));
+      }
+    }
+
+    auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(),
+                                            Repetition::REQUIRED, fields);
+    schema_.Init(new_root);
+  }
+
+  void WriteRowGroupData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                         PageIndexBuilder* page_index_builder,
+                         BloomFilterBuilder* bloom_filter_builder,
+                         int64_t& total_bytes_written) {
+    for (auto& rewriter : rewriters_) {
+      rewriter->WriteRowGroupData(rg_metadata_builder, page_index_builder,
+                                  bloom_filter_builder, total_bytes_written);
+    }
+  }
+
+  bool HasMoreRowGroup() {
+    return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup);
+  }
+
+  void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); }
+
+  const SchemaDescriptor& schema() const { return schema_; }
+
+  std::vector<int64_t> row_group_row_counts() const {
+    return rewriters_[0]->row_group_row_counts();
+  }
+
+ private:
+  std::vector<std::unique_ptr<ConcatRewriter>> rewriters_;
+  SchemaDescriptor schema_;
+};
+
+// ----------------------------------------------------------------------
+// GeneratedFile
+
+class GeneratedFile : public ParquetFileRewriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileRewriter::Contents> Open(
+      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+      std::shared_ptr<ArrowOutputStream> sink,
+      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+      std::shared_ptr<const KeyValueMetadata> sink_metadata,
+      std::shared_ptr<RewriterProperties> props) {
+    if (sources.empty() || sources[0].empty()) {
+      throw ParquetException("At least one source file is required");
+    }

Review Comment:
   GeneratedFile::Open validates sources[0] is non-empty but doesn’t validate 
the other outer "blocks". If sources[i] is empty for i>0, GeneratedFile’s 
constructor will build a ConcatRewriter with zero SingleFileRewriters and 
ConcatRewriter will dereference file_rewriters_[0], causing a crash. Validate 
that every sources[i] is non-empty (and the corresponding sources_metadata[i] 
too).



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1924,6 +1934,23 @@ class 
RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
     return column_builder_ptr;
   }
 
+  void NextColumnChunk(std::unique_ptr<ColumnChunkMetaData> cc_metadata, 
int64_t shift) {
+    auto* column_chunk = &row_group_->columns[next_column_++];
+    column_chunk->__set_file_offset(0);
+    column_chunk->__isset.meta_data = true;

Review Comment:
   The new NextColumnChunk overload doesn’t enforce the same bounds checks as 
NextColumnChunk() (it increments next_column_ and writes into 
row_group_->columns without verifying next_column_ < num_columns()). This can 
write past the row group column vector if misused. Add the same schema 
column-count validation and error message as the existing NextColumnChunk().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to