This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd18e3  ARROW-4267: [C++/Parquet] Handle duplicate and struct columns 
in RowGroup reads
3fd18e3 is described below

commit 3fd18e3f6e5ce3e8dce536088c893cb2ce4c3f72
Author: Korn, Uwe <[email protected]>
AuthorDate: Wed Feb 20 09:41:25 2019 -0600

    ARROW-4267: [C++/Parquet] Handle duplicate and struct columns in RowGroup 
reads
    
    Author: Korn, Uwe <[email protected]>
    
    Closes #3683 from xhochy/ARROW-4267 and squashes the following commits:
    
    cb3958c2e <Korn, Uwe> Handle struct columns on RowGroup level
    f2bd6aec2 <Korn, Uwe> Fix format errors
    14c73953b <Korn, Uwe> ARROW-4267:  Handle duplicate and struct columns in 
RowGroup reads
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |  9 +++
 cpp/src/parquet/arrow/reader.cc                   | 94 +++++++++++++++++------
 python/pyarrow/tests/test_parquet.py              | 15 +++-
 3 files changed, 93 insertions(+), 25 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc 
b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index b6c3ee5..863c3ea 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -2221,6 +2221,15 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
   ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
   ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
 
+  // columns: {group1.leaf1, leaf3}
+  ASSERT_OK_NO_THROW(reader_->ReadRowGroup(0, {0, 2}, &table));
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+  ASSERT_EQ(table->schema()->field(1)->name(), "leaf3");
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
+  ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
+
   // columns: {group1.leaf1, group1.leaf2}
   ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
   ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 0b60c66..a71e083 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -231,6 +231,9 @@ class RowGroupRecordBatchReader : public 
::arrow::RecordBatchReader {
 // ----------------------------------------------------------------------
 // File reader implementation
 
+using FileColumnIteratorFactory =
+    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
+
 class FileReader::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -238,7 +241,8 @@ class FileReader::Impl {
 
   virtual ~Impl() {}
 
-  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+  Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+                   std::unique_ptr<ColumnReader>* out);
 
   Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
   Status ReadSchemaField(int i, const std::vector<int>& indices,
@@ -246,9 +250,11 @@ class FileReader::Impl {
   Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
   Status ReadColumnChunk(int column_index, int row_group_index,
                          std::shared_ptr<ChunkedArray>* out);
+  Status ReadColumnChunk(int column_index, const std::vector<int>& indices,
+                         int row_group_index, std::shared_ptr<ChunkedArray>* 
out);
 
   Status GetReaderForNode(int index, const Node* node, const std::vector<int>& 
indices,
-                          int16_t def_level,
+                          int16_t def_level, FileColumnIteratorFactory 
iterator_factory,
                           std::unique_ptr<ColumnReader::ColumnReaderImpl>* 
out);
 
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
@@ -356,8 +362,9 @@ FileReader::FileReader(MemoryPool* pool, 
std::unique_ptr<ParquetFileReader> read
 
 FileReader::~FileReader() {}
 
-Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
-  std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, 
reader_.get()));
+Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory 
iterator_factory,
+                                   std::unique_ptr<ColumnReader>* out) {
+  std::unique_ptr<FileColumnIterator> input(iterator_factory(i, 
reader_.get()));
 
   std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
       new PrimitiveImpl(pool_, std::move(input)));
@@ -367,6 +374,7 @@ Status FileReader::Impl::GetColumn(int i, 
std::unique_ptr<ColumnReader>* out) {
 
 Status FileReader::Impl::GetReaderForNode(
     int index, const Node* node, const std::vector<int>& indices, int16_t 
def_level,
+    FileColumnIteratorFactory iterator_factory,
     std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
   *out = nullptr;
 
@@ -379,7 +387,8 @@ Status FileReader::Impl::GetReaderForNode(
       // are supported. This currently just signals the lower level reader 
resolution
       // to abort
       RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
-                                     static_cast<int16_t>(def_level + 1), 
&child_reader));
+                                     static_cast<int16_t>(def_level + 1),
+                                     iterator_factory, &child_reader));
       if (child_reader != nullptr) {
         children.push_back(std::move(child_reader));
       }
@@ -407,7 +416,7 @@ Status FileReader::Impl::GetReaderForNode(
     // Otherwise *out keeps the nullptr value.
     if (std::find(indices.begin(), indices.end(), column_index) != 
indices.end()) {
       std::unique_ptr<ColumnReader> reader;
-      RETURN_NOT_OK(GetColumn(column_index, &reader));
+      RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
       *out = std::move(reader->impl_);
     }
   }
@@ -427,12 +436,15 @@ Status FileReader::Impl::ReadSchemaField(int i, 
std::shared_ptr<ChunkedArray>* o
 
 Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& 
indices,
                                          std::shared_ptr<ChunkedArray>* out) {
+  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* 
reader) {
+    return new AllRowGroupsIterator(i, reader);
+  };
   auto parquet_schema = reader_->metadata()->schema();
 
   auto node = parquet_schema->group_node()->field(i).get();
   std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
 
-  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
+  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, iterator_factory, 
&reader_impl));
   if (reader_impl == nullptr) {
     *out = nullptr;
     return Status::OK();
@@ -453,8 +465,11 @@ Status FileReader::Impl::ReadSchemaField(int i, const 
std::vector<int>& indices,
 }
 
 Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) 
{
+  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* 
reader) {
+    return new AllRowGroupsIterator(i, reader);
+  };
   std::unique_ptr<ColumnReader> flat_column_reader;
-  RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
+  RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader));
 
   int64_t records_to_read = 0;
   for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
@@ -473,17 +488,39 @@ Status FileReader::Impl::GetSchema(const 
std::vector<int>& indices,
 
 Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
                                          std::shared_ptr<ChunkedArray>* out) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
+
+  for (size_t i = 0; i < indices.size(); ++i) {
+    indices[i] = static_cast<int>(i);
+  }
+
+  return ReadColumnChunk(column_index, indices, row_group_index, out);
+}
+
+Status FileReader::Impl::ReadColumnChunk(int column_index,
+                                         const std::vector<int>& indices,
+                                         int row_group_index,
+                                         std::shared_ptr<ChunkedArray>* out) {
   auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
   int64_t records_to_read = 
rg_metadata->ColumnChunk(column_index)->num_values();
 
-  std::unique_ptr<FileColumnIterator> input(
-      new SingleRowGroupIterator(column_index, row_group_index, 
reader_.get()));
+  auto parquet_schema = reader_->metadata()->schema();
+  auto node = parquet_schema->group_node()->field(column_index).get();
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
 
-  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-      new PrimitiveImpl(pool_, std::move(input)));
-  ColumnReader flat_column_reader(std::move(impl));
+  FileColumnIteratorFactory iterator_factory = [row_group_index](
+                                                   int i, ParquetFileReader* 
reader) {
+    return new SingleRowGroupIterator(i, row_group_index, reader);
+  };
+  RETURN_NOT_OK(
+      GetReaderForNode(column_index, node, indices, 1, iterator_factory, 
&reader_impl));
+  if (reader_impl == nullptr) {
+    *out = nullptr;
+    return Status::OK();
+  }
+  ColumnReader reader(std::move(reader_impl));
 
-  return flat_column_reader.NextBatch(records_to_read, out);
+  return reader.NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
@@ -494,16 +531,22 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
 
   auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
 
-  int num_columns = static_cast<int>(indices.size());
-  std::vector<std::shared_ptr<Column>> columns(num_columns);
+  // We only need to read schema fields which have columns indicated
+  // in the indices vector
+  std::vector<int> field_indices;
+  if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
+                                   &field_indices)) {
+    return Status::Invalid("Invalid column index");
+  }
+  int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<Column>> columns(num_fields);
 
   // TODO(wesm): Refactor to share more code with ReadTable
 
-  auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, 
this](int i) {
-    int column_index = indices[i];
-
+  auto ReadColumnFunc = [&indices, &field_indices, &row_group_index, &schema, 
&columns,
+                         this](int i) {
     std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
+    RETURN_NOT_OK(ReadColumnChunk(field_indices[i], indices, row_group_index, 
&array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -511,7 +554,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
   if (use_threads_) {
     std::vector<std::future<Status>> futures;
     auto pool = ::arrow::internal::GetCpuThreadPool();
-    for (int i = 0; i < num_columns; i++) {
+    for (int i = 0; i < num_fields; i++) {
       futures.push_back(pool->Submit(ReadColumnFunc, i));
     }
     Status final_status = Status::OK();
@@ -523,7 +566,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     }
     RETURN_NOT_OK(final_status);
   } else {
-    for (int i = 0; i < num_columns; i++) {
+    for (int i = 0; i < num_fields; i++) {
       RETURN_NOT_OK(ReadColumnFunc(i));
     }
   }
@@ -640,7 +683,10 @@ Status OpenFile(const 
std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
 }
 
 Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
-  return impl_->GetColumn(i, out);
+  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* 
reader) {
+    return new AllRowGroupsIterator(i, reader);
+  };
+  return impl_->GetColumn(i, iterator_factory, out);
 }
 
 Status FileReader::GetSchema(const std::vector<int>& indices,
@@ -1708,7 +1754,7 @@ Status 
ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
 
 Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
   std::shared_ptr<ChunkedArray> chunked_out;
-  RETURN_NOT_OK(impl_->ReadColumnChunk(column_index_, row_group_index_, 
&chunked_out));
+  RETURN_NOT_OK(Read(&chunked_out));
   return GetSingleChunk(*chunked_out, out);
 }
 
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 5156300..dc7fbef 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -377,6 +377,13 @@ def test_pandas_column_selection(tempdir):
 
     tm.assert_frame_equal(df[['uint8']], df_read)
 
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    table_read = _read_table(filename, columns=['uint8', 'uint8'])
+    df_read = table_read.to_pandas()
+
+    tm.assert_frame_equal(df[['uint8']], df_read)
+
 
 def _random_integers(size, dtype):
     # We do not generate integers outside the int64 range
@@ -1062,11 +1069,17 @@ def test_read_single_row_group_with_column_subset():
     buf.seek(0)
     pf = pq.ParquetFile(buf)
 
-    cols = df.columns[:2]
+    cols = list(df.columns[:2])
     row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
     result = pa.concat_tables(row_groups)
     tm.assert_frame_equal(df[cols], result.to_pandas())
 
+    # ARROW-4267: Selection of duplicate columns still leads to these columns
+    # being read uniquely.
+    row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
 
 def test_scan_contents():
     N, K = 10000, 4

Reply via email to