pitrou commented on a change in pull request #8897:
URL: https://github.com/apache/arrow/pull/8897#discussion_r542391497



##########
File path: cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
##########
@@ -655,6 +666,7 @@ std::vector<std::shared_ptr<CastFunction>> 
GetNumericCasts() {
   functions.push_back(GetCastToFloating<DoubleType>("cast_double"));
 
   functions.push_back(GetCastToDecimal());

Review comment:
       Nit: `GetCastToDecimal128`?

##########
File path: cpp/src/arrow/type.cc
##########
@@ -747,6 +747,75 @@ std::vector<std::shared_ptr<Field>> 
StructType::GetAllFieldsByName(
   return result;
 }
 
+// Taken from the Apache Impala codebase. The comments next
+// to the return values are the maximum value that can be represented in 2's
+// complement with the returned number of bytes.
+int32_t DecimalType::DecimalSize(int32_t precision) {
+  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal 
to 1, got "
+                          << precision;
+
+  switch (precision) {
+    case 1:
+    case 2:
+      return 1;  // 127
+    case 3:
+    case 4:
+      return 2;  // 32,767
+    case 5:
+    case 6:
+      return 3;  // 8,388,607
+    case 7:
+    case 8:
+    case 9:
+      return 4;  // 2,147,483,427
+    case 10:
+    case 11:
+      return 5;  // 549,755,813,887
+    case 12:
+    case 13:
+    case 14:
+      return 6;  // 140,737,488,355,327
+    case 15:
+    case 16:
+      return 7;  // 36,028,797,018,963,967
+    case 17:
+    case 18:
+      return 8;  // 9,223,372,036,854,775,807
+    case 19:
+    case 20:
+    case 21:
+      return 9;  // 2,361,183,241,434,822,606,847
+    case 22:
+    case 23:
+      return 10;  // 604,462,909,807,314,587,353,087
+    case 24:
+    case 25:
+    case 26:
+      return 11;  // 154,742,504,910,672,534,362,390,527
+    case 27:
+    case 28:
+      return 12;  // 39,614,081,257,132,168,796,771,975,167
+    case 29:
+    case 30:
+    case 31:
+      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
+    case 32:
+    case 33:
+      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
+    case 34:
+    case 35:
+      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
+    case 36:
+    case 37:
+    case 38:
+      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
+    default:
+      return static_cast<int32_t>(std::ceil((precision / 8.0) * 
std::log2(10)));

Review comment:
       The entire range can be precomputed:
   ```python
   >>> decimal_size = lambda prec: int(math.ceil((prec * math.log2(10) + 1) / 
8))
   >>> [decimal_size(i) for i in range(1, 77)]
   [1, 1, 2, 2, 3, 3, 4, 4, 4, 5, 5, 6, 6, 6, 7, 7, 8, 8, 9, 9, 9, 10, 10, 11, 
11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 16, 17, 17, 18, 18, 18, 19, 
19, 20, 20, 21, 21, 21, 22, 22, 23, 23, 23, 24, 24, 25, 25, 26, 26, 26, 27, 27, 
28, 28, 28, 29, 29, 30, 30, 31, 31, 31, 32, 32]
   ```
   
   So just make it a static array of integers instead of the humongous switch 
statement + runtime computation?
   (also your formula above is incorrect, it misses the adjustment for the sign 
bit)

##########
File path: cpp/src/arrow/util/basic_decimal.h
##########
@@ -303,6 +314,10 @@ ARROW_EXPORT inline bool operator>=(const BasicDecimal256& 
left,
   return !operator<(left, right);
 }
 
+ARROW_EXPORT BasicDecimal256 operator-(const BasicDecimal256& operand);
+ARROW_EXPORT BasicDecimal256 operator~(const BasicDecimal256& operand);
+ARROW_EXPORT BasicDecimal256 operator+(const BasicDecimal256& left,
+                                       const BasicDecimal256& right);

Review comment:
       No binary `operator-`? 

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -369,225 +371,134 @@ Status TransferBinary(RecordReader* reader, MemoryPool* 
pool,
 }
 
 // ----------------------------------------------------------------------
-// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
-
-static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t 
stop) {
-  const int32_t length = stop - start;
-
-  DCHECK_GE(length, 0);
-  DCHECK_LE(length, 8);
-
-  switch (length) {
-    case 0:
-      return 0;
-    case 1:
-      return bytes[start];
-    case 2:
-      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-    case 3: {
-      const uint64_t first_two_bytes = 
FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_two_bytes << 8 | last_byte;
-    }
-    case 4:
-      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-    case 5: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 8 | last_byte;
-    }
-    case 6: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      return first_four_bytes << 16 | last_two_bytes;
-    }
-    case 7: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t second_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
-    }
-    case 8:
-      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
-    default: {
-      DCHECK(false);
-      return UINT64_MAX;
-    }
-  }
-}
-
-static constexpr int32_t kMinDecimalBytes = 1;
-static constexpr int32_t kMaxDecimalBytes = 16;
-
-/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) 
and one
-/// uint64_t (low bits).
-static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
-                               int64_t* out_high, uint64_t* out_low) {
-  DCHECK_GE(length, kMinDecimalBytes);
-  DCHECK_LE(length, kMaxDecimalBytes);
-
-  // XXX This code is copied from Decimal::FromBigEndian
-
-  int64_t high, low;
-
-  // Bytes are coming in big-endian, so the first byte is the MSB and 
therefore holds the
-  // sign bit.
-  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
-
-  // 1. Extract the high bytes
-  // Stop byte of the high bytes
-  const int32_t high_bits_offset = std::max(0, length - 8);
-  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);
-
-  if (high_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    high = high_bits;
-  } else {
-    high = -1 * (is_negative && length < kMaxDecimalBytes);
-    // Shift left enough bits to make room for the incoming int64_t
-    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    high |= high_bits;
-  }
-
-  // 2. Extract the low bytes
-  // Stop byte of the low bytes
-  const int32_t low_bits_offset = std::min(length, 8);
-  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);
-
-  if (low_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    low = low_bits;
-  } else {
-    // Sign extend the low bits if necessary
-    low = -1 * (is_negative && length < 8);
-    // Shift left enough bits to make room for the incoming int64_t
-    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    low |= low_bits;
-  }
-
-  *out_high = high;
-  *out_low = static_cast<uint64_t>(low);
+// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || 
Decimal256
+
+template <typename DecimalType>
+Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+                              uint8_t* out_buf) {
+  ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, 
byte_width));
+  t.ToBytes(out_buf);
+  return ::arrow::Status::OK();
 }
 
-static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t 
byte_width,
-                                          uint8_t* out_buf) {
-  // view the first 8 bytes as an unsigned 64-bit integer
-  auto low = reinterpret_cast<uint64_t*>(out_buf);
+template <typename DecimalArrayType>
+struct DecimalTypeTrait;
 
-  // view the second 8 bytes as a signed 64-bit integer
-  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
-
-  // Convert the fixed size binary array bytes into a Decimal128 compatible 
layout
-  BytesToIntegerPair(value, byte_width, high, low);
-}
-
-template <typename T>
-Status ConvertToDecimal128(const Array& array, const 
std::shared_ptr<DataType>&,
-                           MemoryPool* pool, std::shared_ptr<Array>*) {
-  return Status::NotImplemented("not implemented");
-}
+template <>
+struct DecimalTypeTrait<::arrow::Decimal128Array> {
+  using value = ::arrow::Decimal128;
+};
 
 template <>
-Status ConvertToDecimal128<FLBAType>(const Array& array,
-                                     const std::shared_ptr<DataType>& type,
-                                     MemoryPool* pool, std::shared_ptr<Array>* 
out) {
-  const auto& fixed_size_binary_array =
-      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
-
-  // The byte width of each decimal value
-  const int32_t type_length =
-      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
-
-  // number of elements in the entire array
-  const int64_t length = fixed_size_binary_array.length();
-
-  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the 
time
-  // this will be different from the decimal array width because we write the 
minimum
-  // number of bytes necessary to represent a given precision
-  const int32_t byte_width =
-      static_cast<const 
::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
-          .byte_width();
-  if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
-    return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for 
Decimal128");
+struct DecimalTypeTrait<::arrow::Decimal256Array> {
+  using value = ::arrow::Decimal256;
+};
+
+template <typename DecimalArrayType, typename ParquetType>
+struct DecimalConverter {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>&,
+                                        MemoryPool* pool, 
std::shared_ptr<Array>*) {
+    return Status::NotImplemented("not implemented");
   }
-
-  // allocate memory for the decimal array
-  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * 
type_length, pool));
-
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
-
-  // convert each FixedSizeBinary value to valid decimal bytes
-  const int64_t null_count = fixed_size_binary_array.null_count();
-  if (null_count > 0) {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      if (!fixed_size_binary_array.IsNull(i)) {
-        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), 
byte_width, out_ptr);
+};
+
+template <typename DecimalArrayType>
+struct DecimalConverter<DecimalArrayType, FLBAType> {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>& type,
+                                        MemoryPool* pool, 
std::shared_ptr<Array>* out) {
+    const auto& fixed_size_binary_array =
+        static_cast<const ::arrow::FixedSizeBinaryArray&>(array);

Review comment:
       Use `checked_cast`?

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -369,225 +371,134 @@ Status TransferBinary(RecordReader* reader, MemoryPool* 
pool,
 }
 
 // ----------------------------------------------------------------------
-// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
-
-static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t 
stop) {
-  const int32_t length = stop - start;
-
-  DCHECK_GE(length, 0);
-  DCHECK_LE(length, 8);
-
-  switch (length) {
-    case 0:
-      return 0;
-    case 1:
-      return bytes[start];
-    case 2:
-      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-    case 3: {
-      const uint64_t first_two_bytes = 
FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_two_bytes << 8 | last_byte;
-    }
-    case 4:
-      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-    case 5: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 8 | last_byte;
-    }
-    case 6: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      return first_four_bytes << 16 | last_two_bytes;
-    }
-    case 7: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t second_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
-    }
-    case 8:
-      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
-    default: {
-      DCHECK(false);
-      return UINT64_MAX;
-    }
-  }
-}
-
-static constexpr int32_t kMinDecimalBytes = 1;
-static constexpr int32_t kMaxDecimalBytes = 16;
-
-/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) 
and one
-/// uint64_t (low bits).
-static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
-                               int64_t* out_high, uint64_t* out_low) {
-  DCHECK_GE(length, kMinDecimalBytes);
-  DCHECK_LE(length, kMaxDecimalBytes);
-
-  // XXX This code is copied from Decimal::FromBigEndian
-
-  int64_t high, low;
-
-  // Bytes are coming in big-endian, so the first byte is the MSB and 
therefore holds the
-  // sign bit.
-  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
-
-  // 1. Extract the high bytes
-  // Stop byte of the high bytes
-  const int32_t high_bits_offset = std::max(0, length - 8);
-  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);
-
-  if (high_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    high = high_bits;
-  } else {
-    high = -1 * (is_negative && length < kMaxDecimalBytes);
-    // Shift left enough bits to make room for the incoming int64_t
-    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    high |= high_bits;
-  }
-
-  // 2. Extract the low bytes
-  // Stop byte of the low bytes
-  const int32_t low_bits_offset = std::min(length, 8);
-  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);
-
-  if (low_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    low = low_bits;
-  } else {
-    // Sign extend the low bits if necessary
-    low = -1 * (is_negative && length < 8);
-    // Shift left enough bits to make room for the incoming int64_t
-    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    low |= low_bits;
-  }
-
-  *out_high = high;
-  *out_low = static_cast<uint64_t>(low);
+// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || 
Decimal256
+
+template <typename DecimalType>
+Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+                              uint8_t* out_buf) {
+  ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, 
byte_width));
+  t.ToBytes(out_buf);
+  return ::arrow::Status::OK();
 }
 
-static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t 
byte_width,
-                                          uint8_t* out_buf) {
-  // view the first 8 bytes as an unsigned 64-bit integer
-  auto low = reinterpret_cast<uint64_t*>(out_buf);
+template <typename DecimalArrayType>
+struct DecimalTypeTrait;
 
-  // view the second 8 bytes as a signed 64-bit integer
-  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
-
-  // Convert the fixed size binary array bytes into a Decimal128 compatible 
layout
-  BytesToIntegerPair(value, byte_width, high, low);
-}
-
-template <typename T>
-Status ConvertToDecimal128(const Array& array, const 
std::shared_ptr<DataType>&,
-                           MemoryPool* pool, std::shared_ptr<Array>*) {
-  return Status::NotImplemented("not implemented");
-}
+template <>
+struct DecimalTypeTrait<::arrow::Decimal128Array> {
+  using value = ::arrow::Decimal128;
+};
 
 template <>
-Status ConvertToDecimal128<FLBAType>(const Array& array,
-                                     const std::shared_ptr<DataType>& type,
-                                     MemoryPool* pool, std::shared_ptr<Array>* 
out) {
-  const auto& fixed_size_binary_array =
-      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
-
-  // The byte width of each decimal value
-  const int32_t type_length =
-      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
-
-  // number of elements in the entire array
-  const int64_t length = fixed_size_binary_array.length();
-
-  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the 
time
-  // this will be different from the decimal array width because we write the 
minimum
-  // number of bytes necessary to represent a given precision
-  const int32_t byte_width =
-      static_cast<const 
::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
-          .byte_width();
-  if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
-    return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for 
Decimal128");
+struct DecimalTypeTrait<::arrow::Decimal256Array> {
+  using value = ::arrow::Decimal256;
+};
+
+template <typename DecimalArrayType, typename ParquetType>
+struct DecimalConverter {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>&,
+                                        MemoryPool* pool, 
std::shared_ptr<Array>*) {
+    return Status::NotImplemented("not implemented");
   }
-
-  // allocate memory for the decimal array
-  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * 
type_length, pool));
-
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
-
-  // convert each FixedSizeBinary value to valid decimal bytes
-  const int64_t null_count = fixed_size_binary_array.null_count();
-  if (null_count > 0) {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      if (!fixed_size_binary_array.IsNull(i)) {
-        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), 
byte_width, out_ptr);
+};
+
+template <typename DecimalArrayType>
+struct DecimalConverter<DecimalArrayType, FLBAType> {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>& type,
+                                        MemoryPool* pool, 
std::shared_ptr<Array>* out) {
+    const auto& fixed_size_binary_array =
+        static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
+
+    // The byte width of each decimal value
+    const int32_t type_length =
+        static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+    // number of elements in the entire array
+    const int64_t length = fixed_size_binary_array.length();
+
+    // Get the byte width of the values in the FixedSizeBinaryArray. Most of 
the time
+    // this will be different from the decimal array width because we write 
the minimum
+    // number of bytes necessary to represent a given precision
+    const int32_t byte_width =
+        static_cast<const 
::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+            .byte_width();
+    // allocate memory for the decimal array
+    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * 
type_length, pool));
+
+    // raw bytes that we can write to
+    uint8_t* out_ptr = data->mutable_data();
+
+    // convert each FixedSizeBinary value to valid decimal bytes
+    const int64_t null_count = fixed_size_binary_array.null_count();
+
+    using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
+    if (null_count > 0) {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        if (!fixed_size_binary_array.IsNull(i)) {

Review comment:
       You shouldn't leave the null slots uninitialized, otherwise it can leak 
previous data.

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -723,29 +628,47 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
       RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
       result = chunked_result;
     } break;
-    case ::arrow::Type::DECIMAL: {
+    case ::arrow::Type::DECIMAL128: {
       switch (descr->physical_type()) {
         case ::parquet::Type::INT32: {
-          RETURN_NOT_OK(
-              DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, 
&result));
+          auto fn = DecimalIntegerTransfer<Int32Type>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::INT64: {
-          RETURN_NOT_OK(
-              DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, 
&result));
+          auto fn = &DecimalIntegerTransfer<Int64Type>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::BYTE_ARRAY: {
-          RETURN_NOT_OK(
-              TransferDecimal<ByteArrayType>(reader, pool, value_type, 
&result));
+          auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
-          RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, 
&result));
+          auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         default:
           return Status::Invalid(
               "Physical type for decimal must be int32, int64, byte array, or 
fixed "
               "length binary");
       }
     } break;
+    case ::arrow::Type::DECIMAL256:
+      switch (descr->physical_type()) {
+        case ::parquet::Type::BYTE_ARRAY: {
+          auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+        } break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+          auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+        } break;
+        default:
+          return Status::Invalid(
+              "Physical type for decimal must be int32, int64, byte array, or 
fixed "

Review comment:
       According to the above, the error message isn't accurate.

##########
File path: cpp/src/arrow/util/decimal_test.cc
##########
@@ -1333,6 +1333,45 @@ TEST(Decimal256Test, Multiply) {
   }
 }
 
+TEST(Decimal256Test, Shift) {
+  Decimal256 v(1024);

Review comment:
       1024 has only one bit set. Perhaps test with more interesting values?

##########
File path: cpp/src/parquet/arrow/reader_internal.cc
##########
@@ -599,7 +510,10 @@ template <
                                     std::is_same<ParquetIntegerType, 
Int64Type>::value>>
 static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                      const std::shared_ptr<DataType>& type, 
Datum* out) {
-  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+  // Decimal256 isn't relevant here because this library never writes

Review comment:
       "This library" meaning parquet-cpp? But what do other implementations do?

##########
File path: cpp/src/arrow/util/decimal.cc
##########
@@ -718,6 +718,48 @@ Result<Decimal256> Decimal256::FromString(const char* s) {
   return FromString(util::string_view(s));
 }
 
+Result<Decimal256> Decimal256::FromBigEndian(const uint8_t* bytes, int32_t 
length) {
+  static constexpr int32_t kMinDecimalBytes = 1;
+  static constexpr int32_t kMaxDecimalBytes = 32;
+
+  std::array<uint64_t, 4> little_endian_array;
+
+  if (ARROW_PREDICT_FALSE(length < kMinDecimalBytes || length > 
kMaxDecimalBytes)) {
+    return Status::Invalid("Length of byte array passed to 
Decimal128::FromBigEndian ",
+                           "was ", length, ", but must be between ", 
kMinDecimalBytes,
+                           " and ", kMaxDecimalBytes);
+  }
+
+  // Bytes are coming in big-endian, so the first byte is the MSB and 
therefore holds the
+  // sign bit.
+  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+  for (int word_idx = 0; word_idx < 4; word_idx++) {
+    const int32_t word_length = std::min(length, 
static_cast<int32_t>(sizeof(uint64_t)));
+
+    if (word_length == 8) {
+      // Full words can be assigned as is (and are UB with the shift below).
+      little_endian_array[word_idx] =
+          UInt64FromBigEndian(bytes + length - word_length, word_length);
+    } else {
+      // Sign extend the word its if necessary
+      uint64_t word = -1 * is_negative;
+      if (length > 0) {
+        // Incorporate the actual values if present.
+        // Shift left enough bits to make room for the incoming int64_t
+        word = SafeLeftShift(word, word_length * CHAR_BIT);
+        // Preserve the upper bits by inplace OR-ing the int64_t
+        word |= UInt64FromBigEndian(bytes + length - word_length, word_length);
+      }
+      little_endian_array[word_idx] = word;
+    }
+    // Move on to the next word.
+    length -= sizeof(uint64_t);

Review comment:
       Is it deliberate that `length` can become negative?

##########
File path: cpp/src/parquet/arrow/schema_internal.cc
##########
@@ -33,7 +33,10 @@ using ::arrow::internal::checked_cast;
 
 Result<std::shared_ptr<ArrowType>> MakeArrowDecimal(const LogicalType& 
logical_type) {
   const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
-  return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale());
+  if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) {
+    return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale());
+  }
+  return ::arrow::Decimal256Type::Make(decimal.precision(), decimal.scale());

Review comment:
       While we are at it, can we catch cases where the precision doesn't fit 
in `Decimal256` and error out?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to