This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 710f96069f GH-36189: [C++][Parquet] StreamReader::SkipRows() skips to
incorrect place in multi-row-group files (#36191)
710f96069f is described below
commit 710f96069f9f036338a0dd99202f0486bb7d1984
Author: KarateSnowMachine <[email protected]>
AuthorDate: Thu Aug 3 05:32:52 2023 -0400
GH-36189: [C++][Parquet] StreamReader::SkipRows() skips to incorrect place
in multi-row-group files (#36191)
### Rationale for this change
The behavior of Parquet `StreamReader::SkipRows()` is wrong due to a sign error
when computing the row offset within the current row group, which is used to
determine how many rows remain in that row group.
### What changes are included in this PR?
A unit test case demonstrating the failure and a trivial fix.
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
I am not sure whether this bug is critical, given how long it has existed in the
code without anyone seeming to notice. There are two manifestations of this bug
that might give the user the wrong impression about what is in their data:
* sometimes a negative value is returned, which is unexpected given the nature
of the API, so the user can tell that something is wrong (this is how I
discovered the bug)
* the `SkipRows()` call sets the `eof` flag prematurely, which might lead the
user to think there is less data in the file than there actually is.
* Closes: #36189
Lead-authored-by: Paul Rosenfeld <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: KarateSnowMachine
<[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
cpp/src/parquet/stream_reader.cc | 2 +-
cpp/src/parquet/stream_reader_test.cc | 112 ++++++++++++++++++++++++++++++----
2 files changed, 101 insertions(+), 13 deletions(-)
diff --git a/cpp/src/parquet/stream_reader.cc b/cpp/src/parquet/stream_reader.cc
index 0fecb1bf24..d3353aa334 100644
--- a/cpp/src/parquet/stream_reader.cc
+++ b/cpp/src/parquet/stream_reader.cc
@@ -441,7 +441,7 @@ int64_t StreamReader::SkipRows(int64_t num_rows_to_skip) {
while (!eof_ && (num_rows_remaining_to_skip > 0)) {
int64_t num_rows_in_row_group = row_group_reader_->metadata()->num_rows();
int64_t num_rows_remaining_in_row_group =
- num_rows_in_row_group - current_row_ - row_group_row_offset_;
+ num_rows_in_row_group - (current_row_ - row_group_row_offset_);
if (num_rows_remaining_in_row_group > num_rows_remaining_to_skip) {
for (auto reader : column_readers_) {
diff --git a/cpp/src/parquet/stream_reader_test.cc
b/cpp/src/parquet/stream_reader_test.cc
index fed036bca5..04140f6ad0 100644
--- a/cpp/src/parquet/stream_reader_test.cc
+++ b/cpp/src/parquet/stream_reader_test.cc
@@ -17,13 +17,11 @@
#include "parquet/stream_reader.h"
-#include <fcntl.h>
#include <gtest/gtest.h>
#include <chrono>
#include <ctime>
#include <memory>
-#include <utility>
#include "arrow/io/file.h"
#include "arrow/util/decimal.h"
@@ -38,7 +36,7 @@ using optional = StreamReader::optional<T>;
using ::std::nullopt;
struct TestData {
- static void init() { std::time(&ts_offset_); }
+ static void Init() { std::time(&ts_offset_); }
static constexpr int num_rows = 2000;
@@ -145,18 +143,18 @@ constexpr int TestData::num_rows;
class TestStreamReader : public ::testing::Test {
public:
- TestStreamReader() { createTestFile(); }
+ TestStreamReader() { CreateTestFile(); }
protected:
const char* GetDataFile() const { return "stream_reader_test.parquet"; }
- void SetUp() {
+ void SetUp() override {
PARQUET_ASSIGN_OR_THROW(auto infile,
::arrow::io::ReadableFile::Open(GetDataFile()));
auto file_reader = parquet::ParquetFileReader::Open(infile);
reader_ = StreamReader{std::move(file_reader)};
}
- void TearDown() { reader_ = StreamReader{}; }
+ void TearDown() override { reader_ = StreamReader{}; }
std::shared_ptr<schema::GroupNode> GetSchema() {
schema::NodeVector fields;
@@ -201,7 +199,7 @@ class TestStreamReader : public ::testing::Test {
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
- void createTestFile() {
+ void CreateTestFile() {
PARQUET_ASSIGN_OR_THROW(auto outfile,
::arrow::io::FileOutputStream::Open(GetDataFile()));
@@ -209,7 +207,7 @@ class TestStreamReader : public ::testing::Test {
StreamWriter os{std::move(file_writer)};
- TestData::init();
+ TestData::Init();
for (auto i = 0; i < TestData::num_rows; ++i) {
os << TestData::GetBool(i);
@@ -586,7 +584,7 @@ TEST_F(TestStreamReader, SkipColumns) {
class TestOptionalFields : public ::testing::Test {
public:
- TestOptionalFields() { createTestFile(); }
+ TestOptionalFields() { CreateTestFile(); }
protected:
const char* GetDataFile() const { return
"stream_reader_test_optional_fields.parquet"; }
@@ -644,13 +642,13 @@ class TestOptionalFields : public ::testing::Test {
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
- void createTestFile() {
+ void CreateTestFile() {
PARQUET_ASSIGN_OR_THROW(auto outfile,
::arrow::io::FileOutputStream::Open(GetDataFile()));
StreamWriter os{ParquetFileWriter::Open(outfile, GetSchema())};
- TestData::init();
+ TestData::Init();
for (auto i = 0; i < TestData::num_rows; ++i) {
os << TestData::GetOptBool(i);
@@ -732,7 +730,7 @@ TEST_F(TestOptionalFields,
ReadOptionalFieldAsRequiredField) {
_provided_ that the optional value is available.
This can be useful if a schema is changed such that a required
- field beomes optional. Applications can continue reading the
+ field becomes optional. Applications can continue reading the
field as if it were mandatory and do not need to be changed if the
field value is always provided.
@@ -947,5 +945,95 @@ TEST_F(TestReadingDataFiles, ByteArrayDecimal) {
EXPECT_EQ(i, 25);
}
+class TestMultiRowGroupStreamReader : public ::testing::Test {
+ protected:
+ const char* GetDataFile() const { return
"stream_reader_multirowgroup_test.parquet"; }
+
+ void SetUp() override {
+ CreateTestFile();
+ PARQUET_ASSIGN_OR_THROW(auto infile,
::arrow::io::ReadableFile::Open(GetDataFile()));
+ auto file_reader = parquet::ParquetFileReader::Open(infile);
+ reader_ = StreamReader{std::move(file_reader)};
+ }
+
+ void TearDown() override { reader_ = StreamReader{}; }
+
+ std::shared_ptr<schema::GroupNode> GetSchema() {
+ schema::NodeVector fields;
+ fields.push_back(schema::PrimitiveNode::Make("row_group_number",
Repetition::REQUIRED,
+ Type::INT32,
ConvertedType::UINT_16));
+
+ fields.push_back(schema::PrimitiveNode::Make("row_number",
Repetition::REQUIRED,
+ Type::INT64,
ConvertedType::UINT_64));
+
+ return std::static_pointer_cast<schema::GroupNode>(
+ schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+ }
+
+ void CreateTestFile() {
+ PARQUET_ASSIGN_OR_THROW(auto outfile,
+
::arrow::io::FileOutputStream::Open(GetDataFile()));
+
+ auto file_writer = ParquetFileWriter::Open(outfile, GetSchema());
+
+ StreamWriter os{std::move(file_writer)};
+
+ int nrows = 0;
+ for (auto group = 0; group < kNumGroups; ++group) {
+ for (auto i = 0; i < kNumRowsPerGroup; ++i) {
+ os << static_cast<uint16_t>(group);
+ os << static_cast<uint64_t>(nrows);
+ os << EndRow;
+ nrows++;
+ }
+ os.EndRowGroup();
+ }
+ }
+
+ void ReadRowAndAssertPosition(uint64_t expected_row_num) {
+ const auto expected_group_num =
+ static_cast<uint16_t>(expected_row_num / kNumRowsPerGroup);
+ ASSERT_FALSE(reader_.eof());
+ uint16_t group_num = 0;
+ uint64_t row_num = 0;
+ reader_ >> group_num >> row_num >> EndRow;
+ ASSERT_EQ(group_num, expected_group_num);
+ ASSERT_EQ(row_num, expected_row_num);
+ }
+
+ StreamReader reader_;
+ static constexpr int kNumGroups = 5;
+ static constexpr int kNumRowsPerGroup = 10;
+};
+
+TEST_F(TestMultiRowGroupStreamReader, SkipRows) {
+ // skip somewhere into the middle of a row group somewhere in the middle of
the file
+ auto current_row = 33;
+
+ auto retval = reader_.SkipRows(current_row);
+ ASSERT_EQ(retval, current_row);
+ ReadRowAndAssertPosition(current_row);
+ // reading the row advances by 1
+ current_row += 1; // row=34
+
+ // skip a few more but stay inside the row group
+ retval = reader_.SkipRows(4);
+ current_row += 4; // row=38
+ ASSERT_EQ(retval, 4);
+ ReadRowAndAssertPosition(current_row);
+ current_row += 1; // row=39
+
+ // skip one more row to get to a group boundary
+ retval = reader_.SkipRows(1);
+ current_row += 1; // row=40
+ ASSERT_EQ(retval, 1);
+ ReadRowAndAssertPosition(current_row);
+
+ // finally, skip off the end of the file
+ retval = reader_.SkipRows(10);
+ ASSERT_EQ(retval, 9); // requested to skip 10 but only 9 rows left in file
+ EXPECT_TRUE(reader_.eof());
+}
+
} // namespace test
} // namespace parquet