adamreeve commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r3090767926


##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,818 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <optional>
+#include <ranges>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size,
+                            from->Read(std::min(size - bytes_copied, 
buffer->size()),
+                                       buffer->mutable_data()));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
+class PagesRewriter {
+ public:
+  PagesRewriter(const RewriterProperties& props, std::unique_ptr<PageReader> 
page_reader,
+                std::unique_ptr<PageWriter> page_writer,
+                std::shared_ptr<OffsetIndex> original_offset_index)
+      : props_(props),
+        page_reader_(std::move(page_reader)),
+        page_writer_(std::move(page_writer)),
+        original_offset_index_(std::move(original_offset_index)) {}
+
+  void WritePages() {
+    bool has_dictionary = false;
+    bool fallback = false;
+    size_t page_ordinal = 0;
+    auto page = page_reader_->NextPage();
+    while (page != nullptr) {
+      switch (page->type()) {
+        case parquet::PageType::DICTIONARY_PAGE: {
+          WriteDictionaryPage(static_cast<const DictionaryPage&>(*page));
+          has_dictionary = true;
+          break;
+        }
+        case parquet::PageType::DATA_PAGE: {
+          auto& data_page = static_cast<const DataPageV1&>(*page);
+          if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+            fallback = true;
+          }
+          WriteDataPageV1(data_page, page_ordinal);
+          page_ordinal++;
+          break;
+        }
+        case parquet::PageType::DATA_PAGE_V2: {
+          auto& data_page = static_cast<const DataPageV2&>(*page);
+          if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+            fallback = true;
+          }
+          WriteDataPageV2(data_page, page_ordinal);
+          page_ordinal++;
+          break;
+        }
+        default: {
+          ARROW_LOG(DEBUG) << "Unsupported page type: " << 
static_cast<int>(page->type());
+          break;
+        }
+      }
+      page = page_reader_->NextPage();
+    }
+    page_writer_->Close(has_dictionary, has_dictionary && fallback);
+  }
+
+  int64_t total_uncompressed_size() const { return total_uncompressed_size_; }
+
+ private:
+  void WriteDictionaryPage(const DictionaryPage& dict_page) {
+    total_uncompressed_size_ += page_writer_->WriteDictionaryPage(dict_page);
+  }
+
+  void WriteDataPageV1(const DataPageV1& data_page, const size_t page_ordinal) 
{
+    std::shared_ptr<Buffer> compressed_data;
+    if (page_writer_->has_compressor()) {
+      auto buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(props_.memory_pool(), data_page.size()));
+      page_writer_->Compress(*data_page.buffer(), buffer.get());
+      compressed_data = std::move(buffer);
+    } else {
+      compressed_data = data_page.buffer();
+    }
+    auto first_row_index =
+        original_offset_index_
+            ? 
std::optional{original_offset_index_->page_locations()[page_ordinal]
+                                .first_row_index}
+            : std::nullopt;
+    SizeStatistics size_statistics;
+    size_statistics.unencoded_byte_array_data_bytes =
+        original_offset_index_ &&
+                
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
+            ? std::optional{original_offset_index_
+                                
->unencoded_byte_array_data_bytes()[page_ordinal]}
+            : std::nullopt;
+    DataPageV1 new_page(compressed_data, data_page.num_values(), 
data_page.encoding(),
+                        data_page.definition_level_encoding(),
+                        data_page.repetition_level_encoding(),
+                        data_page.uncompressed_size(), data_page.statistics(),
+                        first_row_index, size_statistics);
+    total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+  }
+
+  void WriteDataPageV2(const DataPageV2& data_page, const size_t page_ordinal) 
{
+    int32_t levels_byte_len = data_page.repetition_levels_byte_length() +
+                              data_page.definition_levels_byte_length();
+    bool page_is_compressed = false;
+    std::shared_ptr<Buffer> output_buffer;
+    if (page_writer_->has_compressor() && data_page.size() > levels_byte_len) {
+      auto values_buffer = SliceBuffer(data_page.buffer(), levels_byte_len);
+      auto compressed_values = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(props_.memory_pool(), values_buffer->size()));
+      page_writer_->Compress(*values_buffer, compressed_values.get());
+      if (compressed_values->size() < values_buffer->size()) {
+        page_is_compressed = true;
+        int64_t combined_size = levels_byte_len + compressed_values->size();
+        auto combined = AllocateBuffer(props_.memory_pool(), combined_size);
+        if (levels_byte_len > 0) {
+          std::memcpy(combined->mutable_data(), data_page.data(), 
levels_byte_len);
+        }
+        std::memcpy(combined->mutable_data() + levels_byte_len, 
compressed_values->data(),
+                    compressed_values->size());
+        output_buffer = std::move(combined);
+      }
+    }
+    if (!page_is_compressed) {
+      output_buffer = data_page.buffer();
+    }
+
+    auto first_row_index =
+        original_offset_index_
+            ? 
std::optional{original_offset_index_->page_locations()[page_ordinal]
+                                .first_row_index}
+            : std::nullopt;
+    SizeStatistics size_statistics;
+    size_statistics.unencoded_byte_array_data_bytes =
+        original_offset_index_ &&
+                
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
+            ? std::optional{original_offset_index_
+                                
->unencoded_byte_array_data_bytes()[page_ordinal]}
+            : std::nullopt;
+    DataPageV2 new_page(output_buffer, data_page.num_values(), 
data_page.num_nulls(),
+                        data_page.num_rows(), data_page.encoding(),
+                        data_page.definition_levels_byte_length(),
+                        data_page.repetition_levels_byte_length(),
+                        data_page.uncompressed_size(), page_is_compressed,
+                        data_page.statistics(), first_row_index, 
size_statistics);
+    total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+  }
+
+  const RewriterProperties& props_;
+  std::unique_ptr<PageReader> page_reader_;
+  std::unique_ptr<PageWriter> page_writer_;
+  std::shared_ptr<OffsetIndex> original_offset_index_;
+  int64_t total_uncompressed_size_{0};
+};
+
+class ColumnChunkRewriter {
+ public:
+  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+                      std::shared_ptr<ArrowOutputStream> sink,
+                      const RewriterProperties& props,
+                      std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> 
page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader,
+                      int32_t row_group_ordinal, int32_t column_ordinal, bool 
fast_copy)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        metadata_(std::move(metadata)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        row_group_ordinal_(row_group_ordinal),
+        column_ordinal_(column_ordinal),
+        fast_copy_(fast_copy) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("ColumnChunkMetaData should not be nullptr");
+    }
+    if (metadata_->is_encrypted()) {
+      ParquetException::NYI("Rewriter does not support reading encrypted 
column chunks");
+    }
+  }
+
+  static bool CanFastCopy(const RewriterProperties& props,
+                          const ColumnChunkMetaData& metadata) {
+    Compression::type original_codec = metadata.compression();
+    auto column_path = metadata.path_in_schema();
+    Compression::type new_codec = 
props.writer_properties()->compression(column_path);
+    return (original_codec == new_codec);

Review Comment:
   Should this also consider the encodings used as well as the compression 
type? And page size limits too? Maybe this is OK as a starting point and we add 
support for changing other properties later?



##########
cpp/src/parquet/file_rewriter.h:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+class PARQUET_EXPORT ParquetFileRewriter {
+ public:
+  struct PARQUET_EXPORT Contents {
+    virtual ~Contents() = default;
+    virtual void Close() = 0;
+    virtual void Rewrite() = 0;
+
+    const std::shared_ptr<FileMetaData>& metadata() const { return 
file_metadata_; }
+    std::shared_ptr<FileMetaData> file_metadata_;

Review Comment:
   It seems odd to have a public method that just returns a reference to a 
public member. I can see that `ParquetFileWriter` follows the same pattern, but 
maybe `file_metadata_` should be `protected`?



##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,818 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <optional>
+#include <ranges>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string.h"
+#include "parquet/bloom_filter.h"  // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+                std::shared_ptr<ArrowOutputStream> to, int64_t size,
+                ::arrow::MemoryPool* pool) {
+  int64_t bytes_copied = 0;
+  if (from->supports_zero_copy()) {
+    while (bytes_copied < size) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+      if (buffer->size() == 0) {
+        throw ParquetException("Unexpected end of stream at ", bytes_copied);
+      }
+      PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+      bytes_copied += buffer->size();
+    }
+    return;
+  }
+
+  std::shared_ptr<ResizableBuffer> buffer =
+      AllocateBuffer(pool, kDefaultOutputStreamSize);
+  while (bytes_copied < size) {
+    PARQUET_ASSIGN_OR_THROW(auto read_size,
+                            from->Read(std::min(size - bytes_copied, 
buffer->size()),
+                                       buffer->mutable_data()));
+    if (read_size == 0) {
+      throw ParquetException("Unexpected end of stream at ", bytes_copied);
+    }
+    PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+    bytes_copied += read_size;
+  }
+}
+}  // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+  static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+      RewriterProperties::Builder().build();
+  return default_rewriter_properties;
+}
+
+class PagesRewriter {
+ public:
+  PagesRewriter(const RewriterProperties& props, std::unique_ptr<PageReader> 
page_reader,
+                std::unique_ptr<PageWriter> page_writer,
+                std::shared_ptr<OffsetIndex> original_offset_index)
+      : props_(props),
+        page_reader_(std::move(page_reader)),
+        page_writer_(std::move(page_writer)),
+        original_offset_index_(std::move(original_offset_index)) {}
+
+  void WritePages() {
+    bool has_dictionary = false;
+    bool fallback = false;
+    size_t page_ordinal = 0;
+    auto page = page_reader_->NextPage();
+    while (page != nullptr) {
+      switch (page->type()) {
+        case parquet::PageType::DICTIONARY_PAGE: {
+          WriteDictionaryPage(static_cast<const DictionaryPage&>(*page));
+          has_dictionary = true;
+          break;
+        }
+        case parquet::PageType::DATA_PAGE: {
+          auto& data_page = static_cast<const DataPageV1&>(*page);
+          if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+            fallback = true;
+          }
+          WriteDataPageV1(data_page, page_ordinal);
+          page_ordinal++;
+          break;
+        }
+        case parquet::PageType::DATA_PAGE_V2: {
+          auto& data_page = static_cast<const DataPageV2&>(*page);
+          if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+            fallback = true;
+          }
+          WriteDataPageV2(data_page, page_ordinal);
+          page_ordinal++;
+          break;
+        }
+        default: {
+          ARROW_LOG(DEBUG) << "Unsupported page type: " << 
static_cast<int>(page->type());
+          break;
+        }
+      }
+      page = page_reader_->NextPage();
+    }
+    page_writer_->Close(has_dictionary, has_dictionary && fallback);
+  }
+
+  int64_t total_uncompressed_size() const { return total_uncompressed_size_; }
+
+ private:
+  void WriteDictionaryPage(const DictionaryPage& dict_page) {
+    total_uncompressed_size_ += page_writer_->WriteDictionaryPage(dict_page);
+  }
+
+  void WriteDataPageV1(const DataPageV1& data_page, const size_t page_ordinal) 
{
+    std::shared_ptr<Buffer> compressed_data;
+    if (page_writer_->has_compressor()) {
+      auto buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(props_.memory_pool(), data_page.size()));
+      page_writer_->Compress(*data_page.buffer(), buffer.get());
+      compressed_data = std::move(buffer);
+    } else {
+      compressed_data = data_page.buffer();
+    }
+    auto first_row_index =
+        original_offset_index_
+            ? 
std::optional{original_offset_index_->page_locations()[page_ordinal]
+                                .first_row_index}
+            : std::nullopt;
+    SizeStatistics size_statistics;
+    size_statistics.unencoded_byte_array_data_bytes =
+        original_offset_index_ &&
+                
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
+            ? std::optional{original_offset_index_
+                                
->unencoded_byte_array_data_bytes()[page_ordinal]}
+            : std::nullopt;
+    DataPageV1 new_page(compressed_data, data_page.num_values(), 
data_page.encoding(),
+                        data_page.definition_level_encoding(),
+                        data_page.repetition_level_encoding(),
+                        data_page.uncompressed_size(), data_page.statistics(),
+                        first_row_index, size_statistics);
+    total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+  }
+
+  void WriteDataPageV2(const DataPageV2& data_page, const size_t page_ordinal) 
{
+    int32_t levels_byte_len = data_page.repetition_levels_byte_length() +
+                              data_page.definition_levels_byte_length();
+    bool page_is_compressed = false;
+    std::shared_ptr<Buffer> output_buffer;
+    if (page_writer_->has_compressor() && data_page.size() > levels_byte_len) {
+      auto values_buffer = SliceBuffer(data_page.buffer(), levels_byte_len);
+      auto compressed_values = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(props_.memory_pool(), values_buffer->size()));
+      page_writer_->Compress(*values_buffer, compressed_values.get());
+      if (compressed_values->size() < values_buffer->size()) {
+        page_is_compressed = true;
+        int64_t combined_size = levels_byte_len + compressed_values->size();
+        auto combined = AllocateBuffer(props_.memory_pool(), combined_size);
+        if (levels_byte_len > 0) {
+          std::memcpy(combined->mutable_data(), data_page.data(), 
levels_byte_len);
+        }
+        std::memcpy(combined->mutable_data() + levels_byte_len, 
compressed_values->data(),
+                    compressed_values->size());
+        output_buffer = std::move(combined);
+      }
+    }
+    if (!page_is_compressed) {
+      output_buffer = data_page.buffer();
+    }
+
+    auto first_row_index =
+        original_offset_index_
+            ? 
std::optional{original_offset_index_->page_locations()[page_ordinal]
+                                .first_row_index}
+            : std::nullopt;
+    SizeStatistics size_statistics;
+    size_statistics.unencoded_byte_array_data_bytes =
+        original_offset_index_ &&
+                
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
+            ? std::optional{original_offset_index_
+                                
->unencoded_byte_array_data_bytes()[page_ordinal]}
+            : std::nullopt;
+    DataPageV2 new_page(output_buffer, data_page.num_values(), 
data_page.num_nulls(),
+                        data_page.num_rows(), data_page.encoding(),
+                        data_page.definition_levels_byte_length(),
+                        data_page.repetition_levels_byte_length(),
+                        data_page.uncompressed_size(), page_is_compressed,
+                        data_page.statistics(), first_row_index, 
size_statistics);
+    total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+  }
+
+  const RewriterProperties& props_;
+  std::unique_ptr<PageReader> page_reader_;
+  std::unique_ptr<PageWriter> page_writer_;
+  std::shared_ptr<OffsetIndex> original_offset_index_;
+  int64_t total_uncompressed_size_{0};
+};
+
+class ColumnChunkRewriter {
+ public:
+  ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+                      std::shared_ptr<ArrowOutputStream> sink,
+                      const RewriterProperties& props,
+                      std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> 
page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> 
bloom_filter_reader,
+                      int32_t row_group_ordinal, int32_t column_ordinal, bool 
fast_copy)
+      : source_(std::move(source)),
+        sink_(std::move(sink)),
+        props_(props),
+        metadata_(std::move(metadata)),
+        page_index_reader_(std::move(page_index_reader)),
+        bloom_filter_reader_(std::move(bloom_filter_reader)),
+        row_group_ordinal_(row_group_ordinal),
+        column_ordinal_(column_ordinal),
+        fast_copy_(fast_copy) {
+    if (metadata_ == nullptr) {
+      throw ParquetException("ColumnChunkMetaData should not be nullptr");
+    }
+    if (metadata_->is_encrypted()) {
+      ParquetException::NYI("Rewriter does not support reading encrypted 
column chunks");
+    }
+  }
+
+  static bool CanFastCopy(const RewriterProperties& props,
+                          const ColumnChunkMetaData& metadata) {
+    Compression::type original_codec = metadata.compression();
+    auto column_path = metadata.path_in_schema();
+    Compression::type new_codec = 
props.writer_properties()->compression(column_path);
+    return (original_codec == new_codec);
+  }
+
+  void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+                            PageIndexBuilder* page_index_builder,
+                            BloomFilterBuilder* bloom_filter_builder,
+                            int64_t& total_bytes_written) {
+    auto& reader_props = props_.reader_properties();
+    auto& writer_props = *props_.writer_properties();
+    auto stream = reader_props.GetStream(source_, metadata_->start_offset(),
+                                         metadata_->total_compressed_size());
+
+    if (fast_copy_) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->start_offset();
+
+      CopyStream(stream, sink_, metadata_->total_compressed_size(), 
props_.memory_pool());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+      if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        auto offset_index = 
page_index_reader_->GetOffsetIndex(column_ordinal_);
+        if (offset_index != nullptr) {
+          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, 
shift);
+        }
+      }
+
+      total_bytes_written += metadata_->total_uncompressed_size();

Review Comment:
   There's a call to `std::move(metadata_)` above, so how is this valid? Is 
this not hit in any tests, or am I missing something?



##########
cpp/src/parquet/file_rewriter.h:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+class PARQUET_EXPORT ParquetFileRewriter {
+ public:
+  struct PARQUET_EXPORT Contents {
+    virtual ~Contents() = default;
+    virtual void Close() = 0;
+    virtual void Rewrite() = 0;
+  };
+
+  ParquetFileRewriter();
+  ~ParquetFileRewriter();
+
+  static std::unique_ptr<ParquetFileRewriter> Open(

Review Comment:
   Yeah I think this would be a good improvement, the API at the moment is 
quite complex. Maybe some useful methods that could have simplified APIs could 
be:
   * `OpenSingleFile`
   * `OpenVerticalConcatenator`
   * `OpenHorizontalConcatenator`
   
   All of these and the existing `Open` should have documentation comments to 
explain how their parameters are used.
   
   I think it would also be pretty uncommon to provide values for 
`sources_metadata`, and it's awkward that this always needs to be created with 
the correct shape. Maybe we should wrap it in `std::optional` and default it to 
`std::nullopt`? Or provide overloads that don't have this parameter?
   



##########
cpp/src/parquet/arrow/test_util.h:
##########
@@ -482,6 +488,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, 
Array* result) {
   EXPECT_TRUE(result->Equals(*expected_array));
 }
 
+::arrow::Result<std::shared_ptr<Buffer>> WriteFile(
+    const std::shared_ptr<WriterProperties>& writer_properties,
+    const std::shared_ptr<::arrow::Table>& table) {
+  // Get schema from table.
+  auto schema = table->schema();
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  auto arrow_writer_properties = default_arrow_writer_properties();
+  RETURN_NOT_OK(ToParquetSchema(schema.get(), *writer_properties,
+                                *arrow_writer_properties, &parquet_schema));
+  auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

Review Comment:
   This could be simplified by using `FileWriter::Open` rather than 
`FileWriter::Make` below, as the `Open` method handles this creation of the 
Parquet schema for you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to