This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch maint-0.14.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 6dadd494616fa5ebd37be94ceec98a50324fa7fb Author: Wes McKinney <[email protected]> AuthorDate: Fri Jul 12 12:47:36 2019 -0500 ARROW-5878: [C++][Parquet] Restore pre-0.14.0 Parquet forward compatibility by adding option to unconditionally set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType Set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType whether `isAdjustedToUTC` is set in the LogicalType or not Author: Wes McKinney <[email protected]> Author: Benjamin Kietzman <[email protected]> Closes #4825 from bkietz/5878-C-Parquet-reader-not-forward-compatible- and squashes the following commits: f5213f576 <Wes McKinney> Add force_set_converted_type to ToString and ToJSON output 9350fea40 <Wes McKinney> Add option to force setting ConvertedType for timestamps b8fd4d61d <Benjamin Kietzman> Coerce strictly to TIMESTAMP_MICROS/TIMESTAMP_MILLIS --- cpp/src/parquet/arrow/arrow-schema-test.cc | 21 +++++++----- cpp/src/parquet/arrow/schema.cc | 12 +++++-- cpp/src/parquet/schema-test.cc | 42 +++++++++++++++-------- cpp/src/parquet/types.cc | 55 ++++++++++++++++++++++-------- cpp/src/parquet/types.h | 18 ++++++++-- 5 files changed, 106 insertions(+), 42 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc index 9a43c7d..6972620 100644 --- a/cpp/src/parquet/arrow/arrow-schema-test.cc +++ b/cpp/src/parquet/arrow/arrow-schema-test.cc @@ -855,14 +855,18 @@ TEST_F(TestConvertArrowSchema, ArrowFields) { {"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO), LogicalType::Time(true, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1}, {"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI), - LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), ParquetType::INT64, - -1}, + LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, + /*force_set_converted_type=*/true), + ParquetType::INT64, -1}, {"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO), - LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64, - -1}, + LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS, + /*force_set_converted_type=*/true), + ParquetType::INT64, -1}, + // Parquet v1, values converted to microseconds {"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO), - LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64, - -1}, + LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS, + /*force_set_converted_type=*/true), + ParquetType::INT64, -1}, {"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"), LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS), ParquetType::INT64, -1}, @@ -887,14 +891,15 @@ TEST_F(TestConvertArrowSchema, ArrowFields) { std::vector<NodePtr> parquet_fields; for (const FieldConstructionArguments& c : cases) { - arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype, false)); + arrow_fields.push_back(::arrow::field(c.name, c.datatype, false)); parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED, c.logical_type, c.physical_type, c.physical_length)); } ASSERT_OK(ConvertSchema(arrow_fields)); - ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); + CheckFlatSchema(parquet_fields); + // ASSERT_NO_FATAL_FAILURE(); } TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) { diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 9284c8c..12d4194 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -521,11 +521,19 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type, static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp( const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) { const bool utc = !(timestamp_type.timezone().empty()); + // ARROW-5878(wesm): for forward compatibility reasons, and because + // there's no other way to signal to old readers that values are + // timestamps, we force the ConvertedType field to be set to the + // corresponding TIMESTAMP_* value. This does cause some ambiguity + // as Parquet readers have not been consistent about the + // interpretation of TIMESTAMP_* values as being UTC-normalized. switch (time_unit) { case ::arrow::TimeUnit::MILLI: - return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS); + return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS, + /*force_set_converted_type=*/true); case ::arrow::TimeUnit::MICRO: - return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS); + return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS, + /*force_set_converted_type=*/true); case ::arrow::TimeUnit::NANO: return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS); case ::arrow::TimeUnit::SECOND: diff --git a/cpp/src/parquet/schema-test.cc b/cpp/src/parquet/schema-test.cc index 4e11d1f..badd997 100644 --- a/cpp/src/parquet/schema-test.cc +++ b/cpp/src/parquet/schema-test.cc @@ -1079,8 +1079,6 @@ TEST(TestLogicalTypeConstruction, NewTypeIncompatibility) { {LogicalType::Time(false, LogicalType::TimeUnit::MICROS), check_is_time}, {LogicalType::Time(false, LogicalType::TimeUnit::NANOS), check_is_time}, {LogicalType::Time(true, LogicalType::TimeUnit::NANOS), check_is_time}, - {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), check_is_timestamp}, - {LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), check_is_timestamp}, {LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS), check_is_timestamp}, {LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS), check_is_timestamp}, }; @@ -1399,23 +1397,29 @@ TEST(TestLogicalTypeOperation, LogicalTypeRepresentation) { "Time(isAdjustedToUTC=false, timeUnit=nanoseconds)", R"({"Type": "Time", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"}, {LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS), - "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds"})"}, + "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, " + "force_set_converted_type=false)", + R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds", "force_set_converted_type": false})"}, {LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS), - "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds"})"}, + "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, " + "force_set_converted_type=false)", + R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds", "force_set_converted_type": false})"}, {LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS), - "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds"})"}, - {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), - "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds"})"}, + "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, " + "force_set_converted_type=false)", + R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds", "force_set_converted_type": false})"}, + {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, true), + "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, " + "force_set_converted_type=true)", + R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds", "force_set_converted_type": true})"}, {LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), - "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds"})"}, + "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, " + "force_set_converted_type=false)", + R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "force_set_converted_type": false})"}, {LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS), - "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds)", - R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"}, + "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds, " + "force_set_converted_type=false)", + R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds", "force_set_converted_type": false})"}, {LogicalType::Interval(), "Interval", R"({"Type": "Interval"})"}, {LogicalType::Int(8, false), "Int(bitWidth=8, isSigned=false)", R"({"Type": "Int", "bitWidth": 8, "isSigned": false})"}, @@ -1914,10 +1918,18 @@ TEST_F(TestTemporalSchemaElementConstruction, TemporalCases) { Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP}, {"timestamp_F_ms", LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP}, + {"timestamp_F_ms_force", + LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, + /*force_set_converted_type=*/true), + Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP}, {"timestamp_T_us", LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS), Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, true, check_TIMESTAMP}, {"timestamp_F_us", LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP}, + {"timestamp_F_us_force", + LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, + /*force_set_converted_type=*/true), + Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP}, {"timestamp_T_ns", LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS), Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP}, {"timestamp_F_ns", LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS), diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 644e28f..2c75439 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -496,9 +496,11 @@ std::shared_ptr<const LogicalType> LogicalType::Time( } std::shared_ptr<const LogicalType> LogicalType::Timestamp( - bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) { + bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit, + bool force_set_converted_type) { DCHECK(time_unit != LogicalType::TimeUnit::UNKNOWN); - return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit); + return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit, + force_set_converted_type); } std::shared_ptr<const LogicalType> LogicalType::Interval() { @@ -1204,14 +1206,19 @@ class LogicalType::Impl::Timestamp final : public LogicalType::Impl::Compatible, bool is_adjusted_to_utc() const { return adjusted_; } LogicalType::TimeUnit::unit time_unit() const { return unit_; } + bool force_set_converted_type() const { return force_set_converted_type_; } + private: - Timestamp(bool a, LogicalType::TimeUnit::unit u) + Timestamp(bool adjusted, LogicalType::TimeUnit::unit unit, + bool force_set_converted_type) : LogicalType::Impl(LogicalType::Type::TIMESTAMP, SortOrder::SIGNED), LogicalType::Impl::SimpleApplicable(parquet::Type::INT64), - adjusted_(a), - unit_(u) {} + adjusted_(adjusted), + unit_(unit), + force_set_converted_type_(force_set_converted_type) {} bool adjusted_ = false; LogicalType::TimeUnit::unit unit_; + bool force_set_converted_type_ = false; }; bool LogicalType::Impl::Timestamp::is_compatible( @@ -1219,10 +1226,20 @@ bool LogicalType::Impl::Timestamp::is_compatible( schema::DecimalMetadata converted_decimal_metadata) const { if (converted_decimal_metadata.isset) { return false; - } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MILLIS) { - return converted_type == ConvertedType::TIMESTAMP_MILLIS; - } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MICROS) { - return converted_type == ConvertedType::TIMESTAMP_MICROS; + } else if (unit_ == LogicalType::TimeUnit::MILLIS) { + if (adjusted_ || force_set_converted_type_) { + return converted_type == ConvertedType::TIMESTAMP_MILLIS; + } else { + return (converted_type == ConvertedType::NONE) || + (converted_type == ConvertedType::NA); + } + } else if (unit_ == LogicalType::TimeUnit::MICROS) { + if (adjusted_ || force_set_converted_type_) { + return converted_type == ConvertedType::TIMESTAMP_MICROS; + } else { + return (converted_type == ConvertedType::NONE) || + (converted_type == ConvertedType::NA); + } } else { return (converted_type == ConvertedType::NONE) || (converted_type == ConvertedType::NA); @@ -1232,7 +1249,7 @@ bool LogicalType::Impl::Timestamp::is_compatible( ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType( schema::DecimalMetadata* out_decimal_metadata) const { reset_decimal_metadata(out_decimal_metadata); - if (adjusted_) { + if (adjusted_ || force_set_converted_type_) { if (unit_ == LogicalType::TimeUnit::MILLIS) { return ConvertedType::TIMESTAMP_MILLIS; } else if (unit_ == LogicalType::TimeUnit::MICROS) { @@ -1245,14 +1262,16 @@ ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType( std::string LogicalType::Impl::Timestamp::ToString() const { std::stringstream type; type << "Timestamp(isAdjustedToUTC=" << std::boolalpha << adjusted_ - << ", timeUnit=" << time_unit_string(unit_) << ")"; + << ", timeUnit=" << time_unit_string(unit_) + << ", force_set_converted_type=" << force_set_converted_type_ << ")"; return type.str(); } std::string LogicalType::Impl::Timestamp::ToJSON() const { std::stringstream json; json << R"({"Type": "Timestamp", "isAdjustedToUTC": )" << std::boolalpha << adjusted_ - << R"(, "timeUnit": ")" << time_unit_string(unit_) << R"("})"; + << R"(, "timeUnit": ")" << time_unit_string(unit_) + << R"(", "force_set_converted_type": )" << force_set_converted_type_ << R"(})"; return json.str(); } @@ -1288,13 +1307,14 @@ bool LogicalType::Impl::Timestamp::Equals(const LogicalType& other) const { } std::shared_ptr<const LogicalType> TimestampLogicalType::Make( - bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) { + bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit, + bool force_set_converted_type) { if (time_unit == LogicalType::TimeUnit::MILLIS || time_unit == LogicalType::TimeUnit::MICROS || time_unit == LogicalType::TimeUnit::NANOS) { auto* logical_type = new TimestampLogicalType(); - logical_type->impl_.reset( - new LogicalType::Impl::Timestamp(is_adjusted_to_utc, time_unit)); + logical_type->impl_.reset(new LogicalType::Impl::Timestamp( + is_adjusted_to_utc, time_unit, force_set_converted_type)); return std::shared_ptr<const LogicalType>(logical_type); } else { throw ParquetException( @@ -1310,6 +1330,11 @@ LogicalType::TimeUnit::unit TimestampLogicalType::time_unit() const { return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_)).time_unit(); } +bool TimestampLogicalType::force_set_converted_type() const { + return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_)) + .force_set_converted_type(); +} + class LogicalType::Impl::Interval final : public LogicalType::Impl::SimpleCompatible, public LogicalType::Impl::TypeLengthApplicable { public: diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 0bfaf99..6f1906f 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -182,8 +182,17 @@ class PARQUET_EXPORT LogicalType { static std::shared_ptr<const LogicalType> Date(); static std::shared_ptr<const LogicalType> Time(bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit); + + /// \brief Create a Timestamp logical type + /// \param[in] is_adjusted_to_utc set true if the data is UTC-normalized + /// \param[in] time_unit the resolution of the timestamp + /// \param[in] force_set_converted_type if true, always set the + /// legacy ConvertedType TIMESTAMP_MICROS and TIMESTAMP_MILLIS + /// metadata. Default is false static std::shared_ptr<const LogicalType> Timestamp( - bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit); + bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit, + bool force_set_converted_type = false); + static std::shared_ptr<const LogicalType> Interval(); static std::shared_ptr<const LogicalType> Int(int bit_width, bool is_signed); static std::shared_ptr<const LogicalType> Null(); @@ -337,10 +346,15 @@ class PARQUET_EXPORT TimeLogicalType : public LogicalType { class PARQUET_EXPORT TimestampLogicalType : public LogicalType { public: static std::shared_ptr<const LogicalType> Make(bool is_adjusted_to_utc, - LogicalType::TimeUnit::unit time_unit); + LogicalType::TimeUnit::unit time_unit, + bool force_set_converted_type = false); bool is_adjusted_to_utc() const; LogicalType::TimeUnit::unit time_unit() const; + /// \brief If true, will set ConvertedType for micros and millis + /// resolution in legacy ConvertedType Thrift metadata + bool force_set_converted_type() const; + private: TimestampLogicalType() = default; };
