Copilot commented on code in PR #47775:
URL: https://github.com/apache/arrow/pull/47775#discussion_r2850650602
##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -92,6 +92,18 @@ class PARQUET_EXPORT BloomFilterBuilder {
/// - `WriteTo()` has been called
virtual BloomFilter* CreateBloomFilter(int32_t column_ordinal) = 0;
+ /// \brief Insert a BloomFilter of the column ordinal of the current row group.
+ ///
+ /// \param column_ordinal Column ordinal for the bloom filter.
+ /// \param bloom_filter The bloom filter to insert.
+ /// \throws ParquetException if any condition is violated:
+ /// - `AppendRowGroup()` has not been called yet
+ /// - The column ordinal is out of bound
+ /// - Bloom filter already exists for the column
+ /// - `WriteTo()` has been called
+ virtual void InsertBloomFilter(int32_t column_ordinal,
+ std::unique_ptr<BloomFilter> bloom_filter) = 0;
Review Comment:
`BloomFilterBuilder` is a `PARQUET_EXPORT` interface; adding a new
pure-virtual method (`InsertBloomFilter`) is a source/binary breaking change
for any downstream subclasses. If external implementations are expected,
consider adding a default implementation or a separate extension interface to
avoid vtable breaks.
```suggestion
std::unique_ptr<BloomFilter> bloom_filter) {
(void)column_ordinal;
(void)bloom_filter;
}
```
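An alternative to a silent no-op default is a body that keeps the vtable stable but surfaces unsupported use; a minimal sketch of that option (the error text is illustrative):
```cpp
// Sketch only: a non-pure default keeps existing subclasses source- and
// ABI-compatible, while still reporting that insertion is unsupported.
virtual void InsertBloomFilter(int32_t column_ordinal,
                               std::unique_ptr<BloomFilter> bloom_filter) {
  (void)column_ordinal;
  (void)bloom_filter;
  ParquetException::NYI("InsertBloomFilter is not implemented by this builder");
}
```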
##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,808 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_rewriter.h"
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <optional>
+#include <ranges>
+#include <unordered_set>
+#include <utility>
+
+#include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
+#include "parquet/bloom_filter.h" // IWYU pragma: keep
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/bloom_filter_writer.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
+#include "parquet/index_location.h"
+#include "parquet/metadata.h"
+#include "parquet/page_index.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+void CopyStream(std::shared_ptr<ArrowInputStream> from,
+ std::shared_ptr<ArrowOutputStream> to, int64_t size,
+ ::arrow::MemoryPool* pool) {
+ int64_t bytes_copied = 0;
+ if (from->supports_zero_copy()) {
+ while (bytes_copied < size) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied));
+ if (buffer->size() == 0) {
+ throw ParquetException("Unexpected end of stream at ", bytes_copied);
+ }
+ PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size()));
+ bytes_copied += buffer->size();
+ }
+ return;
+ }
+
+ std::shared_ptr<ResizableBuffer> buffer =
+ AllocateBuffer(pool, kDefaultOutputStreamSize);
+ while (bytes_copied < size) {
+ PARQUET_ASSIGN_OR_THROW(
+ auto read_size,
+ from->Read(std::min(size - bytes_copied, buffer->size()), &buffer));
+ if (read_size == 0) {
+ throw ParquetException("Unexpected end of stream at ", bytes_copied);
+ }
+ PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size));
+ bytes_copied += read_size;
+ }
Review Comment:
`CopyStream` allocates a 1KB buffer (`kDefaultOutputStreamSize`) for the
non-zero-copy path. For large column chunks this will cause excessive small
reads/writes and significantly degrade rewrite performance. Consider using a
substantially larger chunk size (e.g., 64KB+ or an Arrow IO default) for the
copy buffer.
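As a sketch of that suggestion, only the allocation needs to change; `kRewriteCopyBufferSize` below is a hypothetical constant, not an existing Arrow/Parquet definition:
```cpp
// Hypothetical 64 KiB scratch buffer for the non-zero-copy path, replacing
// the 1 KiB kDefaultOutputStreamSize and cutting Read/Write round trips.
constexpr int64_t kRewriteCopyBufferSize = 64 * 1024;

std::shared_ptr<ResizableBuffer> buffer =
    AllocateBuffer(pool, kRewriteCopyBufferSize);
```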
##########
cpp/src/parquet/arrow/arrow_rewriter_test.cc:
##########
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/config.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/test_util.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_rewriter.h"
+#include "parquet/metadata.h"
+#include "parquet/platform.h"
+#include "parquet/properties.h"
+#include "parquet/test_util.h"
+
+using arrow::Table;
+using arrow::io::BufferReader;
+
+namespace parquet::arrow {
+
+TEST(ParquetRewriterTest, SimpleRoundTrip) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().disable_write_page_index()->build())
+ ->build();
+
+ auto schema = ::arrow::schema(
+ {::arrow::field("a", ::arrow::int32()), ::arrow::field("b",
::arrow::utf8())});
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(
+ schema, {R"([[1, "a"], [2, "b"]])"})));
+
+ auto sink = CreateOutputStream();
+ auto rewriter =
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer)}}, sink,
+ {{NULLPTR}}, NULLPTR, rewriter_properties);
+ rewriter->Rewrite();
+ rewriter->Close();
+
+ ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
+ ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
+ std::move(file_reader)));
+
+ ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
+ ASSERT_OK(table->ValidateFull());
+
+ auto expected_table = ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"});
+ AssertTablesEqual(*expected_table, *table);
+}
+
+TEST(ParquetRewriterTest, ConcatRoundTrip) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto schema = ::arrow::schema(
+ {::arrow::field("a", ::arrow::int32()), ::arrow::field("b",
::arrow::utf8())});
+
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_up,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"})));
+ ASSERT_OK_AND_ASSIGN(auto buffer_down,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(schema, {R"([[3,
"c"]])"})));
+
+ auto sink = CreateOutputStream();
+ auto rewriter =
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer_up),
+ std::make_shared<BufferReader>(buffer_down)}},
+ sink, {{NULLPTR, NULLPTR}}, NULLPTR, rewriter_properties);
+ rewriter->Rewrite();
+ rewriter->Close();
+
+ ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
+ ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
+ std::move(file_reader)));
+
+ ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
+ ASSERT_OK(table->ValidateFull());
+
+ auto expected_table =
+ ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"});
+ AssertTablesEqual(*expected_table, *table);
+}
+
+TEST(ParquetRewriterTest, JoinRoundTrip) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto left_schema = ::arrow::schema(
+ {::arrow::field("a", ::arrow::int32()), ::arrow::field("b",
::arrow::utf8())});
+ auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});
+
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_left,
+ WriteFile(
+ rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"})));
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_right,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(right_schema, {R"([[10], [20], [30]])"})));
+
+ auto sink = CreateOutputStream();
+ auto rewriter = ParquetFileRewriter::Open(
+ {{std::make_shared<BufferReader>(buffer_left)},
+ {std::make_shared<BufferReader>(buffer_right)}},
+ sink, {{NULLPTR}, {NULLPTR}}, NULLPTR, rewriter_properties);
+ rewriter->Rewrite();
+ rewriter->Close();
+
+ ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
+ ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
+ std::move(file_reader)));
+
+ ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
+ ASSERT_OK(table->ValidateFull());
+
+ auto expected_schema = ::arrow::schema({::arrow::field("a",
::arrow::int32()),
+ ::arrow::field("b", ::arrow::utf8()),
+ ::arrow::field("c",
::arrow::int64())});
+ auto expected_table = ::arrow::TableFromJSON(
+ expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
+ AssertTablesEqual(*expected_table, *table);
+}
+
+TEST(ParquetRewriterTest, ConcatJoinRoundTrip) {
+ auto rewriter_properties = RewriterProperties::Builder()
+ .writer_properties(WriterProperties::Builder()
+ .enable_write_page_index()
+ ->max_row_group_length(2)
+ ->build())
+ ->build();
+
+ auto left_schema = ::arrow::schema(
+ {::arrow::field("a", ::arrow::int32()), ::arrow::field("b",
::arrow::utf8())});
+ auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});
+
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_left_up,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2,
"b"]])"})));
+ ASSERT_OK_AND_ASSIGN(auto buffer_left_down,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(left_schema, {R"([[3,
"c"]])"})));
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_right,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(right_schema, {R"([[10], [20], [30]])"})));
+
+ auto sink = CreateOutputStream();
+ auto rewriter = ParquetFileRewriter::Open(
+ {{std::make_shared<BufferReader>(buffer_left_up),
+ std::make_shared<BufferReader>(buffer_left_down)},
+ {std::make_shared<BufferReader>(buffer_right)}},
+ sink, {{NULLPTR, NULLPTR}, {NULLPTR}}, NULLPTR, rewriter_properties);
+ rewriter->Rewrite();
+ rewriter->Close();
+
+ ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
+ ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
+ std::move(file_reader)));
+
+ ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
+ ASSERT_OK(table->ValidateFull());
+
+ auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()),
+ ::arrow::field("b", ::arrow::utf8()),
+ ::arrow::field("c", ::arrow::int64())});
+ auto expected_table = ::arrow::TableFromJSON(
+ expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
+ AssertTablesEqual(*expected_table, *table);
+}
+
+TEST(ParquetRewriterTest, JoinRowCountsMismatch) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto left_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
+ auto right_schema = ::arrow::schema({::arrow::field("b", ::arrow::int32())});
+
+ ASSERT_OK_AND_ASSIGN(auto buffer_left,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(left_schema, {R"([[1],
[2]])"})));
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_right,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(right_schema, {R"([[3], [4], [5]])"})));
+
+ auto sink = CreateOutputStream();
+
+ EXPECT_THROW_THAT(
+ [&]() {
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer_left)},
+ {std::make_shared<BufferReader>(buffer_right)}},
+ sink, {{NULLPTR}, {NULLPTR}}, NULLPTR,
+ rewriter_properties);
+ },
+ ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr("The number of rows in each block must
match")));
+}
+
+TEST(ParquetRewriterTest, InvalidInputDimensions) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
+
+ ASSERT_OK_AND_ASSIGN(auto buffer,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(schema, {R"([[1]])"})));
+
+ auto sink = CreateOutputStream();
+
+ EXPECT_THROW_THAT(
+ [&]() {
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer)}}, sink, {},
+ NULLPTR, rewriter_properties);
+ },
+ ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr(
+ "The number of sources and sources_metadata must be the same")));
Review Comment:
`EXPECT_THROW_THAT` is given a lambda expression that is never invoked, so
this assertion won't actually execute `ParquetFileRewriter::Open(...)`. Provide
the throwing statement/block directly to `EXPECT_THROW_THAT` (or invoke the
lambda) so the test fails/passes correctly.
##########
cpp/src/parquet/arrow/arrow_rewriter_test.cc:
##########
@@ -0,0 +1,394 @@
+TEST(ParquetRewriterTest, JoinRowCountsMismatch) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto left_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
+ auto right_schema = ::arrow::schema({::arrow::field("b", ::arrow::int32())});
+
+ ASSERT_OK_AND_ASSIGN(auto buffer_left,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(left_schema, {R"([[1], [2]])"})));
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer_right,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(right_schema, {R"([[3], [4], [5]])"})));
+
+ auto sink = CreateOutputStream();
+
+ EXPECT_THROW_THAT(
+ [&]() {
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer_left)},
+ {std::make_shared<BufferReader>(buffer_right)}},
+ sink, {{NULLPTR}, {NULLPTR}}, NULLPTR,
+ rewriter_properties);
+ },
+ ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr("The number of rows in each block must match")));
Review Comment:
`EXPECT_THROW_THAT` is currently given a lambda expression that is never
invoked, so the assertion won't actually execute
`ParquetFileRewriter::Open(...)` and therefore won't observe the expected
exception. Pass the throwing statement/block directly to `EXPECT_THROW_THAT`
(or immediately invoke the lambda) so the test actually validates the behavior.
##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -219,6 +222,26 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get();
}
+void BloomFilterBuilderImpl::InsertBloomFilter(
+ int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) {
+ auto opts = properties_->bloom_filter_options(schema_->Column(column_ordinal)->path());
+ if (!opts.has_value() || bloom_filter == nullptr) {
+ return;
+ }
+
+ CheckState(column_ordinal);
+
Review Comment:
`InsertBloomFilter` calls `schema_->Column(column_ordinal)` before
`CheckState(column_ordinal)`. If `column_ordinal` is out of bounds, this will
dereference invalid memory instead of throwing a `ParquetException` as
documented. Call `CheckState` before accessing `schema_->Column(...)` (and then
decide whether to no-op when bloom filters are disabled).
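A possible reordering (sketch only; it assumes `CheckState` performs the state and bounds validation described in the header, and elides the insertion logic):
```cpp
void BloomFilterBuilderImpl::InsertBloomFilter(
    int32_t column_ordinal, std::unique_ptr<BloomFilter> bloom_filter) {
  // Validate the builder state and the column ordinal first, so an
  // out-of-bounds ordinal throws ParquetException instead of indexing
  // invalid memory via schema_->Column(...).
  CheckState(column_ordinal);
  auto opts = properties_->bloom_filter_options(schema_->Column(column_ordinal)->path());
  if (!opts.has_value() || bloom_filter == nullptr) {
    return;  // bloom filters disabled for this column, or nothing to insert
  }
  // ... insert the filter into the current row group as before ...
}
```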
##########
cpp/src/parquet/arrow/arrow_rewriter_test.cc:
##########
@@ -0,0 +1,394 @@
+TEST(ParquetRewriterTest, InvalidInputDimensions) {
+ auto rewriter_properties =
+ RewriterProperties::Builder()
+ .writer_properties(
+ WriterProperties::Builder().enable_write_page_index()->build())
+ ->build();
+
+ auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
+
+ ASSERT_OK_AND_ASSIGN(auto buffer,
+ WriteFile(rewriter_properties->writer_properties(),
+ ::arrow::TableFromJSON(schema, {R"([[1]])"})));
+
+ auto sink = CreateOutputStream();
+
+ EXPECT_THROW_THAT(
+ [&]() {
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer)}}, sink, {},
+ NULLPTR, rewriter_properties);
+ },
+ ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr(
+ "The number of sources and sources_metadata must be the same")));
+
+ EXPECT_THROW_THAT(
+ [&]() {
+ ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer)}}, sink, {{}},
+ NULLPTR, rewriter_properties);
+ },
+ ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr(
+ "The number of sources and sources_metadata must be the same")));
Review Comment:
Same issue as above: the lambda passed to `EXPECT_THROW_THAT` isn't invoked,
so the exception is never thrown from within the assertion. Pass a
statement/block that performs the call directly (or invoke the lambda) to make
this test effective.
##########
cpp/src/parquet/file_rewriter.cc:
##########
@@ -0,0 +1,808 @@
+const std::shared_ptr<RewriterProperties>& default_rewriter_properties() {
+ static std::shared_ptr<RewriterProperties> default_rewriter_properties =
+ RewriterProperties::Builder().build();
+ return default_rewriter_properties;
+}
+
+class PagesRewriter {
+ public:
+ PagesRewriter(const RewriterProperties& props, std::unique_ptr<PageReader> page_reader,
+ std::unique_ptr<PageWriter> page_writer,
+ std::shared_ptr<OffsetIndex> original_offset_index)
+ : props_(props),
+ page_reader_(std::move(page_reader)),
+ page_writer_(std::move(page_writer)),
+ original_offset_index_(std::move(original_offset_index)) {}
+
+ void WritePages() {
+ bool has_dictionary = false;
+ bool fallback = false;
+ std::shared_ptr<Page> page;
+ size_t page_no = 0;
+ while ((page = page_reader_->NextPage()) != nullptr) {
+ switch (page->type()) {
+ case parquet::PageType::DICTIONARY_PAGE: {
+ WriteDictionaryPage(*static_cast<const DictionaryPage*>(page.get()));
+ has_dictionary = true;
+ break;
+ }
+ case parquet::PageType::DATA_PAGE: {
+ auto& data_page = *static_cast<const DataPageV1*>(page.get());
+ if (data_page.encoding() != Encoding::PLAIN_DICTIONARY) {
+ fallback = true;
+ }
+ WriteDataPageV1(data_page, page_no);
+ page_no++;
+ break;
+ }
+ case parquet::PageType::DATA_PAGE_V2: {
+ auto& data_page = *static_cast<const DataPageV2*>(page.get());
+ if (data_page.encoding() != Encoding::PLAIN_DICTIONARY) {
+ fallback = true;
+ }
+ WriteDataPageV2(data_page, page_no);
+ page_no++;
+ break;
+ }
+ default: {
+ ARROW_LOG(DEBUG) << "Unsupported page type: " <<
static_cast<int>(page->type());
+ break;
+ }
+ }
+ }
+ page_writer_->Close(has_dictionary, has_dictionary && fallback);
+ }
+
+ int64_t total_uncompressed_size() const { return total_uncompressed_size_; }
+
+ private:
+ void WriteDictionaryPage(const DictionaryPage& dict_page) {
+ total_uncompressed_size_ += page_writer_->WriteDictionaryPage(dict_page);
+ }
+
+ void WriteDataPageV1(const DataPageV1& data_page, const size_t page_no) {
+ std::shared_ptr<Buffer> compressed_data;
+ if (page_writer_->has_compressor()) {
+ auto buffer = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(props_.memory_pool(), data_page.size()));
+ page_writer_->Compress(*data_page.buffer(), buffer.get());
+ compressed_data = std::move(buffer);
+ } else {
+ compressed_data = data_page.buffer();
+ }
+ auto first_row_index =
+ original_offset_index_
+ ? std::optional{original_offset_index_->page_locations()[page_no]
+ .first_row_index}
+ : std::nullopt;
+ SizeStatistics size_statistics;
+ size_statistics.unencoded_byte_array_data_bytes =
+ original_offset_index_ &&
+ !original_offset_index_->unencoded_byte_array_data_bytes().empty()
+ ? std::optional{original_offset_index_
+ ->unencoded_byte_array_data_bytes()[page_no]}
+ : std::nullopt;
+ DataPageV1 new_page(compressed_data, data_page.num_values(), data_page.encoding(),
+ data_page.definition_level_encoding(),
+ data_page.repetition_level_encoding(),
+ data_page.uncompressed_size(), data_page.statistics(),
+ first_row_index, size_statistics);
+ total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+ }
+
+ void WriteDataPageV2(const DataPageV2& data_page, const size_t page_no) {
+ int32_t levels_byte_len = data_page.repetition_levels_byte_length() +
+ data_page.definition_levels_byte_length();
+ bool page_is_compressed = false;
+ std::shared_ptr<Buffer> output_buffer;
+ if (page_writer_->has_compressor() && data_page.size() > levels_byte_len) {
+ auto values_buffer = SliceBuffer(data_page.buffer(), levels_byte_len);
+ auto compressed_values = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(props_.memory_pool(), values_buffer->size()));
+ page_writer_->Compress(*values_buffer, compressed_values.get());
+ if (compressed_values->size() < values_buffer->size()) {
+ page_is_compressed = true;
+ int64_t combined_size = levels_byte_len + compressed_values->size();
+ auto combined = AllocateBuffer(props_.memory_pool(), combined_size);
+ if (levels_byte_len > 0) {
+ std::memcpy(combined->mutable_data(), data_page.data(), levels_byte_len);
+ }
+ std::memcpy(combined->mutable_data() + levels_byte_len, compressed_values->data(),
+ compressed_values->size());
+ output_buffer = std::move(combined);
+ }
+ }
+ if (!page_is_compressed) {
+ output_buffer = data_page.buffer();
+ }
+
+ auto first_row_index =
+ original_offset_index_
+ ? std::optional{original_offset_index_->page_locations()[page_no]
+ .first_row_index}
+ : std::nullopt;
+ SizeStatistics size_statistics;
+ size_statistics.unencoded_byte_array_data_bytes =
+ original_offset_index_ &&
+ !original_offset_index_->unencoded_byte_array_data_bytes().empty()
+ ? std::optional{original_offset_index_
+ ->unencoded_byte_array_data_bytes()[page_no]}
+ : std::nullopt;
+ DataPageV2 new_page(output_buffer, data_page.num_values(), data_page.num_nulls(),
+ data_page.num_rows(), data_page.encoding(),
+ data_page.definition_levels_byte_length(),
+ data_page.repetition_levels_byte_length(),
+ data_page.uncompressed_size(), page_is_compressed,
+ data_page.statistics(), first_row_index, size_statistics);
+ total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
+ }
+
+ const RewriterProperties& props_;
+ std::unique_ptr<PageReader> page_reader_;
+ std::unique_ptr<PageWriter> page_writer_;
+ std::shared_ptr<OffsetIndex> original_offset_index_;
+ int64_t total_uncompressed_size_{0};
+};
+
+class ColumnChunkRewriter {
+ public:
+ ColumnChunkRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ const RewriterProperties& props,
+ std::unique_ptr<ColumnChunkMetaData> metadata,
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader,
+ int32_t row_group_ordinal, int32_t column_ordinal, bool fast_copy)
+ : source_(std::move(source)),
+ sink_(std::move(sink)),
+ props_(props),
+ metadata_(std::move(metadata)),
+ page_index_reader_(std::move(page_index_reader)),
+ bloom_filter_reader_(std::move(bloom_filter_reader)),
+ row_group_ordinal_(row_group_ordinal),
+ column_ordinal_(column_ordinal),
+ fast_copy_(fast_copy) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("ColumnChunkMetaData should not be nullptr");
+ }
+ }
+
+ static bool CanFastCopy(const RewriterProperties& props,
+ const ColumnChunkMetaData& metadata) {
+ Compression::type original_codec = metadata.compression();
+ auto column_path = metadata.path_in_schema();
+ Compression::type new_codec = props.writer_properties()->compression(column_path);
+ return (original_codec == new_codec);
+ }
+
+ void WriteColumnChunkData(RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ auto& reader_props = props_.reader_properties();
+ auto& writer_props = *props_.writer_properties();
+ auto stream = reader_props.GetStream(source_, metadata_->start_offset(),
+ metadata_->total_compressed_size());
+
+ if (fast_copy_) {
+ PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+ int64_t shift = sink_offset - metadata_->start_offset();
+
+ CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
+ PARQUET_THROW_NOT_OK(stream->Close());
+
+ rg_metadata_builder.NextColumnChunk(std::move(metadata_), shift);
+
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+ auto offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
+ if (offset_index != nullptr) {
+ page_index_builder->SetOffsetIndex(column_ordinal_, offset_index, shift);
+ }
+ }
+
+ total_bytes_written += metadata_->total_uncompressed_size();
+ } else {
+ auto column_path = metadata_->path_in_schema();
+ auto new_codec = writer_props.compression(column_path);
+ auto codec_options = writer_props.codec_options(column_path);
+
+ auto* cc_metadata_builder = rg_metadata_builder.NextColumnChunk();
+
+ OffsetIndexBuilder* offset_index_builder = nullptr;
+ std::shared_ptr<OffsetIndex> original_offset_index = nullptr;
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+ offset_index_builder = page_index_builder->GetOffsetIndexBuilder(column_ordinal_);
+ original_offset_index = page_index_reader_->GetOffsetIndex(column_ordinal_);
+ }
+
+ auto page_reader = PageReader::Open(std::move(stream), metadata_->num_values(),
+ metadata_->compression(), reader_props);
+ auto page_writer = PageWriter::Open(
+ sink_, new_codec, cc_metadata_builder, static_cast<int16_t>(row_group_ordinal_),
+ static_cast<int16_t>(column_ordinal_), props_.memory_pool(),
+ /*buffered_row_group=*/false,
+ /*header_encryptor=*/nullptr, /*data_encryptor=*/nullptr,
+ writer_props.page_checksum_enabled(),
+ /*column_index_builder=*/nullptr, offset_index_builder,
+ codec_options ? *codec_options : CodecOptions{});
+
+ PagesRewriter pages_rewriter(props_, std::move(page_reader), std::move(page_writer),
+ std::move(original_offset_index));
+ pages_rewriter.WritePages();
+
+ total_bytes_written += pages_rewriter.total_uncompressed_size();
+ }
+ if (page_index_reader_ != nullptr && page_index_builder != nullptr) {
+ auto column_index = page_index_reader_->GetColumnIndex(column_ordinal_);
+ if (column_index != nullptr) {
+ page_index_builder->SetColumnIndex(column_ordinal_, column_index);
+ }
+ }
+ if (bloom_filter_reader_ != nullptr && bloom_filter_builder != nullptr) {
+ auto bloom_filter = bloom_filter_reader_->GetColumnBloomFilter(column_ordinal_);
+ if (bloom_filter != nullptr) {
+ bloom_filter_builder->InsertBloomFilter(column_ordinal_, std::move(bloom_filter));
+ }
+ }
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::unique_ptr<ColumnChunkMetaData> metadata_;
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+ const int32_t row_group_ordinal_;
+ const int32_t column_ordinal_;
+ const bool fast_copy_;
+};
+
+class RowGroupRewriter {
+ public:
+ RowGroupRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ const RewriterProperties& props,
+ std::shared_ptr<RowGroupReader> row_group_reader,
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader,
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader)
+ : source_(std::move(source)),
+ sink_(std::move(sink)),
+ props_(props),
+ row_group_reader_(std::move(row_group_reader)),
+ page_index_reader_(std::move(page_index_reader)),
+ bloom_filter_reader_(std::move(bloom_filter_reader)),
+ metadata_(row_group_reader_->metadata()) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("RowGroupMetaData should not be nullptr");
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ rg_metadata_builder.set_num_rows(metadata_->num_rows());
+
+ std::vector<bool> can_column_chunk_fast_copy(metadata_->num_columns());
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+ can_column_chunk_fast_copy[i] =
+ ColumnChunkRewriter::CanFastCopy(props_, *cc_metadata);
+ }
+ bool fast_copy = std::ranges::all_of(can_column_chunk_fast_copy, std::identity{});
+ if (fast_copy) {
+ fast_copy = metadata_->file_offset() != 0;
+ }
+ if (fast_copy) {
+ PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell());
+ int64_t shift = sink_offset - metadata_->file_offset();
+
+ auto stream = props_.reader_properties().GetStream(
+ source_, metadata_->file_offset(), metadata_->total_compressed_size());
+ CopyStream(stream, sink_, metadata_->total_compressed_size(), props_.memory_pool());
+ PARQUET_THROW_NOT_OK(stream->Close());
+
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+ rg_metadata_builder.NextColumnChunk(std::move(cc_metadata), shift);
+
+ auto column_index =
+ page_index_reader_ ? page_index_reader_->GetColumnIndex(i) : nullptr;
+ auto offset_index =
+ page_index_reader_ ? page_index_reader_->GetOffsetIndex(i) : nullptr;
+ auto bloom_filter = bloom_filter_reader_
+ ? bloom_filter_reader_->GetColumnBloomFilter(i)
+ : nullptr;
+
+ if (column_index != nullptr && page_index_builder != nullptr) {
+ page_index_builder->SetColumnIndex(i, column_index);
+ }
+ if (offset_index != nullptr && page_index_builder != nullptr) {
+ page_index_builder->SetOffsetIndex(i, offset_index, shift);
+ }
+ if (bloom_filter != nullptr && bloom_filter_builder != nullptr) {
+ bloom_filter_builder->InsertBloomFilter(i, std::move(bloom_filter));
+ }
+ }
+
+ total_bytes_written += metadata_->total_byte_size();
+ } else {
+ for (int i = 0; i < metadata_->num_columns(); ++i) {
+ auto cc_metadata = metadata_->ColumnChunk(i);
+ ColumnChunkRewriter rewriter(source_, sink_, props_, std::move(cc_metadata),
+ page_index_reader_, bloom_filter_reader_,
+ row_group_ordinal, i, can_column_chunk_fast_copy[i]);
+ rewriter.WriteColumnChunkData(rg_metadata_builder, page_index_builder,
+ bloom_filter_builder, total_bytes_written);
+ }
+ }
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::shared_ptr<RowGroupReader> row_group_reader_;
+ std::shared_ptr<RowGroupPageIndexReader> page_index_reader_;
+ std::shared_ptr<RowGroupBloomFilterReader> bloom_filter_reader_;
+ const RowGroupMetaData* metadata_;
+};
+
+class SingleFileRewriter {
+ public:
+ SingleFileRewriter(std::shared_ptr<ArrowInputFile> source,
+ std::shared_ptr<ArrowOutputStream> sink,
+ std::shared_ptr<FileMetaData> source_metadata,
+ const RewriterProperties& props)
+ : source_(source),
+ sink_(std::move(sink)),
+ props_(props),
+ parquet_file_reader_(ParquetFileReader::Open(
+ std::move(source), props_.reader_properties(), std::move(source_metadata))),
+ page_index_reader_(parquet_file_reader_->GetPageIndexReader()),
+ bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()),
+ metadata_(parquet_file_reader_->metadata()) {
+ if (metadata_ == nullptr) {
+ throw ParquetException("FileMetaData should not be nullptr");
+ }
+
+ std::vector<int32_t> row_group_indices(metadata_->num_row_groups());
+ std::iota(row_group_indices.begin(), row_group_indices.end(), 0);
+ std::vector<int32_t> column_indices(metadata_->num_columns());
+ std::iota(column_indices.begin(), column_indices.end(), 0);
+ if (page_index_reader_) {
+ page_index_reader_->WillNeed(row_group_indices, column_indices,
+ {/*column_index=*/true, /*offset_index=*/true});
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ if (current_row_group_index_ >= metadata_->num_row_groups()) {
+ throw ParquetException("Trying to read row group ",
current_row_group_index_,
+ " but file only has ",
metadata_->num_row_groups(),
+ " row group");
+ }
+ auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_);
+ auto page_index_reader = page_index_reader_
+ ? page_index_reader_->RowGroup(current_row_group_index_)
+ : nullptr;
+ auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_);
+ RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader),
+ std::move(page_index_reader),
+ std::move(bloom_filter_reader));
+ rewriter.WriteRowGroupData(row_group_ordinal, rg_metadata_builder, page_index_builder,
+ bloom_filter_builder, total_bytes_written);
+ ++current_row_group_index_;
+ }
+
+ bool HasMoreRowGroup() {
+ return current_row_group_index_ < metadata_->num_row_groups();
+ }
+
+ void Close() { parquet_file_reader_->Close(); }
+
+ const SchemaDescriptor& schema() const { return *metadata_->schema(); }
+
+ std::vector<int64_t> row_group_row_counts() const {
+ int num_row_groups = metadata_->num_row_groups();
+ std::vector<int64_t> row_counts;
+ row_counts.reserve(num_row_groups);
+ for (int i = 0; i < num_row_groups; ++i) {
+ row_counts.emplace_back(metadata_->RowGroup(i)->num_rows());
+ }
+ return row_counts;
+ }
+
+ private:
+ std::shared_ptr<ArrowInputFile> source_;
+ std::shared_ptr<ArrowOutputStream> sink_;
+ const RewriterProperties& props_;
+ std::unique_ptr<ParquetFileReader> parquet_file_reader_;
+ std::shared_ptr<PageIndexReader> page_index_reader_;
+ BloomFilterReader& bloom_filter_reader_;
+ std::shared_ptr<FileMetaData> metadata_;
+ int current_row_group_index_{};
+};
+
+class ConcatRewriter {
+ public:
+ explicit ConcatRewriter(std::vector<std::unique_ptr<SingleFileRewriter>> rewriters)
+ : file_rewriters_(std::move(rewriters)) {
+ if (file_rewriters_.empty()) {
+ throw ParquetException("At least one SingleFileRewriter is required");
+ }
+ auto& schema = file_rewriters_[0]->schema();
+ if (std::ranges::any_of(
+ file_rewriters_ | std::views::drop(1),
+ [&schema](auto& rewriter) { return !schema.Equals(rewriter->schema()); })) {
+ throw ParquetException("Input files have different schemas.");
+ }
+ }
+
+ void WriteRowGroupData(int32_t row_group_ordinal,
+ RowGroupMetaDataBuilder& rg_metadata_builder,
+ PageIndexBuilder* page_index_builder,
+ BloomFilterBuilder* bloom_filter_builder,
+ int64_t& total_bytes_written) {
+ file_rewriters_[current_rewriter_index_]->WriteRowGroupData(
+ row_group_ordinal, rg_metadata_builder, page_index_builder, bloom_filter_builder,
+ total_bytes_written);
+ }
+
+ bool HasMoreRowGroup() {
+ while (current_rewriter_index_ < file_rewriters_.size() &&
+ !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) {
+ file_rewriters_[current_rewriter_index_]->Close();
+ ARROW_LOG(DEBUG) << "Finished rewriting file index " <<
current_rewriter_index_;
+ ++current_rewriter_index_;
+ }
+ return current_rewriter_index_ < file_rewriters_.size();
+ }
+
+ void Close() { std::ranges::for_each(file_rewriters_, &SingleFileRewriter::Close); }
+
+ const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); }
+
+ std::vector<int64_t> row_group_row_counts() const {
+ std::vector<int64_t> row_counts;
+ for (auto& rewriter : file_rewriters_) {
+ auto count = rewriter->row_group_row_counts();
+ row_counts.insert(row_counts.end(), count.begin(), count.end());
+ }
+ return row_counts;
+ }
+
+ private:
+ std::vector<std::unique_ptr<SingleFileRewriter>> file_rewriters_;
+ size_t current_rewriter_index_{};
+};
+
+class JoinRewriter {
+ public:
+ explicit JoinRewriter(std::vector<std::unique_ptr<ConcatRewriter>> rewriters)
+ : rewriters_(std::move(rewriters)) {
+ if (rewriters_.empty()) {
+ throw ParquetException("At least one ConcatRewriter is required");
+ }
+ auto row_counts = rewriters_[0]->row_group_row_counts();
+ for (size_t i = 1; i < rewriters_.size(); ++i) {
+ if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
+ row_counts != current_row_counts) {
+ // TODO(anyone): use `std::format("{}", row_counts)` instead when C++23 available
+ auto vecToString = [](const std::vector<int64_t>& v) {
+ if (v.empty()) {
+ return std::string("[]");
+ }
+
+ std::string s = std::format("[{}", v[0]);
+ for (const auto& val : v | std::views::drop(1)) {
+ s += std::format(", {}", val);
+ }
+ s += "]";
+ return s;
+ };
+ throw ParquetException(
+ "The number of rows in each block must match! No.0 blocks row
counts: ",
+ vecToString(row_counts), ", No.", i,
+ " blocks row counts: ", vecToString(current_row_counts));
+ }
+ }
+
+ std::unordered_set<std::string> column_paths;
+ schema::NodeVector fields;
+
+ for (auto& rewriter : rewriters_) {
+ const SchemaDescriptor& schema_desc = rewriter->schema();
+
+ for (int i = 0; i < schema_desc.num_columns(); ++i) {
+ auto path = schema_desc.Column(i)->path()->ToDotString();
+ if (auto [_, inserted] = column_paths.emplace(path); !inserted) {
+ // TODO(HuaHuaY): support choose one column from columns with same path
+ ParquetException::NYI(std::format("files have the same column path: {}", path));
+ }
Review Comment:
This file uses `std::format` but does not include `<format>`, which will
fail to compile on standard-conforming toolchains. Also, Arrow/Parquet code
typically avoids `std::format` due to uneven standard library support; consider
building these strings with `std::stringstream`/`std::ostringstream` (or
another project utility) instead.
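A `<format>`-free replacement could be assembled with `std::ostringstream` (sketch; mirrors the quoted `vecToString` helper):
```cpp
#include <sstream>
#include <string>
#include <vector>

// Sketch of a <format>-free vecToString: joins the row counts with ", "
// inside square brackets, e.g. {1, 2} -> "[1, 2]".
std::string VecToString(const std::vector<int64_t>& v) {
  std::ostringstream ss;
  ss << "[";
  for (size_t i = 0; i < v.size(); ++i) {
    if (i > 0) ss << ", ";
    ss << v[i];
  }
  ss << "]";
  return ss.str();
}
```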
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]