pitrou commented on a change in pull request #9582:
URL: https://github.com/apache/arrow/pull/9582#discussion_r585572255



##########
File path: cpp/src/parquet/statistics.cc
##########
@@ -133,56 +136,95 @@ struct CompareHelper<Int96Type, is_signed> {
   static T Max(int type_length, const T& a, const T& b) {
     return Compare(0, a, b) ? b : a;
   }
-};  // namespace parquet
-
-template <typename DType, bool is_signed>
-struct ByteLikeCompareHelperBase {
-  using T = ByteArrayType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
+};
 
-  static T DefaultMin() { return {}; }
-  static T DefaultMax() { return {}; }
-  static T Coalesce(T val, T fallback) { return val; }
+template <typename T, bool is_signed>
+struct BinaryLikeComparer {};
 
-  static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + 
b.len);
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/false> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+    // Unsigned comparison is used for non-numeric types so straight
+    // lexiographic comparison makes sense. (a.ptr is always unsigned)....
+    return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr 
+ b_length);
   }
+};
 
-  static T Min(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? a : b;
-  }
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/true> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    // Is signed is used for integers encoded as big-endian twos
+    // complement integers. (e.g. decimals).
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+
+    // At least of the lengths is zero.
+    if (a_length == 0 || b_length == 0) {
+      return a_length == 0 && b_length > 0;
+    }
 
-  static T Max(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? b : a;
+    int8_t first_a = *a.ptr;
+    int8_t first_b = *b.ptr;
+    // We can short circuit for different signed numbers or
+    // for equal length bytes arrays that have different first bytes.
+    if ((0x80 & first_a) != (0x80 & first_b) ||
+        (a_length == b_length && first_a != first_b)) {
+      return first_a < first_b;
+    }
+    // When the lengths are unequal and the numbers are of the same
+    // sign we need to extend the digits.
+    const uint8_t* a_start = a.ptr;
+    const uint8_t* b_start = b.ptr;
+    if (a_length != b_length) {
+      const uint8_t* lead_start = nullptr;
+      const uint8_t* lead_end = nullptr;
+      if (a_length > b_length) {
+        int lead_length = a_length - b_length;
+        lead_start = a.ptr;
+        lead_end = a.ptr + lead_length;
+        a_start += lead_length;
+      } else {
+        DCHECK_LT(a_length, b_length);
+        int lead_length = b_length - a_length;
+        lead_start = b.ptr;
+        lead_end = b.ptr + lead_length;
+        b_start += lead_length;
+      }
+      // Compare extra bytes to the sign extension of the first
+      // byte of the other number.
+      uint8_t extension = first_a < 0 ? 0xFF : 0;
+      for (; lead_start != lead_end; lead_start++) {
+        if (*lead_start < extension) {
+          // The first bytes of the long value are less
+          // then the extended short one.  So if a is the long value
+          // we can return true.
+          return a_length > b_length;
+        } else if (*lead_start > extension) {

Review comment:
       Same here: if `extension` is 0xFF then this is never possible?
   
   So perhaps it's possible to rewrite this:
   ```c++
   if (first_a < 0) {
     // Both numbers negative
     for (; lead_start != lead_end; lead_start++) {
       if (*lead_start != 0xff) {
         return a_length > b_length;
       }
     }
   } else {
     // Both numbers positive
     for (; lead_start != lead_end; lead_start++) {
       if (*lead_start != 0) {
         return a_length < b_length;
       }
     }
   }
   ```
   
   Not sure which one is more understandable.

##########
File path: cpp/src/parquet/statistics_test.cc
##########
@@ -71,19 +71,25 @@ static FLBA FLBAFromString(const std::string& s) {
 TEST(Comparison, SignedByteArray) {
   auto comparator = MakeComparator<ByteArrayType>(Type::BYTE_ARRAY, 
SortOrder::SIGNED);
 
-  std::string s1 = "12345";
-  std::string s2 = "12345678";
-  ByteArray s1ba = ByteArrayFromString(s1);
-  ByteArray s2ba = ByteArrayFromString(s2);
-  ASSERT_TRUE(comparator->Compare(s1ba, s2ba));
+  std::vector<uint8_t> byte_values[] = {
+      {0x80, 0x80, 0, 0},       {/*0xFF,*/ 0x80, 0, 0},     {/*0xFF,*/ 0xFF, 
0x01, 0},
+      {/*0xFF,0xFF,*/ 0x80, 0}, {/*0xFF,0xFF,0xFF,*/ 0x80}, {/*0xFF, 0xFF, 
0xFF,*/ 0xFF},
+      {/*0, 0,*/ 0x01, 0x01},   {/*0,*/ 0x01, 0x01, 0},     {0x01, 0x01, 0, 
0}};

Review comment:
       Can you also test pairs of equal values with differing lengths?
   Perhaps even do exhaustive pairwise comparisons, e.g.:
   ```c++
   struct Case {
     std::vector<uint8_t> bytes;
     int order;
   };
   std::vector<Case> cases = {
     {{0x80, 0x80, 0, 0}, 1},
     {{0xFF, 0x80, 0x80, 0, 0}, 1},
     {{0x80, 0, 0}, 2},
     {{0xFF, 0x80, 0, 0}, 2},
     {{0xFF, 0xFF, 0x80, 0, 0}, 2},
     {{0xFF, 0x01, 0}, 3},
     {{0xFF, 0xFF, 0x01, 0}, 3},
     // ...
   };
   for (const auto& a : cases) {
     for (const auto& b : cases) {
       ByteArray ba(static_cast<int>(a.bytes.size()), a.bytes.data());
       ByteArray bb(static_cast<int>(b.bytes.size()), b.bytes.data());
       if (a.order < b.order) {
         EXPECT_TRUE(comparator->Compare(ba, bb)) << ba << " " << bb;
       } else {
         EXPECT_FALSE(comparator->Compare(ba, bb)) << ba << " " << bb;
       }
     }
   }
   ```
   

##########
File path: cpp/src/parquet/statistics.cc
##########
@@ -133,56 +136,95 @@ struct CompareHelper<Int96Type, is_signed> {
   static T Max(int type_length, const T& a, const T& b) {
     return Compare(0, a, b) ? b : a;
   }
-};  // namespace parquet
-
-template <typename DType, bool is_signed>
-struct ByteLikeCompareHelperBase {
-  using T = ByteArrayType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
+};
 
-  static T DefaultMin() { return {}; }
-  static T DefaultMax() { return {}; }
-  static T Coalesce(T val, T fallback) { return val; }
+template <typename T, bool is_signed>
+struct BinaryLikeComparer {};
 
-  static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + 
b.len);
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/false> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+    // Unsigned comparison is used for non-numeric types so straight
+    // lexiographic comparison makes sense. (a.ptr is always unsigned)....
+    return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr 
+ b_length);
   }
+};
 
-  static T Min(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? a : b;
-  }
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/true> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    // Is signed is used for integers encoded as big-endian twos
+    // complement integers. (e.g. decimals).
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+
+    // At least of the lengths is zero.
+    if (a_length == 0 || b_length == 0) {
+      return a_length == 0 && b_length > 0;
+    }
 
-  static T Max(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? b : a;
+    int8_t first_a = *a.ptr;
+    int8_t first_b = *b.ptr;
+    // We can short circuit for different signed numbers or
+    // for equal length bytes arrays that have different first bytes.
+    if ((0x80 & first_a) != (0x80 & first_b) ||
+        (a_length == b_length && first_a != first_b)) {
+      return first_a < first_b;
+    }

Review comment:
       Ok, I understand and your assessment looks correct to me. Can you just 
add a bit more explanation as comments, so this doesn't produce too much 
head-scratching later on? :-)

##########
File path: cpp/src/parquet/statistics.cc
##########
@@ -133,56 +136,95 @@ struct CompareHelper<Int96Type, is_signed> {
   static T Max(int type_length, const T& a, const T& b) {
     return Compare(0, a, b) ? b : a;
   }
-};  // namespace parquet
-
-template <typename DType, bool is_signed>
-struct ByteLikeCompareHelperBase {
-  using T = ByteArrayType::c_type;
-  using PtrType = typename std::conditional<is_signed, int8_t, uint8_t>::type;
+};
 
-  static T DefaultMin() { return {}; }
-  static T DefaultMax() { return {}; }
-  static T Coalesce(T val, T fallback) { return val; }
+template <typename T, bool is_signed>
+struct BinaryLikeComparer {};
 
-  static inline bool Compare(int type_length, const T& a, const T& b) {
-    const auto* aptr = reinterpret_cast<const PtrType*>(a.ptr);
-    const auto* bptr = reinterpret_cast<const PtrType*>(b.ptr);
-    return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + 
b.len);
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/false> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+    // Unsigned comparison is used for non-numeric types so straight
+    // lexiographic comparison makes sense. (a.ptr is always unsigned)....
+    return std::lexicographical_compare(a.ptr, a.ptr + a_length, b.ptr, b.ptr 
+ b_length);
   }
+};
 
-  static T Min(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? a : b;
-  }
+template <typename T>
+struct BinaryLikeComparer<T, /*is_signed=*/true> {
+  static bool Compare(int type_length, const T& a, const T& b) {
+    // Is signed is used for integers encoded as big-endian twos
+    // complement integers. (e.g. decimals).
+    int a_length = value_length(type_length, a);
+    int b_length = value_length(type_length, b);
+
+    // At least of the lengths is zero.
+    if (a_length == 0 || b_length == 0) {
+      return a_length == 0 && b_length > 0;
+    }
 
-  static T Max(int type_length, const T& a, const T& b) {
-    if (a.ptr == nullptr) return b;
-    if (b.ptr == nullptr) return a;
-    return Compare(type_length, a, b) ? b : a;
+    int8_t first_a = *a.ptr;
+    int8_t first_b = *b.ptr;
+    // We can short circuit for different signed numbers or
+    // for equal length bytes arrays that have different first bytes.
+    if ((0x80 & first_a) != (0x80 & first_b) ||
+        (a_length == b_length && first_a != first_b)) {
+      return first_a < first_b;
+    }
+    // When the lengths are unequal and the numbers are of the same
+    // sign we need to extend the digits.
+    const uint8_t* a_start = a.ptr;
+    const uint8_t* b_start = b.ptr;
+    if (a_length != b_length) {
+      const uint8_t* lead_start = nullptr;
+      const uint8_t* lead_end = nullptr;
+      if (a_length > b_length) {
+        int lead_length = a_length - b_length;
+        lead_start = a.ptr;
+        lead_end = a.ptr + lead_length;
+        a_start += lead_length;
+      } else {
+        DCHECK_LT(a_length, b_length);
+        int lead_length = b_length - a_length;
+        lead_start = b.ptr;
+        lead_end = b.ptr + lead_length;
+        b_start += lead_length;
+      }
+      // Compare extra bytes to the sign extension of the first
+      // byte of the other number.
+      uint8_t extension = first_a < 0 ? 0xFF : 0;
+      for (; lead_start != lead_end; lead_start++) {
+        if (*lead_start < extension) {

Review comment:
       If `extension` is 0 then this is never possible, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to