Repository: parquet-cpp
Updated Branches:
  refs/heads/master a74c3016a -> 16466b109
PARQUET-805: Read Int96 into Arrow Timestamp(ns) Author: Uwe L. Korn <[email protected]> Closes #204 from xhochy/PARQUET-805 and squashes the following commits: 895dc30 [Uwe L. Korn] Add missing return type a2f7f5b [Uwe L. Korn] Incorporate review f2255a3 [Uwe L. Korn] PARQUET-805: Read Int96 into Arrow Timestamp(ns) Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/16466b10 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/16466b10 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/16466b10 Branch: refs/heads/master Commit: 16466b109e87792786d0612fb9f94fad39d13d6c Parents: a74c301 Author: Uwe L. Korn <[email protected]> Authored: Tue Dec 20 16:18:13 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Tue Dec 20 16:18:13 2016 -0500 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 52 ++++++++++++++++++++++ src/parquet/arrow/arrow-schema-test.cc | 11 +++-- src/parquet/arrow/reader.cc | 51 ++++++++++++++++++++- src/parquet/arrow/schema.cc | 7 +-- 4 files changed, 111 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 25ba457..a8a5db0 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -391,6 +391,58 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { this->ReadAndCheckSingleColumnTable(values); } +using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>; + +TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) { + // This test explicitly tests the conversion from an 
Impala-style timestamp + // to a nanoseconds-since-epoch one. + + // 2nd January 1970, 11:35min 145738543ns + Int96 day; + day.value[2] = 2440589l; + int64_t seconds = ((1 * 24 + 11) * 60 + 35) * 60; + *(reinterpret_cast<int64_t*>(&(day.value))) = + seconds * 1000l * 1000l * 1000l + 145738543; + // Compute the corresponding nanosecond timestamp + struct tm datetime = {0}; + datetime.tm_year = 70; + datetime.tm_mon = 0; + datetime.tm_mday = 2; + datetime.tm_hour = 11; + datetime.tm_min = 35; + struct tm epoch = {0}; + epoch.tm_year = 70; + epoch.tm_mday = 1; + // Nanoseconds since the epoch + int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000000; + val += 145738543; + + std::vector<std::shared_ptr<schema::Node>> fields( + {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)}); + std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>( + schema::GroupNode::Make("schema", Repetition::REQUIRED, fields)); + + // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API + // to write an Int96 file. 
+ this->sink_ = std::make_shared<InMemoryOutputStream>(); + auto writer = ParquetFileWriter::Open(this->sink_, schema); + RowGroupWriter* rg_writer = writer->AppendRowGroup(1); + ColumnWriter* c_writer = rg_writer->NextColumn(); + auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer); + ASSERT_NE(typed_writer, nullptr); + typed_writer->WriteBatch(1, nullptr, nullptr, &day); + c_writer->Close(); + rg_writer->Close(); + writer->Close(); + + ::arrow::TimestampBuilder builder( + default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO)); + builder.Append(val); + std::shared_ptr<Array> values; + ASSERT_OK(builder.Finish(&values)); + this->ReadAndCheckSingleColumnFile(values.get()); +} + using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 360680f..3d07561 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -47,6 +47,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); const auto UTF8 = std::make_shared<::arrow::StringType>(); const auto TIMESTAMP_MS = std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +const auto TIMESTAMP_NS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO); // TODO: This requires parquet-cpp implementing the MICROS enum value // const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO); const auto BINARY = @@ -98,9 +100,9 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false)); - // 
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, - // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); - // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false)); + parquet_fields.push_back( + PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); + arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false)); parquet_fields.push_back( PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); @@ -339,9 +341,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) { TEST_F(TestConvertParquetSchema, UnsupportedThings) { std::vector<NodePtr> unsupported_nodes; - unsupported_nodes.push_back( - PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)); - unsupported_nodes.push_back(PrimitiveNode::Make( "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index f2d4639..2efa806 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -18,6 +18,7 @@ #include "parquet/arrow/reader.h" #include <algorithm> +#include <chrono> #include <queue> #include <string> #include <vector> @@ -44,6 +45,15 @@ using ParquetReader = parquet::ParquetFileReader; namespace parquet { namespace arrow { +constexpr int64_t kJulianToUnixEpochDays = 2440588L; +constexpr int64_t kNanosecondsInADay = 86400L * 1000L * 1000L * 1000L; + +static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) { + int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays; + int64_t nanoseconds = *(reinterpret_cast<const int64_t*>(&(impala_timestamp.value))); + return days_since_epoch * kNanosecondsInADay + nanoseconds; +} + template <typename ArrowType> struct 
ArrowTypeTraits { typedef ::arrow::NumericArray<ArrowType> array_type; @@ -239,6 +249,15 @@ void FlatColumnReader::Impl::ReadNonNullableBatch( } template <> +void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( + Int96* values, int64_t values_read) { + int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_); + for (int64_t i = 0; i < values_read; i++) { + out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]); + } +} + +template <> void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( bool* values, int64_t values_read) { for (int64_t i = 0; i < values_read; i++) { @@ -266,6 +285,22 @@ void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels, } template <> +void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>( + const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) { + auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_); + int values_idx = 0; + for (int64_t i = 0; i < levels_read; i++) { + if (def_levels[i] < descr_->max_definition_level()) { + null_count_++; + } else { + ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_); + data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]); + } + valid_bits_idx_++; + } +} + +template <> void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>( const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) { int values_idx = 0; @@ -518,7 +553,21 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType) TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType) TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType) - TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type) + case ::arrow::Type::TIMESTAMP: { + ::arrow::TimestampType* timestamp_type = + 
static_cast<::arrow::TimestampType*>(field_->type.get()); + switch (timestamp_type->unit) { + case ::arrow::TimeUnit::MILLI: + return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out); + break; + case ::arrow::TimeUnit::NANO: + return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out); + break; + default: + return Status::NotImplemented("TimeUnit not supported"); + } + break; + } default: return Status::NotImplemented(field_->type->ToString()); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 2875dc6..e578ec2 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -58,6 +58,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>(); const auto UTF8 = std::make_shared<::arrow::StringType>(); const auto TIMESTAMP_MS = std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI); +const auto TIMESTAMP_NS = + std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO); const auto BINARY = std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8)); @@ -162,9 +164,8 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) { RETURN_NOT_OK(FromInt64(primitive, out)); break; case ParquetType::INT96: - // TODO: Do we have that type in Arrow? - // type = TypePtr(new Int96Type()); - return Status::NotImplemented("int96"); + *out = TIMESTAMP_NS; + break; case ParquetType::FLOAT: *out = FLOAT; break;
