Repository: parquet-cpp Updated Branches: refs/heads/master 7a7234276 -> b2c734b89
PARQUET-867: Support writing sliced Arrow arrays Author: Korn, Uwe <[email protected]> Closes #238 from xhochy/PARQUET-867 and squashes the following commits: e9f79d3 [Korn, Uwe] Remove alt-space from code 5fe849d [Korn, Uwe] Address review comments fa7a1e0 [Korn, Uwe] Use Slice instead of offset,length d68e9db [Korn, Uwe] Update Arrow hash 61e3ac0 [Korn, Uwe] Use references instead of pointers d3c4ec6 [Korn, Uwe] Remove offset,length from public interface ec0577f [Korn, Uwe] PARQUET-867: Support writing sliced Arrow arrays Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b2c734b8 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b2c734b8 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b2c734b8 Branch: refs/heads/master Commit: b2c734b89e8e766d84c062345552e99b9b2eec04 Parents: 7a72342 Author: Korn, Uwe <[email protected]> Authored: Thu Feb 9 15:25:36 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Thu Feb 9 15:25:36 2017 -0500 ---------------------------------------------------------------------- cmake_modules/ThirdpartyToolchain.cmake | 2 +- .../arrow/arrow-reader-writer-benchmark.cc | 4 +- src/parquet/arrow/arrow-reader-writer-test.cc | 75 ++++++--- src/parquet/arrow/writer.cc | 162 +++++++++---------- src/parquet/arrow/writer.h | 7 +- src/parquet/column/column-writer-test.cc | 3 +- src/parquet/column/scanner-test.cc | 3 +- src/parquet/encoding-internal.h | 4 +- src/parquet/util/bit-util.h | 6 +- 9 files changed, 150 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index edccc3a..526e8c6 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1") # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd") -set(ARROW_VERSION "5439b71586f4b0f9a36544b9e2417ee6ad7b48e8") +set(ARROW_VERSION "0bdfd5efb2d7360f8ec8f6a65401d4c76a8df597") # find boost headers and libs set(Boost_DEBUG TRUE) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/arrow-reader-writer-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc index 3f2a688..c4a4777 100644 --- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc +++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc @@ -109,7 +109,7 @@ static void BM_WriteColumn(::benchmark::State& state) { while (state.KeepRunning()) { auto output = std::make_shared<InMemoryOutputStream>(); - WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); + WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); } SetBytesProcessed<nullable, ParquetType>(state); } @@ -128,7 +128,7 @@ static void BM_ReadColumn(::benchmark::State& state) { std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128); std::shared_ptr<::arrow::Table> table = TableFromVector<nullable, ParquetType>(values); auto output = std::make_shared<InMemoryOutputStream>(); - WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); + WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE); std::shared_ptr<Buffer> buffer = output->GetBuffer(); while (state.KeepRunning()) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 63953ca..2dfdbd2 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -258,10 +258,12 @@ class TestParquetIO : public ::testing::Test { std::shared_ptr<Array> values; ASSERT_OK(NullableArray<TestType>( size * size, nullable_elements ? null_count : 0, kDefaultSeed, &values)); + // Also test that slice offsets are respected + values = values->Slice(5, values->length() - 5); std::shared_ptr<ListArray> lists; ASSERT_OK(MakeListArary( values, size, nullable_lists ? null_count : 0, nullable_elements, &lists)); - *out = MakeSimpleTable(lists, nullable_lists); + *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists); } void PrepareListOfListTable(int64_t size, bool nullable_parent_lists, @@ -282,8 +284,8 @@ class TestParquetIO : public ::testing::Test { void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) { std::shared_ptr<Array> values = table->column(0)->data()->chunk(0); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), - this->sink_, values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -293,7 +295,7 @@ class TestParquetIO : public ::testing::Test { const std::shared_ptr<ArrayType>& values) { FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema)); ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); - ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get())); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values)); ASSERT_OK_NO_THROW(writer.Close()); } @@ -308,7 +310,8 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType, ::arrow::BinaryType> TestTypes; + ::arrow::StringType, ::arrow::BinaryType> + TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -327,7 +330,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, values->length(), default_writer_properties())); std::shared_ptr<Table> out; @@ -354,6 +357,36 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { this->ReadAndCheckSingleColumnFile(values.get()); } +TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { + std::shared_ptr<Array> values; + ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values)); + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); + + std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); + this->WriteColumn(schema, sliced_values); + this->ReadAndCheckSingleColumnFile(sliced_values.get()); + + // Slice offset 1 higher + sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); + this->WriteColumn(schema, sliced_values); + this->ReadAndCheckSingleColumnFile(sliced_values.get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { + std::shared_ptr<Array> values; + ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values)); + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); + + std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); + this->WriteColumn(schema, sliced_values); + this->ReadAndCheckSingleColumnFile(sliced_values.get()); + + // Slice offset 1 higher, thus different null bitmap. + sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); + this->WriteColumn(schema, sliced_values); + this->ReadAndCheckSingleColumnFile(sliced_values.get()); +} + TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { // This also tests max_definition_level = 1 std::shared_ptr<Array> values; @@ -402,7 +435,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size)); + std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); } ASSERT_OK_NO_THROW(writer.Close()); @@ -415,7 +449,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); ASSERT_OK_NO_THROW(WriteTable( - table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); + *table, default_memory_pool(), this->sink_, 512, default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -430,8 +464,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { { // BufferOutputStream closed on gc auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); - ASSERT_OK_NO_THROW(WriteTable(table.get(), default_memory_pool(), arrow_sink_, 512, - default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable( + *table, default_memory_pool(), arrow_sink_, 512, default_writer_properties())); // XXX: Remove this after ARROW-455 completed ASSERT_OK(arrow_sink_->Close()); @@ -462,7 +496,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); - ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size)); + std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); + ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); } ASSERT_OK_NO_THROW(writer.Close()); @@ -476,8 +511,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, - 512, default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, + default_writer_properties())); this->ReadAndCheckSingleColumnTable(values); } @@ -550,7 +585,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { .version(ParquetVersion::PARQUET_2_0) ->build(); ASSERT_OK_NO_THROW( - WriteTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + WriteTable(*table, default_memory_pool(), this->sink_, 512, properties)); this->ReadAndCheckSingleColumnTable(values); } @@ -571,8 +606,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { ::parquet::WriterProperties::Builder() .version(ParquetVersion::PARQUET_1_0) ->build(); - ASSERT_OK_NO_THROW(WriteTable( - table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties)); + ASSERT_OK_NO_THROW( + WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties)); std::shared_ptr<Array> expected_values; std::shared_ptr<PoolBuffer> int64_data = @@ -606,7 +641,7 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { ASSERT_OK(builder.Finish(&values)); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_, + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, values->length(), default_writer_properties())); std::shared_ptr<Table> out; @@ -692,8 +727,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, - ::arrow::DoubleType> PrimitiveTestTypes; + ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType> + PrimitiveTestTypes; TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); @@ -738,7 +773,7 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads, auto sink = std::make_shared<InMemoryOutputStream>(); ASSERT_OK_NO_THROW(WriteTable( - table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); std::shared_ptr<Buffer> buffer = sink->GetBuffer(); std::unique_ptr<FileReader> reader; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 0be6b69..90e037f 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -59,6 +59,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { #define PRIMITIVE_VISIT(ArrowTypePrefix) \ Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \ + array_offsets_.push_back(array.offset()); \ valid_bitmaps_.push_back(array.null_bitmap_data()); \ null_counts_.push_back(array.null_count()); \ values_type_ = array.type_enum(); \ @@ -86,12 +87,13 @@ class LevelBuilder : public ::arrow::ArrayVisitor { PRIMITIVE_VISIT(Interval) Status Visit(const ListArray& array) override { + array_offsets_.push_back(array.offset()); valid_bitmaps_.push_back(array.null_bitmap_data()); null_counts_.push_back(array.null_count()); offsets_.push_back(array.raw_value_offsets()); - min_offset_idx_ = array.raw_value_offsets()[min_offset_idx_]; - max_offset_idx_ = array.raw_value_offsets()[max_offset_idx_]; + min_offset_idx_ = array.value_offset(min_offset_idx_); + max_offset_idx_ = array.value_offset(max_offset_idx_); return array.values()->Accept(this); } @@ -108,15 +110,14 @@ class LevelBuilder : public ::arrow::ArrayVisitor { NOT_IMPLEMENTED_VIST(Decimal) NOT_IMPLEMENTED_VIST(Dictionary) - Status GenerateLevels(const Array* array, int64_t offset, int64_t length, - const std::shared_ptr<Field>& field, int64_t* values_offset, - ::arrow::Type::type* values_type, int64_t* num_values, int64_t* num_levels, - std::shared_ptr<Buffer>* def_levels, std::shared_ptr<Buffer>* rep_levels, - const Array** values_array) { + Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field, + int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values, + int64_t* num_levels, std::shared_ptr<Buffer>* def_levels, + std::shared_ptr<Buffer>* rep_levels, const Array** values_array) { // Work downwards to extract bitmaps and offsets - min_offset_idx_ = offset; - max_offset_idx_ = offset + length; - RETURN_NOT_OK(array->Accept(this)); + min_offset_idx_ = 0; + max_offset_idx_ = array.length(); + RETURN_NOT_OK(array.Accept(this)); *num_values = max_offset_idx_ - min_offset_idx_; *values_offset = min_offset_idx_; *values_type = values_type_; @@ -140,15 +141,15 @@ class LevelBuilder : public ::arrow::ArrayVisitor { // We have a PrimitiveArray *rep_levels = nullptr; if (nullable_[0]) { - RETURN_NOT_OK(def_levels_buffer_->Resize(length * sizeof(int16_t))); + RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() * sizeof(int16_t))); auto def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data()); - if (array->null_count() == 0) { - std::fill(def_levels_ptr, def_levels_ptr + length, 1); + if (array.null_count() == 0) { + std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1); } else { - const uint8_t* valid_bits = array->null_bitmap_data(); - INIT_BITSET(valid_bits, offset); - for (int i = 0; i < length; i++) { + const uint8_t* valid_bits = array.null_bitmap_data(); + INIT_BITSET(valid_bits, array.offset()); + for (int i = 0; i < array.length(); i++) { if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { def_levels_ptr[i] = 1; } else { @@ -161,10 +162,10 @@ class LevelBuilder : public ::arrow::ArrayVisitor { } else { *def_levels = nullptr; } - *num_levels = length; + *num_levels = array.length(); } else { RETURN_NOT_OK(rep_levels_.Append(0)); - HandleListEntries(0, 0, offset, length); + HandleListEntries(0, 0, 0, array.length()); std::shared_ptr<Array> def_levels_array; RETURN_NOT_OK(def_levels_.Finish(&def_levels_array)); @@ -182,7 +183,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) { if (nullable_[rep_level]) { if (null_counts_[rep_level] == 0 || - BitUtil::GetBit(valid_bitmaps_[rep_level], index)) { + BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) { return HandleNonNullList(def_level + 1, rep_level, index); } else { return def_levels_.Append(def_level); @@ -205,7 +206,8 @@ class LevelBuilder : public ::arrow::ArrayVisitor { if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level + 1)); } if (nullable_[recursion_level] && ((null_counts_[recursion_level] == 0) || - BitUtil::GetBit(valid_bitmaps_[recursion_level], inner_offset + i))) { + BitUtil::GetBit(valid_bitmaps_[recursion_level], + inner_offset + i + array_offsets_[recursion_level]))) { RETURN_NOT_OK(def_levels_.Append(def_level + 2)); } else { // This can be produced in two case: @@ -236,6 +238,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { std::vector<int64_t> null_counts_; std::vector<const uint8_t*> valid_bitmaps_; std::vector<const int32_t*> offsets_; + std::vector<int32_t> array_offsets_; std::vector<bool> nullable_; int32_t min_offset_idx_; @@ -250,9 +253,8 @@ class FileWriter::Impl { Status NewRowGroup(int64_t chunk_size); template <typename ParquetType, typename ArrowType> - Status TypedWriteBatch(ColumnWriter* writer, const Array* data, int64_t offset, - int64_t num_values, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels); + Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data, + int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels); template <typename ParquetType, typename ArrowType> Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values, @@ -288,7 +290,7 @@ class FileWriter::Impl { return Status::OK(); } - Status WriteColumnChunk(const Array* data, int64_t offset, int64_t length); + Status WriteColumnChunk(const Array& data); Status Close(); virtual ~Impl() {} @@ -317,28 +319,28 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { } template <typename ParquetType, typename ArrowType> -Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, const Array* array, - int64_t offset, int64_t num_values, int64_t num_levels, const int16_t* def_levels, +Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, + const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; - DCHECK((offset + num_values) <= array->length()); - auto data = static_cast<const PrimitiveArray*>(array); - auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset; + auto data = static_cast<const PrimitiveArray*>(array.get()); + auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()); auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer); if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data const ParquetCType* data_writer_ptr = nullptr; RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( - data_ptr, num_values, &data_writer_ptr))); + data_ptr + data->offset(), array->length(), &data_writer_ptr))); PARQUET_CATCH_NOT_OK( writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr)); } else { const uint8_t* valid_bits = data->null_bitmap_data(); - RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, num_values, - num_levels, def_levels, rep_levels, valid_bits, offset, data_ptr))); + RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(), + num_levels, def_levels, rep_levels, valid_bits, data->offset(), + data_ptr + data->offset()))); } PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); @@ -389,18 +391,18 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) // ArrowType::c_type to ParquetType::c_type template <> Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>( - ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values, - int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { - DCHECK((offset + num_values) <= array->length()); - RETURN_NOT_OK(data_buffer_.Resize(num_values)); - auto data = static_cast<const BooleanArray*>(array); + ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + RETURN_NOT_OK(data_buffer_.Resize(array->length())); + auto data = static_cast<const BooleanArray*>(array.get()); auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data()); auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data()); auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer); int buffer_idx = 0; - for (int i = 0; i < num_values; i++) { - if (!data->IsNull(offset + i)) { + int32_t offset = array->offset(); + for (int i = 0; i < data->length(); i++) { + if (!data->IsNull(i)) { buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i); } } @@ -412,11 +414,10 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>( template <> Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>( - ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values, - int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { - DCHECK((offset + num_values) <= array->length()); - RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ByteArray))); - auto data = static_cast<const BinaryArray*>(array); + ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray))); + auto data = static_cast<const BinaryArray*>(array.get()); auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data()); // In the case of an array consisting of only empty strings or all null, // data->data() points already to a nullptr, thus data->data()->data() will @@ -427,21 +428,22 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>( DCHECK(data_ptr != nullptr); } auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer); + const int32_t* value_offset = data->raw_value_offsets(); if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data - for (int64_t i = 0; i < num_values; i++) { + for (int64_t i = 0; i < data->length(); i++) { buffer_ptr[i] = - ByteArray(data->value_length(i + offset), data_ptr + data->value_offset(i)); + ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } PARQUET_CATCH_NOT_OK( writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } else { int buffer_idx = 0; - for (int64_t i = 0; i < num_values; i++) { - if (!data->IsNull(offset + i)) { - buffer_ptr[buffer_idx++] = ByteArray( - data->value_length(i + offset), data_ptr + data->value_offset(i + offset)); + for (int64_t i = 0; i < data->length(); i++) { + if (!data->IsNull(i)) { + buffer_ptr[buffer_idx++] = + ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } } PARQUET_CATCH_NOT_OK( @@ -464,11 +466,9 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) { return impl_->NewRowGroup(chunk_size); } -Status FileWriter::Impl::WriteColumnChunk( - const Array* data, int64_t offset, int64_t length) { +Status FileWriter::Impl::WriteColumnChunk(const Array& data) { ColumnWriter* column_writer; PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn()); - DCHECK((offset + length) <= data->length()); int current_column_idx = row_group_writer_->current_column(); std::shared_ptr<::arrow::Schema> arrow_schema; @@ -481,10 +481,10 @@ Status FileWriter::Impl::WriteColumnChunk( ::arrow::Type::type values_type; int64_t num_levels; int64_t num_values; - const Array* values_array; - RETURN_NOT_OK(level_builder.GenerateLevels(data, offset, length, arrow_schema->field(0), - &values_offset, &values_type, &num_values, &num_levels, &def_levels_buffer, - &rep_levels_buffer, &values_array)); + const Array* _values_array; + RETURN_NOT_OK(level_builder.GenerateLevels(data, arrow_schema->field(0), &values_offset, + &values_type, &num_values, &num_levels, &def_levels_buffer, &rep_levels_buffer, + &_values_array)); const int16_t* def_levels = nullptr; if (def_levels_buffer) { def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data()); @@ -493,11 +493,12 @@ Status FileWriter::Impl::WriteColumnChunk( if (rep_levels_buffer) { rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data()); } + std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values); -#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \ - case ::arrow::Type::ArrowEnum: \ - return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(column_writer, values_array, \ - values_offset, num_values, num_levels, def_levels, rep_levels); \ +#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \ + case ::arrow::Type::ArrowEnum: \ + return TypedWriteBatch<ParquetType, ::arrow::ArrowType>( \ + column_writer, values_array, num_levels, def_levels, rep_levels); \ break; switch (values_type) { @@ -505,11 +506,11 @@ Status FileWriter::Impl::WriteColumnChunk( if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need // to use the larger Int64Type to store them lossless. - return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(column_writer, - values_array, values_offset, num_values, num_levels, def_levels, rep_levels); + return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>( + column_writer, values_array, num_levels, def_levels, rep_levels); } else { - return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(column_writer, - values_array, values_offset, num_values, num_levels, def_levels, rep_levels); + return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>( + column_writer, values_array, num_levels, def_levels, rep_levels); } } WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType) @@ -536,11 +537,8 @@ Status FileWriter::Impl::WriteColumnChunk( return Status::OK(); } -Status FileWriter::WriteColumnChunk( - const ::arrow::Array* array, int64_t offset, int64_t length) { - int64_t real_length = length; - if (length == -1) { real_length = array->length(); } - return impl_->WriteColumnChunk(array, offset, real_length); +Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) { + return impl_->WriteColumnChunk(array); } Status FileWriter::Close() { @@ -553,39 +551,39 @@ MemoryPool* FileWriter::memory_pool() const { FileWriter::~FileWriter() {} -Status WriteTable(const Table* table, MemoryPool* pool, +Status WriteTable(const Table& table, MemoryPool* pool, const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties) { std::shared_ptr<SchemaDescriptor> parquet_schema; - RETURN_NOT_OK( - ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema)); + RETURN_NOT_OK(ToParquetSchema(table.schema().get(), *properties, &parquet_schema)); auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); std::unique_ptr<ParquetFileWriter> parquet_writer = ParquetFileWriter::Open(sink, schema_node, properties); FileWriter writer(pool, std::move(parquet_writer)); // TODO(ARROW-232) Support writing chunked arrays. - for (int i = 0; i < table->num_columns(); i++) { - if (table->column(i)->data()->num_chunks() != 1) { + for (int i = 0; i < table.num_columns(); i++) { + if (table.column(i)->data()->num_chunks() != 1) { return Status::NotImplemented("No support for writing chunked arrays yet."); } } - for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) { + for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { int64_t offset = chunk * chunk_size; - int64_t size = std::min(chunk_size, table->num_rows() - offset); + int64_t size = std::min(chunk_size, table.num_rows() - offset); RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); - for (int i = 0; i < table->num_columns(); i++) { - std::shared_ptr<Array> array = table->column(i)->data()->chunk(0); - RETURN_NOT_OK_ELSE(writer.WriteColumnChunk(array.get(), offset, size), - PARQUET_IGNORE_NOT_OK(writer.Close())); + for (int i = 0; i < table.num_columns(); i++) { + std::shared_ptr<Array> array = table.column(i)->data()->chunk(0); + array = array->Slice(offset, size); + RETURN_NOT_OK_ELSE( + writer.WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(writer.Close())); } } return writer.Close(); } -Status WriteTable(const Table* table, MemoryPool* pool, +Status WriteTable(const Table& table, MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties) { auto wrapper = std::make_shared<ArrowOutputStream>(sink); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h index 4a39c99..e3b281b 100644 --- a/src/parquet/arrow/writer.h +++ b/src/parquet/arrow/writer.h @@ -50,8 +50,7 @@ class PARQUET_EXPORT FileWriter { FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer); ::arrow::Status NewRowGroup(int64_t chunk_size); - ::arrow::Status WriteColumnChunk( - const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1); + ::arrow::Status WriteColumnChunk(const ::arrow::Array& data); ::arrow::Status Close(); virtual ~FileWriter(); @@ -68,12 +67,12 @@ class PARQUET_EXPORT FileWriter { * * The table shall only consist of columns of primitive type or of primitive lists. */ -::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table, +::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); -::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table, +::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index 19a7c49..dedb2c2 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -244,7 +244,8 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio } typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - BooleanType, ByteArrayType, FLBAType> TestTypes; + BooleanType, ByteArrayType, FLBAType> + TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index fa31e62..5d137b7 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -145,7 +145,8 @@ static int num_pages = 20; static int batch_size = 32; typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - ByteArrayType> TestTypes; + ByteArrayType> + TestTypes; using TestBooleanFlatScanner = TestFlatScanner<BooleanType>; using TestFLBAFlatScanner = TestFlatScanner<FLBAType>; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/encoding-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index ad4a78f..67cd7ba 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -26,12 +26,12 @@ #include <arrow/util/bit-util.h> -#include "parquet/exception.h" #include "parquet/encoding.h" +#include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/types.h" -#include "parquet/util/bit-util.h" #include "parquet/util/bit-stream-utils.inline.h" +#include "parquet/util/bit-util.h" #include "parquet/util/cpu-info.h" #include "parquet/util/hash-util.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/util/bit-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h index d1e81a3..56d6c03 100644 --- a/src/parquet/util/bit-util.h +++ b/src/parquet/util/bit-util.h @@ -32,9 +32,9 @@ namespace parquet { -#define INIT_BITSET(valid_bits_vector, valid_bits_index) \ - int byte_offset_##valid_bits_vector = valid_bits_index / 8; \ - int bit_offset_##valid_bits_vector = valid_bits_index % 8; \ +#define INIT_BITSET(valid_bits_vector, valid_bits_index) \ + int byte_offset_##valid_bits_vector = (valid_bits_index) / 8; \ + int bit_offset_##valid_bits_vector = (valid_bits_index) % 8; \ uint8_t bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector]; #define READ_NEXT_BITSET(valid_bits_vector) \
