Repository: parquet-cpp
Updated Branches:
  refs/heads/master 56fbdb63b -> f9ff60797
PARQUET-836: Bugfix + testcase for column subsetting in arrow::FileReader::ReadFlatTable Also adds an option to set the random seed when generating data for the Arrow test cases Author: Wes McKinney <[email protected]> Closes #223 from wesm/PARQUET-836 and squashes the following commits: 673cc0d [Wes McKinney] Add randomness option to Arrow data generation. Test case for FileReader::ReadFlatTable with column subsetting Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/f9ff6079 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/f9ff6079 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/f9ff6079 Branch: refs/heads/master Commit: f9ff607972629346c39ca85862d958edc87b2e3a Parents: 56fbdb6 Author: Wes McKinney <[email protected]> Authored: Tue Jan 24 08:42:03 2017 +0100 Committer: Uwe L. Korn <[email protected]> Committed: Tue Jan 24 08:42:03 2017 +0100 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 106 +++++++++++++++------ src/parquet/arrow/reader.cc | 2 +- src/parquet/arrow/test-util.h | 29 ++++-- 3 files changed, 99 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 6748a8d..d681e57 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -32,6 +32,7 @@ using arrow::Array; using arrow::Buffer; +using arrow::Column; using arrow::ChunkedArray; using arrow::default_memory_pool; using arrow::io::BufferReader; @@ -51,6 +52,8 @@ namespace arrow { const int SMALL_SIZE = 100; const int LARGE_SIZE = 10000; 
+constexpr uint32_t kDefaultSeed = 0; + template <typename TestType> struct test_traits {}; @@ -185,23 +188,6 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>; template <typename T> using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>; -void DoTableRoundtrip( - const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) { - auto sink = std::make_shared<InMemoryOutputStream>(); - - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); - - std::shared_ptr<Buffer> buffer = sink->GetBuffer(); - std::unique_ptr<FileReader> reader; - ASSERT_OK_NO_THROW( - OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); - - reader->set_num_threads(num_threads); - ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); -} - template <typename TestType> class TestParquetIO : public ::testing::Test { public: @@ -324,7 +310,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { // This also tests max_definition_level = 1 std::shared_ptr<Array> values; - ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); this->WriteFlatColumn(schema, values); @@ -335,7 +322,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { // This also tests max_definition_level = 1 std::shared_ptr<Array> values; - ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), 
::arrow::default_memory_pool(), @@ -407,7 +395,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { int64_t chunk_size = SMALL_SIZE / 4; std::shared_ptr<Array> values; - ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); @@ -424,7 +413,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { // This also tests max_definition_level = 1 std::shared_ptr<Array> values; - ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, &values)); + + ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), @@ -490,7 +480,8 @@ using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { // This also tests max_definition_level = 1 std::shared_ptr<Array> values; - ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &values)); + + ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); // Parquet 2.0 roundtrip should yield an uint32_t column again @@ -507,7 +498,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { // This also tests max_definition_level = 1 std::shared_ptr<Array> arr; - ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &arr)); + ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr)); 
std::shared_ptr<::arrow::UInt32Array> values = std::dynamic_pointer_cast<::arrow::UInt32Array>(arr); @@ -660,18 +651,15 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -TEST(TestArrowReadWrite, MultithreadedRead) { - const int num_columns = 20; - const int num_rows = 1000; - const int num_threads = 4; - +void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) { std::shared_ptr<::arrow::Column> column; std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns); std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns); std::shared_ptr<Array> values; for (int i = 0; i < num_columns; ++i) { - ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values)); + ASSERT_OK(NullableArray<::arrow::DoubleType>( + num_rows, num_rows / 10, static_cast<uint32_t>(i), &values)); std::stringstream ss; ss << "col" << i; column = MakeColumn(ss.str(), values, true); @@ -680,13 +668,69 @@ TEST(TestArrowReadWrite, MultithreadedRead) { fields[i] = column->field(); } auto schema = std::make_shared<::arrow::Schema>(fields); - auto table = std::make_shared<Table>("schema", schema, columns); + *out = std::make_shared<Table>("schema", schema, columns); +} + +void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads, + const std::vector<int>& column_subset, std::shared_ptr<Table>* out) { + auto sink = std::make_shared<InMemoryOutputStream>(); + + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + + std::shared_ptr<Buffer> buffer = sink->GetBuffer(); + std::unique_ptr<FileReader> reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + + if (column_subset.size() > 0) { + 
ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out)); + } else { + // Read everything + ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); + } +} + +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr<Table> table; + MakeDoubleTable(num_columns, num_rows, &table); std::shared_ptr<Table> result; - DoTableRoundtrip(table, num_threads, &result); + DoTableRoundtrip(table, num_threads, {}, &result); ASSERT_TRUE(table->Equals(result)); } +TEST(TestArrowReadWrite, ReadColumnSubset) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr<Table> table; + MakeDoubleTable(num_columns, num_rows, &table); + + std::shared_ptr<Table> result; + std::vector<int> column_subset = {0, 4, 8, 10}; + DoTableRoundtrip(table, num_threads, column_subset, &result); + + std::vector<std::shared_ptr<::arrow::Column>> ex_columns; + std::vector<std::shared_ptr<::arrow::Field>> ex_fields; + for (int i : column_subset) { + ex_columns.push_back(table->column(i)); + ex_fields.push_back(table->column(i)->field()); + } + + auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields); + auto expected = std::make_shared<Table>("schema", ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(expected)); +} + } // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 9221041..a60d0b2 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -222,7 +222,7 @@ Status FileReader::Impl::ReadFlatTable( auto ReadColumn = [&indices, &schema, &columns, this](int i) { std::shared_ptr<Array> array; RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); - columns[i] = std::make_shared<Column>(schema->field(indices[i]), 
array); + columns[i] = std::make_shared<Column>(schema->field(i), array); return Status::OK(); }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index f996c2c..4760f0e 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -90,9 +90,10 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr // This helper function only supports (size/2) nulls. template <typename ArrowType> typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr<Array>* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) { std::vector<typename ArrowType::c_type> values; - ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values); + ::arrow::test::random_real<typename ArrowType::c_type>( + size, seed, -1e10, 1e10, &values); std::vector<uint8_t> valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -108,8 +109,11 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable // This helper function only supports (size/2) nulls. 
template <typename ArrowType> typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr<Array>* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) { std::vector<typename ArrowType::c_type> values; + + // Seed is random in Arrow right now + (void)seed; ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); std::vector<uint8_t> valid_bytes(size, 1); @@ -127,7 +131,8 @@ typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableAr template <typename ArrowType> typename std::enable_if< is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type -NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) { +NullableArray( + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector<uint8_t> valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -136,8 +141,16 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType; BuilderType builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>()); + + const int kBufferSize = 10; + uint8_t buffer[kBufferSize]; for (size_t i = 0; i < size; i++) { - builder.Append("test-string"); + if (!valid_bytes[i]) { + builder.AppendNull(); + } else { + ::arrow::test::random_bytes(kBufferSize, seed + i, buffer); + builder.Append(buffer, kBufferSize); + } } return builder.Finish(out); } @@ -145,8 +158,12 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou // This helper function only supports (size/2) nulls yet. 
template <class ArrowType> typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr<Array>* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) { std::vector<uint8_t> values; + + // Seed is random in Arrow right now + (void)seed; + ::arrow::test::randint<uint8_t>(size, 0, 1, &values); std::vector<uint8_t> valid_bytes(size, 1);
