isichei commented on a change in pull request #10461:
URL: https://github.com/apache/arrow/pull/10461#discussion_r650521753



##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -1671,6 +1671,91 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 }
 
+// Test for added functionality in ARROW-12096
+TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  std::vector<bool> is_valid = {true, true, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  // Values demonstrate loss of resolution when "down sampling" INT96 to units 
that are not NS
+  std::vector<int64_t> s_values = {1489269, 1489269, 1489269, 1489269};
+  std::vector<int64_t> ms_values = {1489269000, 1489269000,
+                                    1489269000, 1489269001};
+  std::vector<int64_t> us_values = {1489269000000, 1489269000000,
+                                    1489269000001, 1489269001000};
+  std::vector<int64_t> ns_values = {1489269000000000LL, 1489269000000001LL,
+                                    1489269000001000LL, 1489269001000000LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, 
&a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, 
&a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, 
&a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, 
&a_ns);

Review comment:
       I have rewritten the tests to use a helper function; hopefully they are cleaner now.

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -1671,6 +1671,91 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 }
 
+// Test for added functionality in ARROW-12096
+TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  std::vector<bool> is_valid = {true, true, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  // Values demonstrate loss of resolution when "down sampling" INT96 to units 
that are not NS
+  std::vector<int64_t> s_values = {1489269, 1489269, 1489269, 1489269};
+  std::vector<int64_t> ms_values = {1489269000, 1489269000,
+                                    1489269000, 1489269001};
+  std::vector<int64_t> us_values = {1489269000000, 1489269000000,
+                                    1489269000001, 1489269001000};
+  std::vector<int64_t> ns_values = {1489269000000000LL, 1489269000000001LL,
+                                    1489269000001000LL, 1489269001000000LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, 
&a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, 
&a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, 
&a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, 
&a_ns);
+
+  // Create single input table of NS to be written to parquet with INT96
+  auto input_schema = schema({field("f", t_ns)});
+  auto input = Table::Make(input_schema, {a_ns});
+
+  // Create an expected schema for each resulting table (one for each "down 
sampled" ts)
+  auto ex_schema_s = schema({field("f", t_s)});
+  auto ex_schema_ms = schema({field("f", t_ms)});
+  auto ex_schema_us = schema({field("f", t_us)});
+  
+  // Create tables
+  auto ex_result_s = Table::Make(ex_schema_s, {a_s});
+  auto ex_result_ms = Table::Make(ex_schema_ms, {a_ms});
+  auto ex_result_us = Table::Make(ex_schema_us, {a_us});
+
+  std::shared_ptr<Table> result_s;
+  std::shared_ptr<Table> result_ms;
+  std::shared_ptr<Table> result_us;
+
+  ArrowReaderProperties arrow_reader_prop_s, arrow_reader_prop_ms, 
arrow_reader_prop_us;
+  
arrow_reader_prop_s.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::SECOND);
+  
arrow_reader_prop_ms.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MILLI);
+  
arrow_reader_prop_us.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MICRO);
+
+// SECOND
+  ASSERT_NO_FATAL_FAILURE(DoRoundtrip(
+    input, input->num_rows(), &result_s, default_writer_properties(),
+    
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(),
+    arrow_reader_prop_s));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_s->schema(),
+                                                     *result_s->schema(),
+                                                     
/*check_metadata=*/false));
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_s, *result_s));

Review comment:
       See previous comment thread.

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, 
Datum* out) {
 }
 
 Status TransferInt96(RecordReader* reader, MemoryPool* pool,
-                     const std::shared_ptr<DataType>& type, Datum* out) {
+                     const std::shared_ptr<DataType>& type, Datum* out,
+                     const ::arrow::TimeUnit::type& int96_arrow_time_unit) {

Review comment:
       Addressed.

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, 
Datum* out) {
 }
 
 Status TransferInt96(RecordReader* reader, MemoryPool* pool,
-                     const std::shared_ptr<DataType>& type, Datum* out) {
+                     const std::shared_ptr<DataType>& type, Datum* out,
+                     const ::arrow::TimeUnit::type& int96_arrow_time_unit) {

Review comment:
       Done.

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -742,20 +752,20 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
     case ::arrow::Type::TIMESTAMP: {
       const ::arrow::TimestampType& timestamp_type =
           checked_cast<::arrow::TimestampType&>(*value_type);
-      switch (timestamp_type.unit()) {
-        case ::arrow::TimeUnit::MILLI:
-        case ::arrow::TimeUnit::MICRO: {
-          result = TransferZeroCopy(reader, value_type);
-        } break;
-        case ::arrow::TimeUnit::NANO: {
-          if (descr->physical_type() == ::parquet::Type::INT96) {
-            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
-          } else {
+      if (descr->physical_type() == ::parquet::Type::INT96) {
+            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result, 
timestamp_type.unit()));
+        }
+      else {
+        switch (timestamp_type.unit()) {
+          case ::arrow::TimeUnit::SECOND:
+          case ::arrow::TimeUnit::MILLI:
+          case ::arrow::TimeUnit::MICRO:
+          case ::arrow::TimeUnit::NANO: {
             result = TransferZeroCopy(reader, value_type);
-          }
-        } break;
-        default:
-          return Status::NotImplemented("TimeUnit not supported");
+          } break;
+          default:

Review comment:
       Done.

##########
File path: cpp/src/parquet/arrow/schema_internal.cc
##########
@@ -181,7 +181,8 @@ Result<std::shared_ptr<ArrowType>> FromInt64(const 
LogicalType& logical_type) {
 
 Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
                                                 const LogicalType& 
logical_type,
-                                                int type_length) {
+                                                int type_length,
+                                                const ::arrow::TimeUnit::type& 
int96_arrow_time_unit) {

Review comment:
       Done.

##########
File path: cpp/src/parquet/arrow/schema_internal.cc
##########
@@ -211,14 +212,22 @@ Result<std::shared_ptr<ArrowType>> 
GetArrowType(Type::type physical_type,
   }
 }
 
+// ARROW-12096 -- Overloading functions with new input (setting default as 
NANO)

Review comment:
       Done.

##########
File path: cpp/src/parquet/arrow/schema_internal.cc
##########
@@ -211,14 +212,22 @@ Result<std::shared_ptr<ArrowType>> 
GetArrowType(Type::type physical_type,
   }
 }
 
+// ARROW-12096 -- Overloading functions with new input (setting default as 
NANO)
 Result<std::shared_ptr<ArrowType>> GetArrowType(const schema::PrimitiveNode& 
primitive) {
   return GetArrowType(primitive.physical_type(), *primitive.logical_type(),
-                      primitive.type_length());
+                      primitive.type_length(), ::arrow::TimeUnit::NANO);
 }
 
 Result<std::shared_ptr<ArrowType>> GetArrowType(const ColumnDescriptor& 
descriptor) {
   return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(),
-                      descriptor.type_length());
+                      descriptor.type_length(), ::arrow::TimeUnit::NANO);
+}
+
+// ARROW-12096 -- Exposing INT96 arrow type definition fromm parquet reader

Review comment:
       Done.

##########
File path: cpp/src/parquet/arrow/schema_internal.h
##########
@@ -39,8 +39,20 @@ Result<std::shared_ptr<::arrow::DataType>> 
GetArrowType(Type::type physical_type
                                                         const LogicalType& 
logical_type,
                                                         int type_length);
 
+// ARROW-12096 Exposing int96 arrow timestamp unit definition
+Result<std::shared_ptr<::arrow::DataType>> GetArrowType(Type::type 
physical_type,
+                                                        const LogicalType& 
logical_type,
+                                                        int type_length,
+                                                        const 
::arrow::TimeUnit::type& int96_arrow_time_unit);

Review comment:
       Went back to review this and am not sure how to address it.
   
   The only thing I can imagine would be to extend `GetTypeForNode` (in 
`arrow/schema.cc`) so that it overrides the standard `storage_type` when the 
Parquet physical type is INT96 and the reader properties are not set to NANO. 
Is that what you meant? Let me know if I have misunderstood.

##########
File path: cpp/src/parquet/arrow/schema_internal.h
##########
@@ -39,8 +39,20 @@ Result<std::shared_ptr<::arrow::DataType>> 
GetArrowType(Type::type physical_type
                                                         const LogicalType& 
logical_type,
                                                         int type_length);
 
+// ARROW-12096 Exposing int96 arrow timestamp unit definition
+Result<std::shared_ptr<::arrow::DataType>> GetArrowType(Type::type 
physical_type,
+                                                        const LogicalType& 
logical_type,
+                                                        int type_length,
+                                                        const 
::arrow::TimeUnit::type& int96_arrow_time_unit);
+
 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
     const schema::PrimitiveNode& primitive);
+
+// ARROW-12096 Exposing int96 arrow timestamp unit definition
+Result<std::shared_ptr<::arrow::DataType>> GetArrowType(

Review comment:
       Done.

##########
File path: cpp/src/parquet/types.h
##########
@@ -602,6 +602,49 @@ static inline int64_t Int96GetNanoSeconds(const 
parquet::Int96& i96) {
   return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + 
nanoseconds);
 }
 
+// ARROW-12096: Convert an INT96 timestamp to microseconds since the Unix epoch.
+// value[2] is treated as a day number and rebased to the Unix epoch by
+// subtracting kJulianToUnixEpochDays; the first 8 bytes of `value` are read as
+// the nanoseconds within that day. Sub-microsecond precision is truncated by
+// the integer division below.
+static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) {
+  // We do the computations in the unsigned domain to avoid undefined behaviour
+  // on signed overflow.
+  uint64_t days_since_epoch =
+      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
+  uint64_t nanoseconds = 0;
+  // memcpy avoids an unaligned / type-punned read of the 64-bit field.
+  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
+
+  // Drop the sub-microsecond part (1 us == 1000 ns).
+  uint64_t microseconds = nanoseconds/static_cast<uint64_t>(1000);
+
+  return static_cast<int64_t>(days_since_epoch * kMicrosecondsPerDay + 
microseconds);
+}
+
+// ARROW-12096: Convert an INT96 timestamp to milliseconds since the Unix epoch.
+// value[2] is treated as a day number and rebased to the Unix epoch by
+// subtracting kJulianToUnixEpochDays; the first 8 bytes of `value` are read as
+// the nanoseconds within that day. Sub-millisecond precision is truncated by
+// the integer division below.
+static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) {
+  // We do the computations in the unsigned domain to avoid undefined behaviour
+  // on signed overflow.
+  uint64_t days_since_epoch =
+      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
+  uint64_t nanoseconds = 0;
+  // memcpy avoids an unaligned / type-punned read of the 64-bit field.
+  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
+
+  // Drop the sub-millisecond part (1 ms == 1000000 ns).
+  uint64_t milliseconds = nanoseconds/static_cast<uint64_t>(1000000);
+
+  return static_cast<int64_t>(days_since_epoch * kMillisecondsPerDay + 
milliseconds);
+}
+
+// ARROW-12096
+static inline int64_t Int96GetSeconds(const parquet::Int96& i96) {
+  // We do the computations in the unsigned domain to avoid unsigned behaviour
+  // on overflow.
+  uint64_t days_since_epoch =
+      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
+  
+  uint64_t nanoseconds = 0;
+  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
+
+  uint64_t seconds = nanoseconds/(static_cast<uint64_t>(1000000000));
+
+  return static_cast<int64_t>(days_since_epoch * kSecondsPerDay + seconds);

Review comment:
       Went with your example in the end, as it made far more sense IMO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to