wgtmac commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r2434994093
########## cpp/src/parquet/file_rewriter.h: ##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include <memory>
+
+#include "parquet/metadata.h"

Review Comment:
   Can we include `parquet/type_fwd.h` to use forward declarations as much as possible?



########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+class RowGroupRewriter {
+ public:
+  RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
+                   std::shared_ptr<ArrowOutputStream> sink,
+                   const RewriterProperties* props,
+                   std::shared_ptr<RowGroupReader> row_group_reader,

Review Comment:
   Perhaps introduce a `RowGroupContext` to hold all of the row-group-level readers?
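   For instance, a rough sketch of what such a context could look like (the name
   `RowGroupContext` and its exact members are only illustrative, not part of this PR):

   ```cpp
   // Illustrative only: bundle the per-row-group readers that RowGroupRewriter
   // currently takes as three separate constructor parameters.
   struct RowGroupContext {
     std::shared_ptr<RowGroupReader> row_group_reader;
     std::shared_ptr<RowGroupPageIndexReader> page_index_reader;
     std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader;
   };
   ```

   The constructor would then shrink to
   `RowGroupRewriter(source, sink, props, RowGroupContext context)`.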
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+        // TODO(HuaHuaY): support bloom filter writer
+        [[maybe_unused]] auto bloom_filter =
+            bloom_filter_reader_->GetColumnBloomFilter(i);

Review Comment:
   This is blocked by https://github.com/apache/arrow/pull/37400
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+
+        PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+        int64_t shift = sink_offset - cc_metadata->start_offset();
+
+        if (true) {

Review Comment:
   We might need a per-column context to indicate what to do?
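   Sketching that idea (all names here are hypothetical):

   ```cpp
   // Hypothetical per-column decision that would replace the `if (true)`
   // placeholder: decide up front whether a chunk is copied verbatim or
   // re-encoded, and dispatch on that.
   enum class ColumnRewriteAction {
     kCopyRaw,   // byte-for-byte copy of the existing column chunk
     kReencode,  // decode and rewrite with new encoding, compression, etc.
   };

   struct ColumnRewriteContext {
     ColumnRewriteAction action = ColumnRewriteAction::kCopyRaw;
     // Could later carry the target encoding, compression codec, and so on.
   };
   ```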
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+        // TODO(HuaHuaY): copy column index and bloom filter instead of reading and
+        // writing it.
+        auto column_index = page_index_reader_->GetColumnIndex(i);

Review Comment:
   We need to check if any of these is nullptr since they are optional.
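   Something like the following (untested sketch; it assumes a missing index
   should simply not be forwarded to the builder):

   ```cpp
   auto column_index = page_index_reader_->GetColumnIndex(i);
   auto offset_index = page_index_reader_->GetOffsetIndex(i);
   // Column index and offset index are optional in the source file,
   // so only forward the ones that are actually present.
   if (column_index != nullptr) {
     page_index_builder->SetColumnIndex(i, column_index);
   }
   if (offset_index != nullptr) {
     page_index_builder->SetOffsetIndex(i, offset_index, shift);
   }
   ```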
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+    std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+    std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+    std::vector<int32_t> column_indices(metadata_->num_columns());
+    std::iota(column_indices.begin(), column_indices.end(), 0);
+    page_index_reader_->WillNeed(row_group_indices, column_indices, {true, true});

Review Comment:
   ```suggestion
       page_index_reader_->WillNeed(row_group_indices, column_indices,
                                    {/*column_index=*/true, /*offset_index=*/true});
   ```
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+  SchemaDescriptor new_schema_;
+  std::unique_ptr<FileMetaDataBuilder> new_file_metadata_;

Review Comment:
   ```suggestion
     std::unique_ptr<FileMetaDataBuilder> metadata_builder_;
   ```
   At least we need to add "builder" to its name.
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+  static std::unique_ptr<ParquetFileRewriter::Contents> Open(
+      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+      std::shared_ptr<ArrowOutputStream> sink,
+      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,

Review Comment:
   Should any enforcement be exerted on `sources_metadata`? For example, it should
   either be empty or have the same size as `sources`.
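   A minimal sketch of such a check (assuming a mismatch should surface as a
   `ParquetException`, as other validation in this file does):

   ```cpp
   // Hypothetical validation at the top of GeneratedFile::Open():
   // sources_metadata must mirror the shape of sources, or be empty so that
   // the footers are read from the files themselves.
   if (!sources_metadata.empty()) {
     if (sources_metadata.size() != sources.size()) {
       throw ParquetException("sources_metadata must be empty or match sources in size");
     }
     for (size_t i = 0; i < sources.size(); ++i) {
       if (sources_metadata[i].size() != sources[i].size()) {
         throw ParquetException("sources_metadata[", i, "] does not match sources[", i,
                                "] in size");
       }
     }
   }
   ```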
########## cpp/src/parquet/file_rewriter.cc: ##########
@@ -0,0 +1,455 @@
+  void Close() {
+    for (size_t i = current_rewriter_index_; i < rewriters_.size(); ++i) {
+      rewriters_[i]->Close();
+    }
+  }
+
+  const SchemaDescriptor* schema() const { return rewriters_[0]->schema(); }

Review Comment:
   Is this still valid once `Close()` has been called on it?
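   If not, one option (sketch only; the schema-equality checks from the real
   constructor are omitted here) is to cache the schema at construction time so
   `schema()` never reaches into a closed child rewriter:

   ```cpp
   class ConcatRewriter {
    public:
     explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters)
         : rewriters_(std::move(rewriters)),
           // Capture the schema before any child can be closed. This relies on
           // SingleFileRewriter keeping its FileMetaData alive after Close().
           schema_(rewriters_[0]->schema()) {}

     const SchemaDescriptor* schema() const { return schema_; }

    private:
     std::vector<std::unique_ptr<SingleFileRewriter>> rewriters_;
     const SchemaDescriptor* schema_;
   };
   ```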
########## cpp/src/parquet/file_rewriter.cc: ##########
+  void WriteData(bool fast_copy, RowGroupMetaDataBuilder* rg_metadata_builder,
+                 PageIndexBuilder* page_index_builder, int64_t& total_bytes_written) {
+    rg_metadata_builder->set_num_rows(metadata_->num_rows());
+    if (fast_copy) {
+      PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+      int64_t shift = sink_offset - metadata_->file_offset();
+
+      auto stream = props_->reader_properties().GetStream(
+          source_, metadata_->file_offset(), metadata_->total_compressed_size());
+      CopyStream(stream, sink_, metadata_->total_compressed_size());
+      PARQUET_THROW_NOT_OK(stream->Close());
+
+      for (int i = 0; i < metadata_->num_columns(); ++i) {
+        auto cc_metadata = metadata_->ColumnChunk(i);
+        rg_metadata_builder->NextColumnChunk(std::move(cc_metadata), shift);
+
+        // TODO(HuaHuaY): copy column index and bloom filter instead of reading and
+        // writing it.
+        auto column_index = page_index_reader_->GetColumnIndex(i);
+        auto offset_index = page_index_reader_->GetOffsetIndex(i);
+        // TODO(HuaHuaY): support bloom filter writer
+        [[maybe_unused]] auto bloom_filter =
+            bloom_filter_reader_->GetColumnBloomFilter(i);
+
+        page_index_builder->SetColumnIndex(i, column_index);
+        page_index_builder->SetOffsetIndex(i, offset_index, shift);
+      }
+
+      total_bytes_written += metadata_->total_byte_size();
+    } else {

Review Comment: Perhaps we need to split this into smaller but dedicated functions?
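A possible shape for that split, with each branch body moved into a dedicated private method of `RowGroupRewriter`. This is a sketch, not the PR's code; `FastCopyRowGroup` and `RewriteColumnChunks` are illustrative names:

```cpp
// Sketch only: WriteData() reduced to dispatch, with each branch extracted
// into a private helper that owns one copying strategy.
void WriteData(bool fast_copy, RowGroupMetaDataBuilder* rg_metadata_builder,
               PageIndexBuilder* page_index_builder, int64_t& total_bytes_written) {
  rg_metadata_builder->set_num_rows(metadata_->num_rows());
  if (fast_copy) {
    // Copies the whole row group byte range in a single stream copy.
    FastCopyRowGroup(rg_metadata_builder, page_index_builder, total_bytes_written);
  } else {
    // Copies (and eventually re-encodes) one column chunk at a time.
    RewriteColumnChunks(rg_metadata_builder, page_index_builder, total_bytes_written);
  }
}
```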
########## cpp/src/parquet/file_rewriter.cc: ##########
+  std::vector<int64_t> row_group_row_counts() const {
+    std::vector<int64_t> row_counts;
+    for (auto& rewriter : rewriters_) {
+      auto count = rewriter->row_group_row_counts();
+      row_counts.insert(row_counts.end(), count.begin(), count.end());
+    }
+    return row_counts;
+  }
+
+ private:
+  std::vector<std::unique_ptr<SingleFileRewriter>> rewriters_;
+  size_t current_rewriter_index_{};
+};
+
+class ExtendRewriter {

Review Comment: Why naming it ExtendRewriter?
########## cpp/src/parquet/file_rewriter.cc: ##########
+class ExtendRewriter {
+ public:
+  // TODO(HuaHuaY): use type like `std::vector<std::unique_ptr<std::variant<ConcatRewriter,
+  // RecordBatchStream>>>` to handle batch stream in memory.
+  explicit ExtendRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
+      : rewriters_(std::move(rewriters)) {
+    std::unordered_set<std::string> column_paths;
+    for (auto& rewriter : rewriters_) {
+      auto schema = rewriter->schema();
+      for (int i = 0; i < schema->num_columns(); ++i) {
+        auto path = schema->Column(i)->path()->ToDotString();
+        // TODO(HuaHuaY): give an option about keeping which column.
+        if (column_paths.find(path) != column_paths.end()) {
+          throw ParquetException("NotImplemented, files have same column path: ", path);
+        }
+        column_paths.emplace(std::move(path));
+      }
+    }
+    auto row_counts = rewriters_[0]->row_group_row_counts();
+    for (size_t i = 1; i < rewriters_.size(); ++i) {
+      if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
+          row_counts != current_row_counts) {
+        auto vecToString = [](const std::vector<int64_t>& v) {
+          std::ostringstream oss;
+          oss << "[";
+          for (size_t i = 0; i < v.size(); ++i) {
+            oss << v[i] << (i + 1 < v.size() ? ", " : "");
+          }
+          oss << "]";
+          return oss.str();
+        };
+        throw ParquetException(
+            "The number of rows in each block must match! No.0 block's row counts: ",
+            vecToString(row_counts), ", No.", i,
+            " block's row counts: ", vecToString(current_row_counts));
+      }
+    }
+  }
+
+  void WriteData(RowGroupMetaDataBuilder* rg_metadata_builder,
+                 PageIndexBuilder* page_index_builder, int64_t& total_bytes_written) {
+    if (rewriters_.size() == 1) {
+      rewriters_[0]->WriteData(true, rg_metadata_builder, page_index_builder,
+                               total_bytes_written);
+      return;
+    }
+    for (auto& rewriter : rewriters_) {
+      rewriter->WriteData(false, rg_metadata_builder, page_index_builder,
+                          total_bytes_written);
+    }
+  }
+
+  bool HasData() { return rewriters_[0]->HasData(); }
+
+  void Close() {
+    for (auto& rewriter : rewriters_) {
+      rewriter->Close();
+    }
+  }
+
+  const SchemaDescriptor* schema() const {
+    // TODO(HuaHuaY): support file joining later.
+    if (rewriters_.size() > 1) {
+      throw ParquetException("NotImplemented, only support one ConcatRewriter now.");
+    }
+    return rewriters_[0]->schema();
+  }
+
+ private:
+  std::vector<std::unique_ptr<ConcatRewriter>> rewriters_;
+};
+
+// ----------------------------------------------------------------------
+// GeneratedFile
+
+class GeneratedFile : public ParquetFileRewriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileRewriter::Contents> Open(
+      std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
+      std::shared_ptr<ArrowOutputStream> sink,
+      std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
+      std::shared_ptr<const KeyValueMetadata> sink_metadata,
+      std::shared_ptr<RewriterProperties> props) {
+    std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile(
+        std::move(sources), std::move(sink), std::move(sources_metadata),
+        std::move(sink_metadata), std::move(props)));
+    return result;
+  }
+
+  void Close() override {
+    if (rewriter_) {
+      rewriter_->Close();
+      rewriter_.reset();
+    }
+  }
+
+  void Rewrite() override {
+    while (rewriter_->HasData()) {

Review Comment:
```suggestion
    while (rewriter_->HasMoreRowGroup()) {
```
Let's be clear here? Same for `WriteData` below.
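Applied consistently, the rename would ripple through the whole rewriter chain so that the driving loop reads as row-group iteration. A sketch of the resulting interface; `WriteNextRowGroup` is an illustrative counterpart for `WriteData`, not something the PR defines:

```cpp
// Sketch only: row-group-oriented names on the rewriter chain.
class ExtendRewriter {
 public:
  bool HasMoreRowGroup() const;  // was HasData()
  void WriteNextRowGroup(RowGroupMetaDataBuilder* rg_metadata_builder,
                         PageIndexBuilder* page_index_builder,
                         int64_t& total_bytes_written);  // was WriteData(...)
};
```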
########## cpp/src/parquet/file_rewriter.cc: ##########
+ private:
+  std::vector<std::unique_ptr<SingleFileRewriter>> rewriters_;

Review Comment:
```suggestion
  std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
```
########## cpp/src/parquet/file_rewriter.cc: ##########
+  std::shared_ptr<FileMetaData> metadata_;
+  int current_row_group_index_{};

Review Comment:
```suggestion
  int32_t current_row_group_index_{};
```
########## cpp/src/parquet/file_rewriter.cc: ##########
+  explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters)
+      : rewriters_(std::move(rewriters)) {
+    auto* schema = rewriters_[0]->schema();
+    for (size_t i = 1; i < rewriters_.size(); ++i) {
+      if (!schema->Equals(*rewriters_[i]->schema())) {
+        throw ParquetException("Input files have different schemas, current index: ", i);

Review Comment: Add a log for detailed schema?
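One way to surface the mismatch would be to embed both schemas in the exception text rather than only the index. A sketch, assuming `SchemaDescriptor::ToString()` is available for the dump:

```cpp
// Sketch only: include both schemas in the error message so the mismatch is
// visible without re-running with a debugger.
if (!schema->Equals(*rewriters_[i]->schema())) {
  std::stringstream ss;
  ss << "Input files have different schemas, current index: " << i
     << "\nexpected schema:\n" << schema->ToString()  // assumes ToString() exists
     << "\nactual schema:\n" << rewriters_[i]->schema()->ToString();
  throw ParquetException(ss.str());
}
```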
########## cpp/src/parquet/file_rewriter.cc: ##########
+  bool HasData() {
+    while (current_rewriter_index_ < rewriters_.size() &&
+           !rewriters_[current_rewriter_index_]->HasData()) {
+      rewriters_[current_rewriter_index_]->Close();

Review Comment: Add a debug log to show the progress?
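For example, a debug-level log each time an exhausted input is closed and skipped. A sketch, assuming `ARROW_LOG` from `arrow/util/logging.h` is acceptable in this file:

```cpp
// Sketch only: log progress as each input file is exhausted and closed.
bool HasData() {
  while (current_rewriter_index_ < rewriters_.size() &&
         !rewriters_[current_rewriter_index_]->HasData()) {
    rewriters_[current_rewriter_index_]->Close();
    ARROW_LOG(DEBUG) << "ConcatRewriter: finished input "
                     << current_rewriter_index_ + 1 << " of " << rewriters_.size();
    ++current_rewriter_index_;
  }
  return current_rewriter_index_ < rewriters_.size();
}
```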
