[ 
https://issues.apache.org/jira/browse/PARQUET-1200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362462#comment-16362462
 ] 

ASF GitHub Bot commented on PARQUET-1200:
-----------------------------------------

xhochy closed pull request #434: PARQUET-1200: Support reading a single Arrow 
column from a Parquet file
URL: https://github.com/apache/parquet-cpp/pull/434
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc 
b/src/parquet/arrow/arrow-reader-writer-test.cc
index db12fb45..369eb2e1 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1496,7 +1496,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   std::shared_ptr<Table> r1, r2;
   // Read everything
   ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
-  ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+  ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
 
   std::shared_ptr<Table> concatenated;
   ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
diff --git a/src/parquet/arrow/arrow-schema-test.cc 
b/src/parquet/arrow/arrow-schema-test.cc
index b33eda14..771b9960 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
-                                    << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs))
+          << i << " " << lhs->ToString() << " != " << rhs->ToString();
     }
   }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 93183051..7f81771b 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -170,6 +170,8 @@ class FileReader::Impl {
                           int16_t def_level,
                           std::unique_ptr<ColumnReader::ColumnReaderImpl>* 
out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadColumnChunk(int column_index, int row_group_index,
+                         std::shared_ptr<Array>* out);
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
@@ -391,6 +393,24 @@ Status FileReader::Impl::GetSchema(const std::vector<int>& 
indices,
   return FromParquetSchema(descr, indices, parquet_key_value_metadata, out);
 }
 
+Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
+                                         std::shared_ptr<Array>* out) {
+  auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
+  int64_t records_to_read = 
rg_metadata->ColumnChunk(column_index)->num_values();
+
+  std::unique_ptr<FileColumnIterator> input(
+      new SingleRowGroupIterator(column_index, row_group_index, 
reader_.get()));
+
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+      new PrimitiveImpl(pool_, std::move(input)));
+  ColumnReader flat_column_reader(std::move(impl));
+
+  std::shared_ptr<Array> array;
+  RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+  *out = array;
+  return Status::OK();
+}
+
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
                                       const std::vector<int>& indices,
                                       std::shared_ptr<::arrow::Table>* out) {
@@ -408,17 +428,9 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
   auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, 
&rg_metadata,
                          this](int i) {
     int column_index = indices[i];
-    int64_t records_to_read = 
rg_metadata->ColumnChunk(column_index)->num_values();
-
-    std::unique_ptr<FileColumnIterator> input(
-        new SingleRowGroupIterator(column_index, row_group_index, 
reader_.get()));
-
-    std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-        new PrimitiveImpl(pool_, std::move(input)));
-    ColumnReader flat_column_reader(std::move(impl));
 
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
+    RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -561,6 +573,11 @@ Status FileReader::ReadRowGroup(int i, const 
std::vector<int>& indices,
   }
 }
 
+std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
+  return std::shared_ptr<RowGroupReader>(
+      new RowGroupReader(impl_.get(), row_group_index));
+}
+
 int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
 
 void FileReader::set_num_threads(int num_threads) { 
impl_->set_num_threads(num_threads); }
@@ -1354,5 +1371,34 @@ Status StructImpl::NextBatch(int64_t records_to_read, 
std::shared_ptr<Array>* ou
   return Status::OK();
 }
 
+std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
+  return std::shared_ptr<ColumnChunkReader>(
+      new ColumnChunkReader(impl_, row_group_index_, column_index));
+}
+
+Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
+                                 std::shared_ptr<::arrow::Table>* out) {
+  return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+}
+
+Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
+  return impl_->ReadRowGroup(row_group_index_, out);
+}
+
+RowGroupReader::~RowGroupReader() {}
+
+RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
+    : impl_(impl), row_group_index_(row_group_index) {}
+
+Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
+  return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
+}
+
+ColumnChunkReader::~ColumnChunkReader() {}
+
+ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int 
row_group_index,
+                                     int column_index)
+    : impl_(impl), column_index_(column_index), 
row_group_index_(row_group_index) {}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index faaef9a1..95b21866 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -39,11 +39,27 @@ namespace parquet {
 
 namespace arrow {
 
+class ColumnChunkReader;
 class ColumnReader;
+class RowGroupReader;
 
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
+// This interface caters to different use cases and thus provides different
+// interfaces. In its simplest form, we cater to a user that wants to
+// read the whole Parquet file at once with the FileReader::ReadTable method.
+//
+// More advanced users that also want to implement parallelism on top of each
+// single Parquet file should do this on the RowGroup level. For this, they 
can
+// call FileReader::RowGroup(i)->ReadTable to receive only the specified
+// RowGroup as a table.
+//
+// In the most advanced situation, where a consumer wants to independently read
+// RowGroups in parallel and consume each column individually, they can call
+// FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Array
+// instance.
+//
 // TODO(wesm): nested data does not always make sense with this user
 // interface unless you are only reading a single leaf node from a branch of
 // a table. For example:
@@ -150,6 +166,10 @@ class PARQUET_EXPORT FileReader {
   ::arrow::Status ScanContents(std::vector<int> columns, const int32_t 
column_batch_size,
                                int64_t* num_rows);
 
+  /// \brief Return a reader for the RowGroup; this object must not outlive the
+  ///   FileReader.
+  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index);
+
   int num_row_groups() const;
 
   const ParquetFileReader* parquet_reader() const;
@@ -161,10 +181,46 @@ class PARQUET_EXPORT FileReader {
   virtual ~FileReader();
 
  private:
+  friend ColumnChunkReader;
+  friend RowGroupReader;
+
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 
+class PARQUET_EXPORT RowGroupReader {
+ public:
+  std::shared_ptr<ColumnChunkReader> Column(int column_index);
+
+  ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+                            std::shared_ptr<::arrow::Table>* out);
+  ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~RowGroupReader();
+
+ private:
+  friend FileReader;
+  RowGroupReader(FileReader::Impl* reader, int row_group_index);
+
+  FileReader::Impl* impl_;
+  int row_group_index_;
+};
+
+class PARQUET_EXPORT ColumnChunkReader {
+ public:
+  ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out);
+
+  virtual ~ColumnChunkReader();
+
+ private:
+  friend RowGroupReader;
+  ColumnChunkReader(FileReader::Impl* impl, int row_group_index, int 
column_index);
+
+  FileReader::Impl* impl_;
+  int column_index_;
+  int row_group_index_;
+};
+
 // At this point, the column reader is a stream iterator. It only knows how to
 // read the next batch of values for a particular column from the file until it
 // runs out.
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 72c71c66..7b748120 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr<Contents> 
contents)
     : contents_(std::move(contents)) {}
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   const ColumnDescriptor* descr = metadata()->schema()->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   return contents_->GetColumnPageReader(i);
 }
 
@@ -302,9 +302,9 @@ std::shared_ptr<FileMetaData> ParquetFileReader::metadata() 
const {
 }
 
 std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
-                                           << metadata()->num_row_groups()
-                                           << "row groups, requested reader 
for: " << i;
+  DCHECK(i < metadata()->num_row_groups())
+      << "The file only has " << metadata()->num_row_groups()
+      << "row groups, requested reader for: " << i;
   return contents_->GetRowGroup(i);
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [C++] Support reading a single Arrow column from a Parquet file
> ---------------------------------------------------------------
>
>                 Key: PARQUET-1200
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1200
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-cpp
>            Reporter: Uwe L. Korn
>            Assignee: Uwe L. Korn
>            Priority: Major
>             Fix For: cpp-1.4.0
>
>
> A small convenience for consumers that read in columns with separate function 
> contexts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to