IMPALA-4640: Fix number of rows displayed by parquet-reader tool

The num_rows counter was never updated in the code. This change also adds
verification that all columns contain the same number of rows.

Change-Id: I281a784a0aa2df4ed1852dfb864587a0c1aa4d9a
Reviewed-on: http://gerrit.cloudera.org:8080/5453
Reviewed-by: Tim Armstrong <[email protected]>
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c40958f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c40958f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c40958f3

Branch: refs/heads/hadoop-next
Commit: c40958f3ff2bbc27b17c9e6acb3c2a69442f6aeb
Parents: c2faf4a
Author: Lars Volker <[email protected]>
Authored: Fri Dec 9 16:13:07 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Fri Dec 16 08:07:07 2016 +0000

----------------------------------------------------------------------
 be/src/util/parquet-reader.cc | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c40958f3/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e6a9084..d48ef16 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -148,8 +148,8 @@ class ParquetLevelReader : public impala::RleDecoder {
 //     def levels - with our RLE scheme it is not possible to determine how many values
 //     were actually written if the final run is a literal run, only if the final run is
 //     a repeated run (see util/rle-encoding.h for more details).
-void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
-    const uint8_t* page) {
+// Returns the number of rows specified by the header.
+int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
   const uint8_t* data = page;
   std::vector<uint8_t> decompressed_buffer;
   if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
@@ -191,6 +191,8 @@ void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
       }
     }
   }
+
+  return header.data_page_header.num_values;
 }
 
 // Simple utility to read parquet files on local disk.  This utility validates the
@@ -248,12 +250,14 @@ int main(int argc, char** argv) {
   int total_page_header_size = 0;
   int total_compressed_data_size = 0;
   int total_uncompressed_data_size = 0;
-  vector<int> column_sizes;
+  vector<int> column_byte_sizes;
+  vector<int> column_num_rows;
 
   for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
     cerr << "Reading row group " << i << endl;
     RowGroup& rg = file_metadata.row_groups[i];
-    column_sizes.resize(rg.columns.size());
+    column_byte_sizes.resize(rg.columns.size());
+    column_num_rows.resize(rg.columns.size());
 
     for (int c = 0; c < rg.columns.size(); ++c) {
       cerr << "  Reading column " << c << endl;
@@ -278,18 +282,23 @@ int main(int argc, char** argv) {
         }
 
         data += header_size;
-        if (header.__isset.data_page_header) CheckDataPage(col, header, data);
+        if (header.__isset.data_page_header) {
+          column_num_rows[c] += CheckDataPage(col, header, data);
+        }
 
         total_page_header_size += header_size;
-        column_sizes[c] += header.compressed_page_size;
+        column_byte_sizes[c] += header.compressed_page_size;
         total_compressed_data_size += header.compressed_page_size;
         total_uncompressed_data_size += header.uncompressed_page_size;
         data += header.compressed_page_size;
         ++pages_read;
       }
-      // Check that we ended exactly where we should have
+      // Check that we ended exactly where we should have.
       assert(data == col_end);
+      // Check that all cols have the same number of rows.
+      assert(column_num_rows[0] == column_num_rows[c]);
     }
+    num_rows += column_num_rows[0];
   }
   double compression_ratio =
       (double)total_uncompressed_data_size / total_compressed_data_size;
@@ -306,9 +315,9 @@ int main(int argc, char** argv) {
      << "(" << (total_compressed_data_size / (double)file_len) << ")" << endl;
   ss << "  Column uncompressed size: " << total_uncompressed_data_size
      << "(" << compression_ratio << ")" << endl;
-  for (int i = 0; i < column_sizes.size(); ++i) {
-    ss << "    " << "Col " << i << ": " << column_sizes[i]
-       << "(" << (column_sizes[i] / (double)file_len) << ")" << endl;
+  for (int i = 0; i < column_byte_sizes.size(); ++i) {
+    ss << "    " << "Col " << i << ": " << column_byte_sizes[i]
+       << "(" << (column_byte_sizes[i] / (double)file_len) << ")" << endl;
   }
   cerr << ss.str() << endl;
 

Reply via email to