Copilot commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r2804830959
##########
cpp/src/parquet/metadata.cc:
##########
@@ -2009,6 +2050,11 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
return impl_->NextColumnChunk();
}
+void RowGroupMetaDataBuilder::NextColumnChunk(
+ std::unique_ptr<ColumnChunkMetaData> cc_metadata, int64_t shift) {
+ return impl_->NextColumnChunk(std::move(cc_metadata), shift);
Review Comment:
The `return` keyword is unnecessary: the method has a `void` return type, so
line 2055 should simply be
`impl_->NextColumnChunk(std::move(cc_metadata), shift);`.
```suggestion
impl_->NextColumnChunk(std::move(cc_metadata), shift);
```
##########
cpp/src/parquet/metadata.h:
##########
@@ -485,6 +488,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
~RowGroupMetaDataBuilder();
ColumnChunkMetaDataBuilder* NextColumnChunk();
Review Comment:
The new overload of `NextColumnChunk` that takes `ColumnChunkMetaData` and a
shift value lacks documentation. This method allows adding column metadata
without creating a builder, which is a key feature for the rewriter's fast-copy
optimization. Documentation should explain the parameters (especially `shift`
for offset adjustment) and when to use this method versus the builder-based
approach.
```suggestion
ColumnChunkMetaDataBuilder* NextColumnChunk();
// Add an existing column chunk metadata object to the row group without
// constructing it through a ColumnChunkMetaDataBuilder.
//
// This overload is intended for fast-copy / rewriting scenarios where
// column chunks are reused from another file and their metadata has
// already been finalized.
//
// Parameters:
// - cc_metadata: Ownership of the provided ColumnChunkMetaData is
// transferred to the RowGroupMetaDataBuilder.
// - shift: Byte offset delta to apply to all file-relative offsets in
// cc_metadata (for example, when appending row groups at a different
// position in the target file). Use 0 if no adjustment is needed.
//
// For column chunks produced by this writer, prefer the builder-based
// NextColumnChunk() API above.
```
##########
cpp/src/parquet/file_rewriter.h:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+class PARQUET_EXPORT ParquetFileRewriter {
+ public:
+ struct PARQUET_EXPORT Contents {
+ virtual ~Contents() = default;
+ virtual void Close() = 0;
+ virtual void Rewrite() = 0;
+ };
+
+ ParquetFileRewriter();
+ ~ParquetFileRewriter();
+
+ static std::unique_ptr<ParquetFileRewriter> Open(
+ std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+ std::shared_ptr<ArrowOutputStream> sink,
+ std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+ std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR,
+      std::shared_ptr<RewriterProperties> props = default_rewriter_properties());
+
+ void Open(std::unique_ptr<Contents> contents);
+ void Close();
+
+ void Rewrite();
Review Comment:
The `ParquetFileRewriter` class and its public methods (`Open`, `Close`,
`Rewrite`) lack documentation comments. Since this is a new public API (marked
with `PARQUET_EXPORT`), these methods should have documentation describing
their purpose, parameters, return values, and any exceptions that may be
thrown. This is especially important for the `Open` method with its complex
nested vector parameters.
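For example, a documentation sketch for the anchored `Rewrite()` declaration
(wording is illustrative only; `Open` and `Close` deserve the same treatment,
and `Open` in particular should spell out the nested `sources` layout, where
each inner vector holds files concatenated by row group and the outer vector
holds groups joined by column):
```suggestion
  /// \brief Rewrite all row groups from the configured sources into the sink.
  ///
  /// Consumes every remaining row group, copying or re-encoding column chunks
  /// as appropriate, then writes the page indexes, bloom filters, and file
  /// footer. Throws ParquetException if a source stream is malformed.
  void Rewrite();
```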
##########
cpp/src/parquet/properties.h:
##########
@@ -1526,4 +1526,74 @@ struct ArrowWriteContext {
PARQUET_EXPORT
std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();
+class PARQUET_EXPORT RewriterProperties {
+ public:
+ class Builder {
+ public:
+ Builder()
+ : pool_(::arrow::default_memory_pool()),
+ writer_properties_(default_writer_properties()),
+ reader_properties_(default_reader_properties()) {}
+
+ explicit Builder(const RewriterProperties& properties)
+ : pool_(properties.memory_pool()),
+ writer_properties_(properties.writer_properties()),
+ reader_properties_(properties.reader_properties()) {}
+
+ virtual ~Builder() = default;
+
+ /// Specify the memory pool for the rewriter. Default default_memory_pool.
+ Builder* memory_pool(MemoryPool* pool) {
+ pool_ = pool;
+ return this;
+ }
+
+ /// Set the writer properties.
+ Builder* writer_properties(std::shared_ptr<WriterProperties> properties) {
+ writer_properties_ = std::move(properties);
+ return this;
+ }
+
+ /// Set the reader properties.
+ Builder* reader_properties(ReaderProperties properties) {
+ reader_properties_ = std::move(properties);
+ return this;
+ }
+
+ /// Build the RewriterProperties with the builder parameters.
+ std::shared_ptr<RewriterProperties> build() {
+ return std::shared_ptr<RewriterProperties>(new RewriterProperties(
+        pool_, std::move(writer_properties_), std::move(reader_properties_)));
+ }
+
+ private:
+ MemoryPool* pool_;
+ std::shared_ptr<WriterProperties> writer_properties_;
+ ReaderProperties reader_properties_;
+ };
+
+ MemoryPool* memory_pool() const { return pool_; }
+
+ const std::shared_ptr<WriterProperties>& writer_properties() const {
+ return writer_properties_;
+ }
+
+  const ReaderProperties& reader_properties() const { return reader_properties_; }
+
+ private:
+ explicit RewriterProperties(MemoryPool* pool,
+                              std::shared_ptr<WriterProperties> writer_properties,
+ ReaderProperties reader_properties)
+ : pool_(pool),
+ writer_properties_(std::move(writer_properties)),
+ reader_properties_(std::move(reader_properties)) {}
+
+ MemoryPool* pool_;
+ std::shared_ptr<WriterProperties> writer_properties_;
+ ReaderProperties reader_properties_;
+};
Review Comment:
The `RewriterProperties` class lacks documentation. As a new public API
class (marked with `PARQUET_EXPORT`), it should have comprehensive
documentation explaining its purpose, how it relates to `WriterProperties` and
`ReaderProperties`, and what configuration options it provides for the rewriter.
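For example, class-level documentation along these lines (a sketch; the exact
wording should reflect the intended semantics):
```cpp
/// \brief Configuration for ParquetFileRewriter.
///
/// RewriterProperties bundles the ReaderProperties used to read pages from
/// the source files, the WriterProperties applied when writing pages to the
/// sink (e.g. per-column compression, which also determines whether a column
/// chunk can be fast-copied instead of re-encoded), and the MemoryPool used
/// for intermediate buffers. Instances are created via
/// RewriterProperties::Builder; default_rewriter_properties() returns a
/// shared default instance.
class PARQUET_EXPORT RewriterProperties {
```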
##########
cpp/src/parquet/page_index.h:
##########
@@ -370,6 +371,12 @@ class PARQUET_EXPORT PageIndexBuilder {
/// the PageIndexBuilder.
virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0;
+ virtual void SetColumnIndex(int32_t i,
+                              const std::shared_ptr<ColumnIndex>& column_index) = 0;
+
Review Comment:
The new methods `SetColumnIndex` and `SetOffsetIndex` lack documentation
comments. These are part of the public `PageIndexBuilder` interface and should
include documentation explaining their purpose, parameters (especially the
`shift` parameter in `SetOffsetIndex`), and when they should be used instead of
the corresponding builder methods.
```suggestion
/// \brief Set a fully constructed ColumnIndex for a column.
///
/// This method can be used instead of GetColumnIndexBuilder() when the caller
/// already has a materialized ColumnIndex instance (for example, when reusing
/// or transforming an existing index) and does not need to build it through
/// the ColumnIndexBuilder interface.
///
/// \param i Column ordinal.
/// \param column_index The ColumnIndex to associate with the given column.
virtual void SetColumnIndex(int32_t i,
                            const std::shared_ptr<ColumnIndex>& column_index) = 0;
/// \brief Set a fully constructed OffsetIndex for a column.
///
/// This method can be used instead of GetOffsetIndexBuilder() when the caller
/// already has a materialized OffsetIndex instance.
///
/// The \p shift parameter is applied to all page offsets contained in the
/// provided OffsetIndex. It allows reusing an index whose offsets are relative
/// to a different file position (for example, when concatenating data or
/// writing the index at a different location) by shifting all stored offsets
/// by a constant amount.
///
/// \param i Column ordinal.
/// \param offset_index The OffsetIndex to associate with the given column.
/// \param shift A byte offset added to each page offset stored in \p offset_index.
```
##########
cpp/src/parquet/arrow/test_util.h:
##########
@@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
EXPECT_TRUE(result->Equals(*expected_array));
}
Review Comment:
The `WriteFile` function takes `buffer` as a non-const reference parameter
(`std::shared_ptr<Buffer>&`). This is unusual in modern C++; it would be more
idiomatic to return the buffer via the return value or use an out parameter
with clearer naming (e.g., `out_buffer`). However, since this is a test utility
function, this is acceptable. Consider adding a comment to clarify that
`buffer` is an output parameter.
```suggestion
// Note: 'buffer' is an output parameter that will receive the serialized file contents.
```
##########
cpp/src/parquet/page_index.cc:
##########
@@ -802,11 +827,13 @@ class PageIndexBuilderImpl final : public PageIndexBuilder {
WriteResult result;
-  // Serialize column index ordered by row group ordinal and then column ordinal.
-  result.column_index_locations = SerializeIndex(column_index_builders_, sink);
+  /// Serialize column index ordered by row group ordinal and then column ordinal.
+  result.column_index_locations =
+      SerializeIndex(column_index_builders_, column_indices_, sink);
-  // Serialize offset index ordered by row group ordinal and then column ordinal.
-  result.offset_index_locations = SerializeIndex(offset_index_builders_, sink);
+  /// Serialize offset index ordered by row group ordinal and then column ordinal.
+ result.offset_index_locations =
+ SerializeIndex(offset_index_builders_, offset_indices_, sink);
Review Comment:
The comment style changed from `//` to `///` (documentation comment). While
this is technically correct since these are implementation details, verify if
this change is intentional. In the existing codebase, regular comments (`//`)
are typically used for implementation notes, while documentation comments
(`///`) are used for API documentation.
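If the doc-comment style is unintentional, the hunk could keep regular
comments:
```suggestion
  // Serialize column index ordered by row group ordinal and then column ordinal.
  result.column_index_locations =
      SerializeIndex(column_index_builders_, column_indices_, sink);
  // Serialize offset index ordered by row group ordinal and then column ordinal.
  result.offset_index_locations =
      SerializeIndex(offset_index_builders_, offset_indices_, sink);
```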
##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,813 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <numeric>
+#include <optional>
+#include <ranges>
+#include <sstream>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h" // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+ std::shared_ptr<ArrowOutputStream> to, int64_t size,
+ ::arrow::MemoryPool* pool) {
+ int64_t bytes_copied = 0;
+ if (from->supports_zero_copy()) {
+ while (bytes_copied < size) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+ if (buffer->size() == 0) {
+ throw ParquetException("Unexpected end of stream at ", bytes_copied);
+ }
+ PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+ bytes_copied += buffer->size();
+ }
+ return;
+ }
+
+ std::shared_ptr<ResizableBuffer> buffer =
+ AllocateBuffer(pool, kDefaultOutputStreamSize);
+ while (bytes_copied < size) {
+ PARQUET_ASSIGN_OR_THROW(
+ auto read_size,
+ from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+ if (read_size == 0) {
+ throw ParquetException("Unexpected end of stream at ", bytes_copied);
+ }
+ PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+ bytes_copied += read_size;
+ }
+}
+} // namespace
+
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+ static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+ RewriterProperties::Builder().build();
+ return default_rewriter_properties;
+}
+
+class PagesRewriter {
+ public:
+  PagesRewriter(const RewriterProperties& props, std::unique_ptr<PageReader> page_reader,
+ std::unique_ptr<PageWriter> page_writer,
+ std::shared_ptr<OffsetIndex> original_offset_index)
+ : props_(props),
+ page_reader_(std::move(page_reader)),
+ page_writer_(std::move(page_writer)),
+ original_offset_index_(std::move(original_offset_index)) {}
+
+ void WritePages() {
+ bool has_dictionary = false;
+ bool fallback = false;
+ std::shared_ptr<Page> page;
+ size_t page_no = 0;
+ while ((page = page_reader_->NextPage()) != nullptr) {
+ switch (page->type()) {
+ case parquet::PageType::DICTIONARY_PAGE: {
+ WriteDictionaryPage(*static_cast<const DictionaryPage*>(page.get()));
+ has_dictionary = true;
+ break;
+ }
+ case parquet::PageType::DATA_PAGE: {
+ auto& data_page = *static_cast<const DataPageV1*>(page.get());
+ if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+ fallback = true;
+ }
+ WriteDataPageV1(data_page, page_no);
+ page_no++;
+ break;
+ }
+ case parquet::PageType::DATA_PAGE_V2: {
+ auto& data_page = *static_cast<const DataPageV2*>(page.get());
+ if (!IsDictionaryIndexEncoding(data_page.encoding())) {
+ fallback = true;
+ }
+ WriteDataPageV2(data_page, page_no);
+ page_no++;
+ break;
+ }
+ default: {
+ ARROW_LOG(DEBUG) << "Unsupported page type: " <<
static_cast<int>(page->type());
+ break;
+ }
+ }
+ }
+ page_writer_->Close(has_dictionary, has_dictionary && fallback);
+ }
+
+ int64_t total_uncompressed_size() const { return total_uncompressed_size_; }
+
+ private:
+ void WriteDictionaryPage(const DictionaryPage& dict_page) {
+ total_uncompressed_size_ += page_writer_->WriteDictionaryPage(dict_page);
+ }
+
+ void WriteDataPageV1(const DataPageV1& data_page, const size_t page_no) {
+ std::shared_ptr<Buffer> compressed_data;
+ if (page_writer_->has_compressor()) {
+ auto buffer = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(props_.memory_pool(), data_page.size()));
+ page_writer_->Compress(*data_page.buffer(), buffer.get());
+ compressed_data = std::move(buffer);
+ } else {
+ compressed_data = data_page.buffer();
+ }
+ auto first_row_index =
+ original_offset_index_
+ ? std::optional{original_offset_index_->page_locations()[page_no]
+ .first_row_index}
+ : std::nullopt;
+ SizeStatistics size_statistics;
+ size_statistics.unencoded_byte_array_data_bytes =
+ original_offset_index_ &&
+            !original_offset_index_->unencoded_byte_array_data_bytes().empty()
+ ? std::optional{original_offset_index_
+ ->unencoded_byte_array_data_bytes()[page_no]}
+ : std::nullopt;
+    DataPageV1 new_page(compressed_data, data_page.num_values(), data_page.encoding(),
+ data_page.definition_level_encoding(),
+ data_page.repetition_level_encoding(),
+ data_page.uncompressed_size(), data_page.statistics(),
+ first_row_index, size_statistics);
+ total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+ }
+
+ void WriteDataPageV2(const DataPageV2& data_page, const size_t page_no) {
+ int32_t levels_byte_len = data_page.repetition_levels_byte_length() +
+ data_page.definition_levels_byte_length();
+ bool page_is_compressed = false;
+ std::shared_ptr<Buffer> output_buffer;
+ if (page_writer_->has_compressor() && data_page.size() > levels_byte_len) {
+ auto values_buffer = SliceBuffer(data_page.buffer(), levels_byte_len);
+ auto compressed_values = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(props_.memory_pool(), values_buffer->size()));
+ page_writer_->Compress(*values_buffer, compressed_values.get());
+ if (compressed_values->size() < values_buffer->size()) {
+ page_is_compressed = true;
+ int64_t combined_size = levels_byte_len + compressed_values->size();
+ auto combined = AllocateBuffer(props_.memory_pool(), combined_size);
+ if (levels_byte_len > 0) {
+        std::memcpy(combined->mutable_data(), data_page.data(), levels_byte_len);
+ }
+      std::memcpy(combined->mutable_data() + levels_byte_len, compressed_values->data(),
+ compressed_values->size());
+ output_buffer = std::move(combined);
+ }
+ }
+ if (!page_is_compressed) {
+ output_buffer = data_page.buffer();
+ }
+
+ auto first_row_index =
+ original_offset_index_
+ ? std::optional{original_offset_index_->page_locations()[page_no]
+ .first_row_index}
+ : std::nullopt;
+ SizeStatistics size_statistics;
+ size_statistics.unencoded_byte_array_data_bytes =
+ original_offset_index_ &&
+            !original_offset_index_->unencoded_byte_array_data_bytes().empty()
+ ? std::optional{original_offset_index_
+ ->unencoded_byte_array_data_bytes()[page_no]}
+ : std::nullopt;
+    DataPageV2 new_page(output_buffer, data_page.num_values(), data_page.num_nulls(),
+ data_page.num_rows(), data_page.encoding(),
+ data_page.definition_levels_byte_length(),
+ data_page.repetition_levels_byte_length(),
+ data_page.uncompressed_size(), page_is_compressed,
+                        data_page.statistics(), first_row_index, size_statistics);
+ total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+ }
+
+ const RewriterProperties& props_;
+ std::unique_ptr<PageReader> page_reader_;
+ std::unique_ptr<PageWriter> page_writer_;
+ std::shared_ptr<OffsetIndex> original_offset_index_;
+ int64_t total_uncompressed_size_{0};
+};
+
+class ColumnChunkRewriter {
+ public:
+ ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ const RewriterProperties& props,
+ std::unique_ptr<ColumnChunkMetaData> metadata,
+                      std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+                      std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
+                      int32_t row_group_ordinal, int32_t column_ordinal, bool fast_copy)
+ : source_(std::move(source)),
+ sink_(std::move(sink)),
+ props_(props),
+ metadata_(std::move(metadata)),
+ page_index_reader_(std::move(page_index_reader)),
+ bloom_filter_reader_(std::move(bloom_filter_reader)),
+ row_group_ordinal_(row_group_ordinal),
+ column_ordinal_(column_ordinal),
+ fast_copy_(fast_copy) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("ColumnChunkMetaData should not be nullptr");
+ }
+ }
+
+ static bool CanFastCopy(const RewriterProperties& props,
+ const ColumnChunkMetaData& metadata) {
+ Compression::type original_codec = metadata.compression();
+ auto column_path = metadata.path_in_schema();
+    Compression::type new_codec = props.writer_properties()->compression(column_path);
+ return (original_codec == new_codec);
+ }
+
+ void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ auto& reader_props = props_.reader_properties();
+ auto& writer_props = *props_.writer_properties();
+ auto stream = reader_props.GetStream(source_, metadata_->start_offset(),
+ metadata_->total_compressed_size());
+
+ if (fast_copy_) {
+ auto uncompressed_size = metadata_->total_uncompressed_size();
+
+ PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+ int64_t shift = sink_offset - metadata_->start_offset();
+
+      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
+ PARQUET_THROW_NOT_OK(stream->Close());
+
+ rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
+ if (offset_index != nullptr) {
+          page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
+ }
+ }
+
+ total_bytes_written += uncompressed_size;
+ } else {
+ auto column_path = metadata_->path_in_schema();
+ auto new_codec = writer_props.compression(column_path);
+ auto codec_options = writer_props.codec_options(column_path);
+
+ auto* cc_metadata_builder = rg_metadata_builder.NextColumnChunk();
+
+ OffsetIndexBuilder* offset_index_builder = nullptr;
+ std::shared_ptr<OffsetIndex> original_offset_index = nullptr;
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+        offset_index_builder = page_index_builder->GetOffsetIndexBuilder(column_ordinal_);
+        original_offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
+ }
+
+      auto page_reader = PageReader::Open(std::move(stream), metadata_->num_values(),
+                                          metadata_->compression(), reader_props);
+ auto page_writer = PageWriter::Open(
+          sink_, new_codec, cc_metadata_builder, static_cast<int16_t>(row_group_ordinal_),
+ static_cast<int16_t>(column_ordinal_), props_.memory_pool(),
+ /*buffered_row_group=*/false,
+ /*header_encryptor=*/nullptr, /*data_encryptor=*/nullptr,
+ writer_props.page_checksum_enabled(),
+ /*column_index_builder=*/nullptr, offset_index_builder,
+ codec_options ? *codec_options : CodecOptions{});
+
+      PagesRewriter pages_rewriter(props_, std::move(page_reader), std::move(page_writer),
+ std::move(original_offset_index));
+ pages_rewriter.WritePages();
+
+ total_bytes_written += pages_rewriter.total_uncompressed_size();
+ }
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+ auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
+ if (column_index != nullptr) {
+ page_index_builder->SetColumnIndex(column_ordinal_, column_index);
+ }
+ }
+ if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
+      auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
+ if (bloom_filter != nullptr) {
+        bloom_filter_builder->InsertBloomFilter(column_ordinal_, std::move(bloom_filter));
+ }
+ }
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::unique_ptr<ColumnChunkMetaData> metadata_;
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+ const int32_t row_group_ordinal_;
+ const int32_t column_ordinal_;
+ const bool fast_copy_;
+};
+
+class RowGroupRewriter {
+ public:
+ RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ const RewriterProperties& props,
+ std::shared_ptr<RowGroupReader> row_group_reader,
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+                   std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
+ : source_(std::move(source)),
+ sink_(std::move(sink)),
+ props_(props),
+ row_group_reader_(std::move(row_group_reader)),
+ page_index_reader_(std::move(page_index_reader)),
+ bloom_filter_reader_(std::move(bloom_filter_reader)),
+ metadata_(row_group_reader_->metadata()) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("RowGroupMetaData should not be nullptr");
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ rg_metadata_builder.set_num_rows(metadata_->num_rows());
+
+ std::vector<bool> can_column_chunk_fast_copy(metadata_->num_columns());
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+ can_column_chunk_fast_copy[i] =
+ ColumnChunkRewriter::CanFastCopy(props_, *cc_metadata);
+ }
+    bool fast_copy = std::ranges::all_of(can_column_chunk_fast_copy, std::identity{});
+ if (fast_copy) {
+ fast_copy = metadata_->file_offset() != 0;
+ }
+ if (fast_copy) {
+ PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+ int64_t shift = sink_offset - metadata_->file_offset();
+
+ auto stream = props_.reader_properties().GetStream(
+          source_, metadata_->file_offset(), metadata_->total_compressed_size());
+      CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
+ PARQUET_THROW_NOT_OK(stream->Close());
+
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+ rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);
+
+ auto column_index =
+            page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
+        auto offset_index =
+            page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
+ auto bloom_filter = bloom_filter_reader_
+ ? bloom_filter_reader_->GetColumnBloomFilter(i)
+ : nullptr;
+
+ if (column_index != nullptr && page_index_builder != nullptr) {
+ page_index_builder->SetColumnIndex(i, column_index);
+ }
+ if (offset_index != nullptr && page_index_builder != nullptr) {
+ page_index_builder->SetOffsetIndex(i, offset_index, shift);
+ }
+ if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
+ bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
+ }
+ }
+
+ total_bytes_written += metadata_->total_byte_size();
+ } else {
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+        ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
+ page_index_reader_, bloom_filter_reader_,
+                                     row_group_ordinal, i, can_column_chunk_fast_copy[i]);
+ rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
+                                      bloom_filter_builder, total_bytes_written);
+ }
+ }
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::shared_ptr<RowGroupReader> row_group_reader_;
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+ const RowGroupMetaData* metadata_;
+};
+
+class SingleFileRewriter {
+ public:
+ SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ std::shared_ptr<FileMetaData> source_metadata,
+ const RewriterProperties& props)
+ : source_(source),
+ sink_(std::move(sink)),
+ props_(props),
+ parquet_file_reader_(ParquetFileReader::Open(
+          std::move(source), props_.reader_properties(), std::move(source_metadata))),
+ page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+ bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+ metadata_(parquet_file_reader_->metadata()) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("FileMetaData should not be nullptr");
+ }
+
+ std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+ std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+ std::vector<int32_t> column_indices(metadata_->num_columns());
+ std::iota(column_indices.begin(), column_indices.end(), 0);
+ if (page_index_reader_) {
+ page_index_reader_->WillNeed(row_group_indices, column_indices,
+                                   {/*column_index=*/true, /*offset_index=*/true});
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ if (current_row_group_index_ >= metadata_->num_row_groups()) {
+ std::stringstream ss;
+ ss << "Trying to read row group " << current_row_group_index_
+ << " but file only has " << metadata_->num_row_groups() << " row
groups";
+ throw ParquetException(ss.str());
+ }
+    auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_);
+ auto page_index_reader = page_index_reader_
+                                 ? page_index_reader_->RowGroup(current_row_group_index_)
+ : nullptr;
+    auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_);
+    RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader),
+ std::move(page_index_reader),
+ std::move(bloom_filter_reader));
+    rewriter.WriteRowGroupData(row_group_ordinal, rg_metadata_builder, page_index_builder,
+ bloom_filter_builder, total_bytes_written);
+ ++current_row_group_index_;
+ }
+
+ bool HasMoreRowGroup() {
+ return current_row_group_index_ < metadata_->num_row_groups();
+ }
+
+ void Close() { parquet_file_reader_->Close(); }
+
+ const SchemaDescriptor& schema() const { return *metadata_->schema(); }
+
+ std::vector<int64_t> row_group_row_counts() const {
+ int num_row_groups = metadata_->num_row_groups();
+ std::vector<int64_t> row_counts;
+ row_counts.reserve(num_row_groups);
+ for (int i = 0; i < num_row_groups; ++i) {
+ row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
+ }
+ return row_counts;
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::unique_ptr<ParquetFileReader> parquet_file_reader_;
+ std::shared_ptr<PageIndexReader> page_index_reader_;
+ BloomFilterReader& bloom_filter_reader_;
+ std::shared_ptr<FileMetaData> metadata_;
+ int current_row_group_index_{};
+};
+
+class ConcatRewriter {
+ public:
+  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters)
+ : file_rewriters_(std::move(rewriters)) {
+ if (file_rewriters_.empty()) {
+ throw ParquetException("At least one SingleFileRewriter is required");
+ }
+ auto& schema = file_rewriters_[0]->schema();
+ if (std::ranges::any_of(
+ file_rewriters_ | std::views::drop(1),
+          [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) {
+ throw ParquetException("Input files have different schemas.");
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
+        row_group_ordinal, rg_metadata_builder, page_index_builder, bloom_filter_builder,
+ total_bytes_written);
+ }
+
+ bool HasMoreRowGroup() {
+ while (current_rewriter_index_ < file_rewriters_.size() &&
+ !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
+ file_rewriters_[current_rewriter_index_]->Close();
+ ARROW_LOG(DEBUG) << "Finished rewriting file index " <<
current_rewriter_index_;
+ ++current_rewriter_index_;
+ }
+ return current_rewriter_index_ < file_rewriters_.size();
+ }
+
+  void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); }
+
+  const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); }
+
+ std::vector<int64_t> row_group_row_counts() const {
+ std::vector<int64_t> row_counts;
+ for (auto& rewriter : file_rewriters_) {
+ auto count = rewriter->row_group_row_counts();
+ row_counts.insert(row_counts.end(), count.begin(), count.end());
+ }
+ return row_counts;
+ }
+
+ private:
+ std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
+ size_t current_rewriter_index_{};
+};
+
+class JoinRewriter {
+ public:
+ explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
+ : rewriters_(std::move(rewriters)) {
+ if (rewriters_.empty()) {
+ throw ParquetException("At least one ConcatRewriter is required");
+ }
+ auto row_counts = rewriters_[0]->row_group_row_counts();
+ for (size_t i = 1; i < rewriters_.size(); ++i) {
+ if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
+ row_counts != current_row_counts) {
+ auto vecToString = [](const std::vector<int64_t>& v) {
+ if (v.empty()) {
+ return std::string("[]");
+ }
+ std::ostringstream oss;
+ oss << "[" << v[0];
+          // TODO(anyone): use std::format and std::views::join_with when C++23 available
+ for (const auto& val : v | std::views::drop(1)) {
+ oss << ", " << val;
+ }
+ oss << "]";
+ return oss.str();
+ };
+ throw ParquetException(
+ "The number of rows in each block must match! No.0 blocks row
counts: ",
+ vecToString(row_counts), ", No.", i,
+ " blocks row counts: ", vecToString(current_row_counts));
+ }
+ }
+
+ std::unordered_set<std::string> column_paths;
+ schema::NodeVector fields;
+
+ for (auto& rewriter : rewriters_) {
+ const SchemaDescriptor& schema_desc = rewriter->schema();
+
+ for (int i = 0; i < schema_desc.num_columns(); ++i) {
+ auto path = schema_desc.Column(i)->path()->ToDotString();
+ if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
+          // TODO(HuaHuaY): support choose one column from columns with same path
+          throw ParquetException("NotImplemented, files have same column path: ", path);
Review Comment:
The error message "NotImplemented, files have same column path: " should be
"NotImplemented: files have the same column path: " for better grammar. Also
consider adding more context about which files have the duplicate column, if
that information is available.
```suggestion
throw ParquetException("NotImplemented: files have the same column
path: ", path);
```
##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,813 @@
+ }
+ }
+
+ const auto& group_node = schema_desc.group_node();
+ for (int i = 0; i < group_node->field_count(); ++i) {
+ fields.push_back(group_node->field(i));
+ }
+ }
+
+ auto new_root = schema::GroupNode::Make(rewriters_[0]->schema().name(),
+ Repetition::REQUIRED, fields);
+ schema_.Init(new_root);
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ for (auto& rewriter : rewriters_) {
+ rewriter->WriteRowGroupData(row_group_ordinal, rg_metadata_builder,
+ page_index_builder, bloom_filter_builder,
+ total_bytes_written);
+ }
+ }
+
+ bool HasMoreRowGroup() {
+ return std::ranges::all_of(rewriters_, &ConcatRewriter::HasMoreRowGroup);
+ }
+
+ void Close() { std::ranges::for_each(rewriters_, &ConcatRewriter::Close); }
+
+ const SchemaDescriptor& schema() const { return schema_; }
+
+ private:
+ std::vector<std::unique_ptr<ConcatRewriter>> rewriters_;
+ SchemaDescriptor schema_;
+};
+
+// ----------------------------------------------------------------------
+// GeneratedFile
+
+class GeneratedFile : public ParquetFileRewriter::Contents {
+ public:
+ static std::unique_ptr<ParquetFileRewriter::Contents> Open(
+ std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+ std::shared_ptr<ArrowOutputStream> sink,
+ std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+ std::shared_ptr<const KeyValueMetadata> sink_metadata,
+ std::shared_ptr<RewriterProperties> props) {
+ if (sources.size() != sources_metadata.size() ||
+ // TODO(anyone): use std::views::zip when C++23 available
+        std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t i) {
+ return sources[i].size() != sources_metadata[i].size();
+ })) {
+ throw ParquetException(
+ "The number of sources and sources_metadata must be the same");
+ }
+ std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile(
+ std::move(sources), std::move(sink), std::move(sources_metadata),
+ std::move(sink_metadata), std::move(props)));
+ return result;
+ }
+
+ void Close() override {
+ if (rewriter_) {
+ rewriter_->Close();
+ rewriter_.reset();
+ }
+ }
+
+ void Rewrite() override {
+ int32_t row_group_ordinal = 0;
+ while (rewriter_->HasMoreRowGroup()) {
+ auto& rg_metadata_builder = *metadata_builder_->AppendRowGroup();
+ if (page_index_builder_) {
+ page_index_builder_->AppendRowGroup();
+ }
+ if (bloom_filter_builder_) {
+ bloom_filter_builder_->AppendRowGroup();
+ }
+ int64_t total_bytes_written = 0;
+ rewriter_->WriteRowGroupData(row_group_ordinal, rg_metadata_builder,
+                                   page_index_builder_.get(), bloom_filter_builder_.get(),
+ total_bytes_written);
+ rg_metadata_builder.Finish(total_bytes_written);
+ row_group_ordinal++;
+ }
+ if (page_index_builder_) {
+ page_index_builder_->Finish();
+ auto [column_index_locations, offset_index_locations] =
+ page_index_builder_->WriteTo(sink_.get());
+ metadata_builder_->SetIndexLocations(IndexKind::kColumnIndex,
+ column_index_locations);
+ metadata_builder_->SetIndexLocations(IndexKind::kOffsetIndex,
+ offset_index_locations);
+ }
+ if (bloom_filter_builder_) {
+      auto bloom_filter_locations = bloom_filter_builder_->WriteTo(sink_.get());
+ metadata_builder_->SetIndexLocations(IndexKind::kBloomFilter,
+ bloom_filter_locations);
+ }
+
+ auto file_metadata = metadata_builder_->Finish(sink_metadata_);
+ WriteFileMetaData(*file_metadata, sink_.get());
+ }
+
+ private:
+  GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+ std::shared_ptr<ArrowOutputStream> sink,
+                std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+ std::shared_ptr<const KeyValueMetadata> sink_metadata,
+ std::shared_ptr<RewriterProperties> props)
+ : sink_(std::move(sink)),
+ props_(std::move(props)),
+ sink_metadata_(std::move(sink_metadata)) {
+ std::vector<std::unique_ptr<ConcatRewriter>> rewriters;
+ rewriters.reserve(sources.size());
+ for (size_t i = 0; i < sources.size(); ++i) {
+ std::vector<std::unique_ptr<SingleFileRewriter>> concat_rewriters;
+ concat_rewriters.reserve(sources[i].size());
+ for (size_t j = 0; j < sources[i].size(); ++j) {
+ concat_rewriters.emplace_back(std::make_unique<SingleFileRewriter>(
+            std::move(sources[i][j]), sink_, std::move(sources_metadata[i][j]), *props_));
+ }
+ rewriters.emplace_back(
+ std::make_unique<ConcatRewriter>(std::move(concat_rewriters)));
+ }
+ rewriter_ = std::make_unique<JoinRewriter>(std::move(rewriters));
+
+ if (props_->writer_properties()->file_encryption_properties() == nullptr) {
+ // Unencrypted parquet files always start with PAR1
+ PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
+ } else {
+ throw ParquetException(
+ "NotImplemented, rewriter does not support to write encrypted
files.");
Review Comment:
The error message "NotImplemented, rewriter does not support to write
encrypted files." has a minor grammatical issue. It should be "NotImplemented:
rewriter does not support writing encrypted files." or "NotImplemented,
rewriter does not support encrypted file writing."
```suggestion
"NotImplemented: rewriter does not support writing encrypted
files.");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]