This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3fd18e3 ARROW-4267: [C++/Parquet] Handle duplicate and struct columns in RowGroup reads
3fd18e3 is described below
commit 3fd18e3f6e5ce3e8dce536088c893cb2ce4c3f72
Author: Korn, Uwe <[email protected]>
AuthorDate: Wed Feb 20 09:41:25 2019 -0600
ARROW-4267: [C++/Parquet] Handle duplicate and struct columns in RowGroup reads
Author: Korn, Uwe <[email protected]>
Closes #3683 from xhochy/ARROW-4267 and squashes the following commits:
cb3958c2e <Korn, Uwe> Handle struct columns on RowGroup level
f2bd6aec2 <Korn, Uwe> Fix format errors
14c73953b <Korn, Uwe> ARROW-4267: Handle duplicate and struct columns in RowGroup reads
---
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 9 +++
cpp/src/parquet/arrow/reader.cc | 94 +++++++++++++++++------
python/pyarrow/tests/test_parquet.py | 15 +++-
3 files changed, 93 insertions(+), 25 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index b6c3ee5..863c3ea 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -2221,6 +2221,15 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
+ // columns: {group1.leaf1, leaf3}
+ ASSERT_OK_NO_THROW(reader_->ReadRowGroup(0, {0, 2}, &table));
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+ ASSERT_EQ(table->num_columns(), 2);
+ ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+ ASSERT_EQ(table->schema()->field(1)->name(), "leaf3");
+ ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
+ ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
+
// columns: {group1.leaf1, group1.leaf2}
ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 0b60c66..a71e083 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -231,6 +231,9 @@ class RowGroupRecordBatchReader : public
::arrow::RecordBatchReader {
// ----------------------------------------------------------------------
// File reader implementation
+using FileColumnIteratorFactory =
+ std::function<FileColumnIterator*(int, ParquetFileReader*)>;
+
class FileReader::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -238,7 +241,8 @@ class FileReader::Impl {
virtual ~Impl() {}
- Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+ Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+ std::unique_ptr<ColumnReader>* out);
Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
Status ReadSchemaField(int i, const std::vector<int>& indices,
@@ -246,9 +250,11 @@ class FileReader::Impl {
Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
Status ReadColumnChunk(int column_index, int row_group_index,
std::shared_ptr<ChunkedArray>* out);
+ Status ReadColumnChunk(int column_index, const std::vector<int>& indices,
+ int row_group_index, std::shared_ptr<ChunkedArray>*
out);
Status GetReaderForNode(int index, const Node* node, const std::vector<int>&
indices,
- int16_t def_level,
+ int16_t def_level, FileColumnIteratorFactory
iterator_factory,
std::unique_ptr<ColumnReader::ColumnReaderImpl>*
out);
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
@@ -356,8 +362,9 @@ FileReader::FileReader(MemoryPool* pool,
std::unique_ptr<ParquetFileReader> read
FileReader::~FileReader() {}
-Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
- std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i,
reader_.get()));
+Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory
iterator_factory,
+ std::unique_ptr<ColumnReader>* out) {
+ std::unique_ptr<FileColumnIterator> input(iterator_factory(i,
reader_.get()));
std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
new PrimitiveImpl(pool_, std::move(input)));
@@ -367,6 +374,7 @@ Status FileReader::Impl::GetColumn(int i,
std::unique_ptr<ColumnReader>* out) {
Status FileReader::Impl::GetReaderForNode(
int index, const Node* node, const std::vector<int>& indices, int16_t
def_level,
+ FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
*out = nullptr;
@@ -379,7 +387,8 @@ Status FileReader::Impl::GetReaderForNode(
// are supported. This currently just signals the lower level reader
resolution
// to abort
RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
- static_cast<int16_t>(def_level + 1),
&child_reader));
+ static_cast<int16_t>(def_level + 1),
+ iterator_factory, &child_reader));
if (child_reader != nullptr) {
children.push_back(std::move(child_reader));
}
@@ -407,7 +416,7 @@ Status FileReader::Impl::GetReaderForNode(
// Otherwise *out keeps the nullptr value.
if (std::find(indices.begin(), indices.end(), column_index) !=
indices.end()) {
std::unique_ptr<ColumnReader> reader;
- RETURN_NOT_OK(GetColumn(column_index, &reader));
+ RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
*out = std::move(reader->impl_);
}
}
@@ -427,12 +436,15 @@ Status FileReader::Impl::ReadSchemaField(int i,
std::shared_ptr<ChunkedArray>* o
Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>&
indices,
std::shared_ptr<ChunkedArray>* out) {
+ FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader*
reader) {
+ return new AllRowGroupsIterator(i, reader);
+ };
auto parquet_schema = reader_->metadata()->schema();
auto node = parquet_schema->group_node()->field(i).get();
std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
- RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
+ RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, iterator_factory,
&reader_impl));
if (reader_impl == nullptr) {
*out = nullptr;
return Status::OK();
@@ -453,8 +465,11 @@ Status FileReader::Impl::ReadSchemaField(int i, const
std::vector<int>& indices,
}
Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out)
{
+ FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader*
reader) {
+ return new AllRowGroupsIterator(i, reader);
+ };
std::unique_ptr<ColumnReader> flat_column_reader;
- RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
+ RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader));
int64_t records_to_read = 0;
for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
@@ -473,17 +488,39 @@ Status FileReader::Impl::GetSchema(const
std::vector<int>& indices,
Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
std::shared_ptr<ChunkedArray>* out) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
+
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = static_cast<int>(i);
+ }
+
+ return ReadColumnChunk(column_index, indices, row_group_index, out);
+}
+
+Status FileReader::Impl::ReadColumnChunk(int column_index,
+ const std::vector<int>& indices,
+ int row_group_index,
+ std::shared_ptr<ChunkedArray>* out) {
auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
int64_t records_to_read =
rg_metadata->ColumnChunk(column_index)->num_values();
- std::unique_ptr<FileColumnIterator> input(
- new SingleRowGroupIterator(column_index, row_group_index,
reader_.get()));
+ auto parquet_schema = reader_->metadata()->schema();
+ auto node = parquet_schema->group_node()->field(column_index).get();
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
- std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
- new PrimitiveImpl(pool_, std::move(input)));
- ColumnReader flat_column_reader(std::move(impl));
+ FileColumnIteratorFactory iterator_factory = [row_group_index](
+ int i, ParquetFileReader*
reader) {
+ return new SingleRowGroupIterator(i, row_group_index, reader);
+ };
+ RETURN_NOT_OK(
+ GetReaderForNode(column_index, node, indices, 1, iterator_factory,
&reader_impl));
+ if (reader_impl == nullptr) {
+ *out = nullptr;
+ return Status::OK();
+ }
+ ColumnReader reader(std::move(reader_impl));
- return flat_column_reader.NextBatch(records_to_read, out);
+ return reader.NextBatch(records_to_read, out);
}
Status FileReader::Impl::ReadRowGroup(int row_group_index,
@@ -494,16 +531,22 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
- int num_columns = static_cast<int>(indices.size());
- std::vector<std::shared_ptr<Column>> columns(num_columns);
+ // We only need to read schema fields which have columns indicated
+ // in the indices vector
+ std::vector<int> field_indices;
+ if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
+ &field_indices)) {
+ return Status::Invalid("Invalid column index");
+ }
+ int num_fields = static_cast<int>(field_indices.size());
+ std::vector<std::shared_ptr<Column>> columns(num_fields);
// TODO(wesm): Refactor to share more code with ReadTable
- auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns,
this](int i) {
- int column_index = indices[i];
-
+ auto ReadColumnFunc = [&indices, &field_indices, &row_group_index, &schema,
&columns,
+ this](int i) {
std::shared_ptr<ChunkedArray> array;
- RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
+ RETURN_NOT_OK(ReadColumnChunk(field_indices[i], indices, row_group_index,
&array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};
@@ -511,7 +554,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
if (use_threads_) {
std::vector<std::future<Status>> futures;
auto pool = ::arrow::internal::GetCpuThreadPool();
- for (int i = 0; i < num_columns; i++) {
+ for (int i = 0; i < num_fields; i++) {
futures.push_back(pool->Submit(ReadColumnFunc, i));
}
Status final_status = Status::OK();
@@ -523,7 +566,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
}
RETURN_NOT_OK(final_status);
} else {
- for (int i = 0; i < num_columns; i++) {
+ for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(ReadColumnFunc(i));
}
}
@@ -640,7 +683,10 @@ Status OpenFile(const
std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
}
Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
- return impl_->GetColumn(i, out);
+ FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader*
reader) {
+ return new AllRowGroupsIterator(i, reader);
+ };
+ return impl_->GetColumn(i, iterator_factory, out);
}
Status FileReader::GetSchema(const std::vector<int>& indices,
@@ -1708,7 +1754,7 @@ Status
ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
std::shared_ptr<ChunkedArray> chunked_out;
- RETURN_NOT_OK(impl_->ReadColumnChunk(column_index_, row_group_index_,
&chunked_out));
+ RETURN_NOT_OK(Read(&chunked_out));
return GetSingleChunk(*chunked_out, out);
}
diff --git a/python/pyarrow/tests/test_parquet.py
b/python/pyarrow/tests/test_parquet.py
index 5156300..dc7fbef 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -377,6 +377,13 @@ def test_pandas_column_selection(tempdir):
tm.assert_frame_equal(df[['uint8']], df_read)
+ # ARROW-4267: Selection of duplicate columns still leads to these columns
+ # being read uniquely.
+ table_read = _read_table(filename, columns=['uint8', 'uint8'])
+ df_read = table_read.to_pandas()
+
+ tm.assert_frame_equal(df[['uint8']], df_read)
+
def _random_integers(size, dtype):
# We do not generate integers outside the int64 range
@@ -1062,11 +1069,17 @@ def test_read_single_row_group_with_column_subset():
buf.seek(0)
pf = pq.ParquetFile(buf)
- cols = df.columns[:2]
+ cols = list(df.columns[:2])
row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df[cols], result.to_pandas())
+ # ARROW-4267: Selection of duplicate columns still leads to these columns
+ # being read uniquely.
+ row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
def test_scan_contents():
N, K = 10000, 4