IMPALA-4640: Fix number of rows displayed by parquet-reader tool. The num_rows variable was never updated in the code. This change also adds verification that all columns contain the same number of rows.
Change-Id: I281a784a0aa2df4ed1852dfb864587a0c1aa4d9a Reviewed-on: http://gerrit.cloudera.org:8080/5453 Reviewed-by: Tim Armstrong <[email protected]> Reviewed-by: Alex Behm <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c40958f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c40958f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c40958f3 Branch: refs/heads/hadoop-next Commit: c40958f3ff2bbc27b17c9e6acb3c2a69442f6aeb Parents: c2faf4a Author: Lars Volker <[email protected]> Authored: Fri Dec 9 16:13:07 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Fri Dec 16 08:07:07 2016 +0000 ---------------------------------------------------------------------- be/src/util/parquet-reader.cc | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c40958f3/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index e6a9084..d48ef16 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -148,8 +148,8 @@ class ParquetLevelReader : public impala::RleDecoder { // def levels - with our RLE scheme it is not possible to determine how many values // were actually written if the final run is a literal run, only if the final run is // a repeated run (see util/rle-encoding.h for more details). -void CheckDataPage(const ColumnChunk& col, const PageHeader& header, - const uint8_t* page) { +// Returns the number of rows specified by the header. 
+int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) { const uint8_t* data = page; std::vector<uint8_t> decompressed_buffer; if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) { @@ -191,6 +191,8 @@ void CheckDataPage(const ColumnChunk& col, const PageHeader& header, } } } + + return header.data_page_header.num_values; } // Simple utility to read parquet files on local disk. This utility validates the @@ -248,12 +250,14 @@ int main(int argc, char** argv) { int total_page_header_size = 0; int total_compressed_data_size = 0; int total_uncompressed_data_size = 0; - vector<int> column_sizes; + vector<int> column_byte_sizes; + vector<int> column_num_rows; for (int i = 0; i < file_metadata.row_groups.size(); ++i) { cerr << "Reading row group " << i << endl; RowGroup& rg = file_metadata.row_groups[i]; - column_sizes.resize(rg.columns.size()); + column_byte_sizes.resize(rg.columns.size()); + column_num_rows.resize(rg.columns.size()); for (int c = 0; c < rg.columns.size(); ++c) { cerr << " Reading column " << c << endl; @@ -278,18 +282,23 @@ int main(int argc, char** argv) { } data += header_size; - if (header.__isset.data_page_header) CheckDataPage(col, header, data); + if (header.__isset.data_page_header) { + column_num_rows[c] += CheckDataPage(col, header, data); + } total_page_header_size += header_size; - column_sizes[c] += header.compressed_page_size; + column_byte_sizes[c] += header.compressed_page_size; total_compressed_data_size += header.compressed_page_size; total_uncompressed_data_size += header.uncompressed_page_size; data += header.compressed_page_size; ++pages_read; } - // Check that we ended exactly where we should have + // Check that we ended exactly where we should have. assert(data == col_end); + // Check that all cols have the same number of rows. 
+ assert(column_num_rows[0] == column_num_rows[c]); } + num_rows += column_num_rows[0]; } double compression_ratio = (double)total_uncompressed_data_size / total_compressed_data_size; @@ -306,9 +315,9 @@ int main(int argc, char** argv) { << "(" << (total_compressed_data_size / (double)file_len) << ")" << endl; ss << " Column uncompressed size: " << total_uncompressed_data_size << "(" << compression_ratio << ")" << endl; - for (int i = 0; i < column_sizes.size(); ++i) { - ss << " " << "Col " << i << ": " << column_sizes[i] - << "(" << (column_sizes[i] / (double)file_len) << ")" << endl; + for (int i = 0; i < column_byte_sizes.size(); ++i) { + ss << " " << "Col " << i << ": " << column_byte_sizes[i] + << "(" << (column_byte_sizes[i] / (double)file_len) << ")" << endl; } cerr << ss.str() << endl;
