Repository: parquet-cpp Updated Branches: refs/heads/master 9a1fd892f -> 8fc24f861
PARQUET-418: Refactored parquet_reader utility for printing file contents. This pull request contains the following changes: * Modified parquet_reader utility: refactored, fixed memory leaks, merged compute_stats utility to get rid of code duplication. * Added a flag --only-stats to parquet_reader to print only file statistics. * Modified InMemoryInputStream to own its buffer. All the code repetition still remaining in parquet_reader clearly highlights the need for specialized ColumnReader classes. I will create a new JIRA for this improvement. Author: Aliaksei Sandryhaila <[email protected]> Closes #18 from asandryh/PARQUET-418 and squashes the following commits: a378a1e [Aliaksei Sandryhaila] Changed the buffer in ScopedInMemoryInputStream to std::vector. 7f6f533 [Aliaksei Sandryhaila] [PARQUET-418]: Added/modified a utility for printing a file contents. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8fc24f86 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8fc24f86 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8fc24f86 Branch: refs/heads/master Commit: 8fc24f861a9768b3be9d9c2eb0eeefb114164008 Parents: 9a1fd89 Author: Aliaksei Sandryhaila <[email protected]> Authored: Tue Jan 26 16:53:17 2016 -0800 Committer: Nong Li <[email protected]> Committed: Tue Jan 26 16:53:17 2016 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- example/CMakeLists.txt | 3 - example/compute_stats.cc | 222 ---------------------- example/example_util.h | 23 ++- example/parquet_reader.cc | 419 +++++++++++++++++++---------------------- src/parquet.cc | 28 ++- src/parquet/parquet.h | 14 ++ 7 files changed, 259 insertions(+), 452 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 2554e6c..a2f7e6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,7 +133,7 @@ message(STATUS "Boost libraries: " ${Boost_LIBRARIES}) # find thrift headers and libs find_package(Thrift REQUIRED) -include_directories(SYSTEM ${THRIFT_INCLUDE_DIR}) +include_directories(SYSTEM ${THRIFT_INCLUDE_DIR} ${THRIFT_INCLUDE_DIR}/thrift) set(LIBS ${LIBS} ${THRIFT_LIBS}) message(STATUS "Thrift include dir: ${THRIFT_INCLUDE_DIR}") message(STATUS "Thrift contrib dir: ${THRIFT_CONTRIB_DIR}") http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index a9f4fa3..a020184 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -23,9 +23,6 @@ SET(LINK_LIBS thriftstatic Example) -add_executable(compute_stats compute_stats.cc) -target_link_libraries(compute_stats ${LINK_LIBS}) - add_executable(decode_benchmark decode_benchmark.cc) target_link_libraries(decode_benchmark ${LINK_LIBS}) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/compute_stats.cc ---------------------------------------------------------------------- diff --git a/example/compute_stats.cc b/example/compute_stats.cc deleted file mode 100644 index 82a574f..0000000 --- a/example/compute_stats.cc +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include <parquet/parquet.h> -#include <iostream> -#include <stdio.h> - -#include "example_util.h" - -using namespace parquet; -using namespace parquet_cpp; -using namespace std; - -struct AnyType { - union { - bool bool_val; - int32_t int32_val; - int64_t int64_val; - float float_val; - double double_val; - ByteArray byte_array_val; - }; -}; - -static string ByteArrayToString(const ByteArray& a) { - return string(reinterpret_cast<const char*>(a.ptr), a.len); -} - -int ByteCompare(const ByteArray& x1, const ByteArray& x2) { - int len = ::min(x1.len, x2.len); - int cmp = memcmp(x1.ptr, x2.ptr, len); - if (cmp != 0) return cmp; - if (len < x1.len) return 1; - if (len < x2.len) return -1; - return 0; -} - -// Simple example which reads all the values in the file and outputs the number of -// values, number of nulls and min/max for each column. -int main(int argc, char** argv) { - int col_idx = -1; - if (argc < 2) { - cerr << "Usage: compute_stats <file> [col_idx]" << endl; - return -1; - } - if (argc == 3) col_idx = atoi(argv[2]); - FileMetaData metadata; - if (!GetFileMetadata(argv[1], &metadata)) return -1; - - FILE* file = fopen(argv[1], "r"); - if (file == NULL) { - cerr << "Could not open file: " << argv[1] << endl; - return -1; - } - - for (int i = 0; i < metadata.row_groups.size(); ++i) { - const RowGroup& row_group = metadata.row_groups[i]; - for (int c = 0; c < row_group.columns.size(); ++c) { - if (col_idx != -1 && col_idx != c) continue; - const ColumnChunk& col = row_group.columns[c]; - cout << "Reading column " << metadata.schema[c + 1].name << " (idx=" << c << ")\n"; - if (col.meta_data.type == Type::INT96) { - cout << " Skipping unsupported column" << endl; - continue; - } - - size_t col_start = col.meta_data.data_page_offset; - if (col.meta_data.__isset.dictionary_page_offset) { - if (col_start > col.meta_data.dictionary_page_offset) { - col_start = col.meta_data.dictionary_page_offset; - } - } - fseek(file, col_start, SEEK_SET); - vector<uint8_t> column_buffer; - column_buffer.resize(col.meta_data.total_compressed_size); - size_t num_read = fread(&column_buffer[0], 1, column_buffer.size(), file); - if (num_read != column_buffer.size()) { - cerr << "Could not read column data." << endl; - continue; - } - - InMemoryInputStream input(&column_buffer[0], column_buffer.size()); - ColumnReader reader(&col.meta_data, &metadata.schema[c + 1], &input); - - bool first_val = true; - AnyType min, max; - int num_values = 0; - int num_nulls = 0; - - int def_level, rep_level; - while (reader.HasNext()) { - switch (col.meta_data.type) { - case Type::BOOLEAN: { - bool val = reader.GetBool(&def_level, &rep_level); - if (def_level < rep_level) break; - if (first_val) { - min.bool_val = max.bool_val = val; - first_val = false; - } else { - min.bool_val = ::min(val, min.bool_val); - max.bool_val = ::max(val, max.bool_val); - } - break; - } - case Type::INT32: { - int32_t val = reader.GetInt32(&def_level, &rep_level);; - if (def_level < rep_level) break; - if (first_val) { - min.int32_val = max.int32_val = val; - first_val = false; - } else { - min.int32_val = ::min(val, min.int32_val); - max.int32_val = ::max(val, max.int32_val); - } - break; - } - case Type::INT64: { - int64_t val = reader.GetInt64(&def_level, &rep_level);; - if (def_level < rep_level) break; - if (first_val) { - min.int64_val = max.int64_val = val; - first_val = false; - } else { - min.int64_val = ::min(val, min.int64_val); - max.int64_val = ::max(val, max.int64_val); - } - break; - } - case Type::FLOAT: { - float val = reader.GetFloat(&def_level, &rep_level);; - if (def_level < rep_level) break; - if (first_val) { - min.float_val = max.float_val = val; - first_val = false; - } else { - min.float_val = ::min(val, min.float_val); - max.float_val = ::max(val, max.float_val); - } - break; - } - case Type::DOUBLE: { - double val = reader.GetDouble(&def_level, &rep_level);; - if (def_level < rep_level) break; - if (first_val) { - min.double_val = max.double_val = val; - first_val = false; - } else { - min.double_val = ::min(val, min.double_val); - max.double_val = ::max(val, max.double_val); - } - break; - } - case Type::BYTE_ARRAY: { - ByteArray val = reader.GetByteArray(&def_level, &rep_level);; - if (def_level < rep_level) break; - if (first_val) { - min.byte_array_val = max.byte_array_val = val; - first_val = false; - } else { - if (ByteCompare(val, min.byte_array_val) < 0) { - min.byte_array_val = val; - } - if (ByteCompare(val, max.byte_array_val) > 0) { - max.byte_array_val = val; - } - } - break; - } - default: - continue; - } - - if (def_level < rep_level) ++num_nulls; - ++num_values; - } - - cout << " Num Values: " << num_values << endl; - cout << " Num Nulls: " << num_nulls << endl; - switch (col.meta_data.type) { - case Type::BOOLEAN: - cout << " Min: " << min.bool_val << endl; - cout << " Max: " << max.bool_val << endl; - break; - case Type::INT32: - cout << " Min: " << min.int32_val << endl; - cout << " Max: " << max.int32_val << endl; - break; - case Type::INT64: - cout << " Min: " << min.int64_val << endl; - cout << " Max: " << max.int64_val << endl; - break; - case Type::FLOAT: - cout << " Min: " << min.float_val << endl; - cout << " Max: " << max.float_val << endl; - break; - case Type::DOUBLE: - cout << " Min: " << min.double_val << endl; - cout << " Max: " << max.double_val << endl; - break; - case Type::BYTE_ARRAY: - cout << " Min: " << ByteArrayToString(min.byte_array_val) << endl; - cout << " Max: " << ByteArrayToString(max.byte_array_val) << endl; - break; - default: - continue; - } - } - } - fclose(file); - return 0; -} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/example_util.h ---------------------------------------------------------------------- diff --git a/example/example_util.h b/example/example_util.h index 4c523e7..a8b58fc 100644 --- a/example/example_util.h +++ b/example/example_util.h @@ -17,7 +17,28 @@ #include <string> #include <parquet/parquet.h> +#include <stdio.h> bool GetFileMetadata(const std::string& path, parquet::FileMetaData* metadata); -#endif +class InputFile { +private: + FILE* file; + std::string filename; + +public: + InputFile(const std::string& _filename): filename(_filename) { + file = fopen(_filename.c_str(), "r"); + } + ~InputFile() { + if (file != NULL) { + fclose(file); + } + } + + FILE* getFileHandle() { return file; } + bool isOpen() { return file != NULL; } + std::string getFilename() { return filename; } +}; + +#endif // PARQUET_EXAMPLE_UTIL_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/parquet_reader.cc ---------------------------------------------------------------------- diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc index 55895ce..5379c5e 100644 --- a/example/parquet_reader.cc +++ b/example/parquet_reader.cc @@ -13,14 +13,12 @@ // limitations under the License. #include <parquet/parquet.h> -#include <iostream> -#include <stdio.h> - #include "example_util.h" +#include <iostream> + // the fixed initial size is just for an example -#define INIT_SIZE 100 -#define COL_WIDTH "17" +#define COL_WIDTH "17" using namespace parquet; using namespace parquet_cpp; @@ -41,254 +39,229 @@ static string ByteArrayToString(const ByteArray& a) { return string(reinterpret_cast<const char*>(a.ptr), a.len); } -void* read_parquet(char* filename); - -// Simple example which prints out the content of the Parquet file -int main(int argc, char** argv) { - - if (argc < 2) { - cerr << "Usage: parquet_reader <file>" << endl; - return -1; - } - - void *column_ptr = read_parquet(argv[1]); - - // an example to use the returned column_ptr - // printf("%-"COL_WIDTH"d\n",((int32_t *)(((int32_t **)column_ptr)[0]))[0]); - +int ByteCompare(const ByteArray& x1, const ByteArray& x2) { + int len = ::min(x1.len, x2.len); + int cmp = memcmp(x1.ptr, x2.ptr, len); + if (cmp != 0) return cmp; + if (len < x1.len) return 1; + if (len < x2.len) return -1; return 0; } +string type2String(Type::type t) { + switch(t) { + case Type::BOOLEAN: + return "BOOLEAN"; + break; + case Type::INT32: + return "INT32"; + break; + case Type::INT64: + return "INT64"; + break; + case Type::FLOAT: + return "FLOAT"; + break; + case Type::DOUBLE: + return "DOUBLE"; + break; + case Type::BYTE_ARRAY: + return "BYTE_ARRAY"; + break; + case Type::INT96: + return "INT96"; + break; + case Type::FIXED_LEN_BYTE_ARRAY: + return "FIXED_LEN_BYTE_ARRAY"; + break; + default: + return "UNKNOWN"; + break; + } +} -void* read_parquet(char* filename) { - - unsigned int total_row_number = 0; +void readParquet(const string& filename, const bool printValues) { + InputFile file(filename); + if (!file.isOpen()) { + cerr << "Could not open file " << file.getFilename() << endl; + return; + } FileMetaData metadata; - if (!GetFileMetadata(filename, &metadata)) return NULL; + if (!GetFileMetadata(file.getFilename().c_str(), &metadata)) { + cerr << "Could not read metadata from file " << file.getFilename() << endl; + return; + } - FILE* file = fopen(filename, "r"); - if (file == NULL) { - cerr << "Could not open file: " << filename << endl; - return NULL; + cout << "File statistics:\n" ; + cout << "Total rows: " << metadata.num_rows << "\n"; + for (int c = 1; c < metadata.schema.size(); ++c) { + cout << "Column " << c-1 << ": " << metadata.schema[c].name << " (" + << type2String(metadata.schema[c].type); + if (metadata.schema[c].type == Type::INT96 || + metadata.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) { + cout << " - not supported"; + } + cout << ")\n"; } for (int i = 0; i < metadata.row_groups.size(); ++i) { - const RowGroup& row_group = metadata.row_groups[i]; + cout << "--- Row Group " << i << " ---\n"; - Type::type* type_array = (Type::type*)malloc( - row_group.columns.size() * sizeof(Type::type)); - assert(type_array); + // Print column metadata + const RowGroup& row_group = metadata.row_groups[i]; + size_t nColumns = row_group.columns.size(); + + for (int c = 0; c < nColumns; ++c) { + const ColumnMetaData& meta_data = row_group.columns[c].meta_data; + cout << "Column " << c + << ": " << meta_data.num_values << " rows, " + << meta_data.statistics.null_count << " null values, " + << meta_data.statistics.distinct_count << " distinct values, " + << "min value: " << (meta_data.statistics.min.length()>0 ? + meta_data.statistics.min : "N/A") + << ", max value: " << (meta_data.statistics.max.length()>0 ? + meta_data.statistics.max : "N/A") << ".\n"; + } - void* column_ptr = (void*)malloc(row_group.columns.size() * sizeof(void*)); - assert(column_ptr); + if (!printValues) { + continue; + } - for (int c = 0; c < row_group.columns.size(); ++c) { + // Create readers for all columns and print contents + vector<ColumnReader*> readers(nColumns, NULL); + try { + for (int c = 0; c < nColumns; ++c) { + const ColumnChunk& col = row_group.columns[c]; + printf("%-" COL_WIDTH"s", metadata.schema[c+1].name.c_str()); - const ColumnChunk& col = row_group.columns[c]; - if (col.meta_data.type == Type::INT96 || - col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) { - cout << " Skipping unsupported column" << endl; - continue; - } - - size_t col_start = col.meta_data.data_page_offset; - if (col.meta_data.__isset.dictionary_page_offset) { - if (col_start > col.meta_data.dictionary_page_offset) { - col_start = col.meta_data.dictionary_page_offset; + if (col.meta_data.type == Type::INT96 || + col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) { + continue; } - } - fseek(file, col_start, SEEK_SET); - vector<uint8_t> column_buffer; - column_buffer.resize(col.meta_data.total_compressed_size); - size_t num_read = fread(&column_buffer[0], 1, column_buffer.size(), file); - if (num_read != column_buffer.size()) { - cerr << "Could not read column data." << endl; - continue; - } - - InMemoryInputStream input(&column_buffer[0], column_buffer.size()); - ColumnReader reader(&col.meta_data, &metadata.schema[c + 1], &input); - AnyType min, max; - int num_values = 0; - int num_nulls = 0; - - switch (col.meta_data.type) { - case Type::BOOLEAN: { - ((bool**)column_ptr)[c] = (bool*)malloc(sizeof(bool) * INIT_SIZE); - type_array[c] = Type::BOOLEAN; - break; - } - case Type::INT32: { - ((int32_t**)column_ptr)[c] = (int32_t*)malloc(sizeof(int32_t) * INIT_SIZE); - type_array[c] = Type::INT32; - break; - } - case Type::INT64: { - ((int64_t**)column_ptr)[c] = (int64_t*)malloc(sizeof(int64_t) * INIT_SIZE); - type_array[c] = Type::INT64; - break; + size_t col_start = col.meta_data.data_page_offset; + if (col.meta_data.__isset.dictionary_page_offset && + col_start > col.meta_data.dictionary_page_offset) { + col_start = col.meta_data.dictionary_page_offset; } - case Type::FLOAT: { - ((float**)column_ptr)[c] = (float*)malloc(sizeof(float) * INIT_SIZE); - type_array[c] = Type::FLOAT; - break; - } - case Type::DOUBLE: { - ((double**)column_ptr)[c] = (double*)malloc(sizeof(double) * INIT_SIZE); - type_array[c] = Type::DOUBLE; - break; - } - case Type::BYTE_ARRAY: { - ((ByteArray**)column_ptr)[c] = - (ByteArray*)malloc(sizeof(ByteArray) * INIT_SIZE); - type_array[c] = Type::BYTE_ARRAY; - break; - } - case Type::FIXED_LEN_BYTE_ARRAY: - case Type::INT96: - assert(false); - break; + std::unique_ptr<ScopedInMemoryInputStream> input( + new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); + fseek(file.getFileHandle(), col_start, SEEK_SET); + size_t num_read = fread(input->data(), + 1, + input->size(), + file.getFileHandle()); + if (num_read != input->size()) { + cerr << "Could not read column data." << endl; + continue; + } + + readers[c] = new ColumnReader(&col.meta_data, + &metadata.schema[c+1], + input.release()); } + cout << "\n"; - int def_level = 0, rep_level = 0; - while (reader.HasNext()) { - switch (col.meta_data.type) { - case Type::BOOLEAN: { - bool val = reader.GetBool(&def_level, &rep_level); - if (def_level < rep_level) break; - ((bool*)(((bool**)column_ptr)[c]))[num_values] = val; - break; - } - case Type::INT32: { - int32_t val = reader.GetInt32(&def_level, &rep_level);; - if (def_level < rep_level) break; - ((int32_t*)(((int32_t**)column_ptr)[c]))[num_values] = val; - break; - } - case Type::INT64: { - int64_t val = reader.GetInt64(&def_level, &rep_level);; - if (def_level < rep_level) break; - ((int64_t *)(((int64_t**)column_ptr)[c]))[num_values] = val; - break; - } - case Type::FLOAT: { - float val = reader.GetFloat(&def_level, &rep_level);; - if (def_level < rep_level) break; - ((float*)(((float**)column_ptr)[c]))[num_values] = val; - break; - } - case Type::DOUBLE: { - double val = reader.GetDouble(&def_level, &rep_level);; - if (def_level < rep_level) break; - ((double*)(((double**)column_ptr)[c]))[num_values] = val; - break; - } - case Type::BYTE_ARRAY: { - ByteArray val = reader.GetByteArray(&def_level, &rep_level);; - if (def_level < rep_level) break; - ((ByteArray*)(((ByteArray**)column_ptr)[c]))[num_values] = val; - break; - } + vector<int> def_level(nColumns, 0); + vector<int> rep_level(nColumns, 0); - default: + bool hasRow; + do { + hasRow = false; + for (int c = 0; c < nColumns; ++c) { + if (readers[c] == NULL) { + printf("%-" COL_WIDTH"s", " "); continue; + } + const ColumnChunk& col = row_group.columns[c]; + if (readers[c]->HasNext()) { + hasRow = true; + switch (col.meta_data.type) { + case Type::BOOLEAN: { + bool val = readers[c]->GetBool(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + printf("%-" COL_WIDTH"d",val); + } + break; + } + case Type::INT32: { + int32_t val = readers[c]->GetInt32(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + printf("%-" COL_WIDTH"d",val); + } + break; + } + case Type::INT64: { + int64_t val = readers[c]->GetInt64(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + printf("%-" COL_WIDTH"ld",val); + } + break; + } + case Type::FLOAT: { + float val = readers[c]->GetFloat(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + printf("%-" COL_WIDTH"f",val); + } + break; + } + case Type::DOUBLE: { + double val = readers[c]->GetDouble(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + printf("%-" COL_WIDTH"lf",val); + } + break; + } + case Type::BYTE_ARRAY: { + ByteArray val = readers[c]->GetByteArray(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + string result = ByteArrayToString(val); + printf("%-" COL_WIDTH"s", result.c_str()); + } + break; + } + default: + continue; + } + } } - - if (def_level < rep_level) ++num_nulls; - ++num_values; - } - - total_row_number = num_values; + cout << "\n"; + } while (hasRow); + } catch (exception& e) { + cout << "Caught an exception: " << e.what() << "\n"; + } catch (...) { + cout << "Caught an exception.\n"; } - // prints out the table - cout << "=========================================================================\n"; - - // j is the row, k is the column - int k = 0, j = 0; - - // prints column name - for (j = 0; j < row_group.columns.size(); ++j) { - char *str = (char*)malloc(50); - assert(str); - strcpy(str, metadata.schema[j+1].name.c_str()); - printf("%-" COL_WIDTH"s", str); - free(str); + for(vector<ColumnReader*>::iterator it = readers.begin(); it != readers.end(); it++) { + delete *it; } + } +} - cout << "\n"; - - - for (j = 0;j < row_group.columns.size(); ++j) - switch(type_array[j]) { - case Type::BOOLEAN: - printf("%-" COL_WIDTH"s","BOOLEAN"); - break; - case Type::INT32: - printf("%-" COL_WIDTH"s","INT32"); - break; - case Type::INT64: - printf("%-" COL_WIDTH"s","INT64"); - break; - case Type::FLOAT: - printf("%-" COL_WIDTH"s","FLOAT"); - break; - case Type::DOUBLE: - printf("%-" COL_WIDTH"s","DOUBLE"); - break; - case Type::BYTE_ARRAY: - printf("%-" COL_WIDTH"s","BYTE_ARRAY"); - break; - default: - continue; - } - - cout << "\n"; +int main(int argc, char** argv) { + if (argc > 3) { + cerr << "Usage: parquet_reader [--only-stats] <file>" << endl; + return -1; + } - static string result; - char* str1; + string filename; + bool printContents = true; - for (k = 0; k < total_row_number; ++k) { - for (j = 0; j < row_group.columns.size(); ++j) { - switch(type_array[j]) { - case Type::BOOLEAN: - printf("%-" COL_WIDTH"d",((bool*)(((bool**)column_ptr)[j]))[k]); - break; - case Type::INT32: - printf("%-" COL_WIDTH"d",((int32_t *)(((int32_t **)column_ptr)[j]))[k]); - break; - case Type::INT64: - printf("%-" COL_WIDTH"ld",((int64_t *)(((int64_t **)column_ptr)[j]))[k]); - break; - case Type::FLOAT: - printf("%-" COL_WIDTH"f",((float*)(((float**)column_ptr)[j]))[k]); - break; - case Type::DOUBLE: - printf("%-" COL_WIDTH"lf",((double*)(((double**)column_ptr)[j]))[k]); - break; - case Type::BYTE_ARRAY: - result = ByteArrayToString( ((ByteArray*)(((ByteArray**)column_ptr)[j]))[k] ); - str1 = (char*)malloc(result.size()); - assert(str1); - strcpy(str1, result.c_str()); - printf("%-" COL_WIDTH"s", str1); - free(str1); - break; - default: - continue; - } - } - cout << "\n"; - - // print ends + // Read command-line options + char *param, *value; + for (int i = 1; i < argc; i++) { + if ( (param = std::strstr(argv[i], "--only-stats")) ) { + printContents = false; + } else { + filename = argv[i]; } - - return column_ptr; } - fclose(file); - return NULL; + readParquet(filename, printContents); + + return 0; } + http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/src/parquet.cc ---------------------------------------------------------------------- diff --git a/src/parquet.cc b/src/parquet.cc index 5a0f8f4..6b6adaa 100644 --- a/src/parquet.cc +++ b/src/parquet.cc @@ -36,8 +36,7 @@ using parquet::SchemaElement; using parquet::Type; InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : - buffer_(buffer), len_(len), offset_(0) { -} + buffer_(buffer), len_(len), offset_(0) {} const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); @@ -50,7 +49,32 @@ const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { return result; } +ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { + buffer_.resize(len); + stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); +} + +uint8_t* ScopedInMemoryInputStream::data() { + return buffer_.data(); +} + +int64_t ScopedInMemoryInputStream::size() { + return buffer_.size(); +} + +const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek, + int* num_bytes) { + return stream_->Peek(num_to_peek, num_bytes); +} + +const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, + int* num_bytes) { + return stream_->Read(num_to_read, num_bytes); +} + + ColumnReader::~ColumnReader() { + delete stream_; } ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/src/parquet/parquet.h ---------------------------------------------------------------------- diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index a1af6b7..4469a82 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -88,6 +88,20 @@ class InMemoryInputStream : public InputStream { int64_t offset_; }; +// A wrapper for InMemoryInputStream to manage the memory. +class ScopedInMemoryInputStream : public InputStream { + public: + ScopedInMemoryInputStream(int64_t len); + uint8_t* data(); + int64_t size(); + virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); + virtual const uint8_t* Read(int num_to_read, int* num_bytes); + + private: + std::vector<uint8_t> buffer_; + std::unique_ptr<InMemoryInputStream> stream_; +}; + // API to read values from a single column. This is the main client facing API. class ColumnReader { public:
