[ 
https://issues.apache.org/jira/browse/PARQUET-1095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258801#comment-16258801
 ] 

ASF GitHub Bot commented on PARQUET-1095:
-----------------------------------------

wesm closed pull request #403: PARQUET-1095: [C++] Read and write Arrow decimal 
values
URL: https://github.com/apache/parquet-cpp/pull/403
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5aa8b930..c524ceb5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -460,6 +460,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang")
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}")
 endif()
 
+if ("${COMPILER_FAMILY}" STREQUAL "msvc")
+  # MSVC version of -Wno-deprecated
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996")
+endif()
+
 ############################################################
 # "make lint" target
 ############################################################
diff --git a/cmake_modules/ThirdpartyToolchain.cmake 
b/cmake_modules/ThirdpartyToolchain.cmake
index a470fc18..fe1d4999 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -366,7 +366,7 @@ if (NOT ARROW_FOUND)
     -DARROW_BUILD_TESTS=OFF)
 
   if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-    set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81")  # Arrow 
0.7.1
+    set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17")
   else()
     set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
   endif()
diff --git a/data/fixed_length_decimal.parquet 
b/data/fixed_length_decimal.parquet
new file mode 100644
index 00000000..69fce531
Binary files /dev/null and b/data/fixed_length_decimal.parquet differ
diff --git a/data/fixed_length_decimal_legacy.parquet 
b/data/fixed_length_decimal_legacy.parquet
new file mode 100644
index 00000000..b0df62a2
Binary files /dev/null and b/data/fixed_length_decimal_legacy.parquet differ
diff --git a/data/int32_decimal.parquet b/data/int32_decimal.parquet
new file mode 100644
index 00000000..5bf2d4ea
Binary files /dev/null and b/data/int32_decimal.parquet differ
diff --git a/data/int64_decimal.parquet b/data/int64_decimal.parquet
new file mode 100644
index 00000000..5043bcac
Binary files /dev/null and b/data/int64_decimal.parquet differ
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc 
b/src/parquet/arrow/arrow-reader-writer-test.cc
index a18c5650..0e0831ec 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,6 +24,7 @@
 #include "gtest/gtest.h"
 
 #include <sstream>
+#include <arrow/compute/api.h>
 
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
@@ -37,13 +38,13 @@
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
+#include "arrow/util/decimal.h"
 
 using arrow::Array;
 using arrow::ArrayVisitor;
 using arrow::Buffer;
 using arrow::ChunkedArray;
 using arrow::Column;
-using arrow::EncodeArrayToDictionary;
 using arrow::ListArray;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
@@ -51,6 +52,9 @@ using arrow::Status;
 using arrow::Table;
 using arrow::TimeUnit;
 using arrow::default_memory_pool;
+using arrow::compute::DictionaryEncode;
+using arrow::compute::FunctionContext;
+using arrow::compute::Datum;
 using arrow::io::BufferReader;
 
 using arrow::test::randint;
@@ -68,10 +72,10 @@ using ColumnVector = 
std::vector<std::shared_ptr<arrow::Column>>;
 namespace parquet {
 namespace arrow {
 
-const int SMALL_SIZE = 100;
-const int LARGE_SIZE = 10000;
+static constexpr int SMALL_SIZE = 100;
+static constexpr int LARGE_SIZE = 10000;
 
-constexpr uint32_t kDefaultSeed = 0;
+static constexpr uint32_t kDefaultSeed = 0;
 
 LogicalType::type get_logical_type(const ::arrow::DataType& type) {
   switch (type.id()) {
@@ -118,6 +122,8 @@ LogicalType::type get_logical_type(const ::arrow::DataType& 
type) {
           static_cast<const ::arrow::DictionaryType&>(type);
       return get_logical_type(*dict_type.dictionary()->type());
     }
+    case ArrowId::DECIMAL:
+      return LogicalType::DECIMAL;
     default:
       break;
   }
@@ -147,6 +153,7 @@ ParquetType::type get_physical_type(const 
::arrow::DataType& type) {
     case ArrowId::STRING:
       return ParquetType::BYTE_ARRAY;
     case ArrowId::FIXED_SIZE_BINARY:
+    case ArrowId::DECIMAL:
       return ParquetType::FIXED_LEN_BYTE_ARRAY;
     case ArrowId::DATE32:
       return ParquetType::INT32;
@@ -299,6 +306,7 @@ struct test_traits<::arrow::FixedSizeBinaryType> {
 const std::string test_traits<::arrow::StringType>::value("Test");             
 // NOLINT
 const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); 
 // NOLINT
 const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");   
 // NOLINT
+
 template <typename T>
 using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 
@@ -342,28 +350,44 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& 
table, int num_threads,
 
 static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& 
type,
                                                    Repetition::type 
repetition) {
-  int byte_width;
-  // Decimal is not implemented yet.
+  int32_t byte_width = -1;
+  int32_t precision = -1;
+  int32_t scale = -1;
+
   switch (type.id()) {
     case ::arrow::Type::DICTIONARY: {
-      const ::arrow::DictionaryType& dict_type =
-          static_cast<const ::arrow::DictionaryType&>(type);
+      const auto& dict_type = static_cast<const 
::arrow::DictionaryType&>(type);
       const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
-      if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) {
-        byte_width =
-            static_cast<const 
::arrow::FixedSizeBinaryType&>(values_type).byte_width();
-      } else {
-        byte_width = -1;
+      switch (values_type.id()) {
+        case ::arrow::Type::FIXED_SIZE_BINARY:
+          byte_width =
+              static_cast<const 
::arrow::FixedSizeBinaryType&>(values_type).byte_width();
+          break;
+        case ::arrow::Type::DECIMAL: {
+          const auto& decimal_type =
+              static_cast<const ::arrow::Decimal128Type&>(values_type);
+          precision = decimal_type.precision();
+          scale = decimal_type.scale();
+          byte_width = DecimalSize(precision);
+        } break;
+        default:
+          break;
       }
     } break;
     case ::arrow::Type::FIXED_SIZE_BINARY:
       byte_width = static_cast<const 
::arrow::FixedSizeBinaryType&>(type).byte_width();
       break;
+    case ::arrow::Type::DECIMAL: {
+      const auto& decimal_type = static_cast<const 
::arrow::Decimal128Type&>(type);
+      precision = decimal_type.precision();
+      scale = decimal_type.scale();
+      byte_width = DecimalSize(precision);
+    } break;
     default:
-      byte_width = -1;
+      break;
   }
   auto pnode = PrimitiveNode::Make("column1", repetition, 
get_physical_type(type),
-                                   get_logical_type(type), byte_width);
+                                   get_logical_type(type), byte_width, 
precision, scale);
   NodePtr node_ =
       GroupNode::Make("schema", Repetition::REQUIRED, 
std::vector<NodePtr>({pnode}));
   return std::static_pointer_cast<GroupNode>(node_);
@@ -371,7 +395,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const 
::arrow::DataType& type
 
 namespace internal {
 
-void AssertArraysEqual(const Array &expected, const Array &actual) {
+void AssertArraysEqual(const Array& expected, const Array& actual) {
   if (!actual.Equals(expected)) {
     std::stringstream pp_result;
     std::stringstream pp_expected;
@@ -526,11 +550,19 @@ class TestParquetIO : public ::testing::Test {
 // There we write an UInt32 Array but receive an Int64 Array as result for
 // Parquet version 1.0.
 
-typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, 
::arrow::Int8Type,
-                         ::arrow::UInt16Type, ::arrow::Int16Type, 
::arrow::Int32Type,
-                         ::arrow::UInt64Type, ::arrow::Int64Type, 
::arrow::Date32Type,
-                         ::arrow::FloatType, ::arrow::DoubleType, 
::arrow::StringType,
-                         ::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
+typedef ::testing::Types<
+    ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, 
::arrow::UInt16Type,
+    ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, 
::arrow::Int64Type,
+    ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, 
::arrow::StringType,
+    ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, 
DecimalWithPrecisionAndScale<1>,
+    DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
+    DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
+    DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
+    DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
+    DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
+    DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
+    DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
+    DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
@@ -590,8 +622,10 @@ TYPED_TEST(TestParquetIO, 
SingleColumnOptionalDictionaryWrite) {
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
-  std::shared_ptr<Array> dict_values;
-  ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), 
&dict_values));
+  Datum out;
+  FunctionContext ctx(default_memory_pool());
+  ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
+  std::shared_ptr<Array> dict_values = MakeArray(out.array());
   std::shared_ptr<GroupNode> schema =
       MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, dict_values);
@@ -856,25 +890,43 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
   ASSERT_OK_NO_THROW(
       WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, 
properties));
 
-  std::shared_ptr<Array> expected_values;
   std::shared_ptr<PoolBuffer> int64_data =
       std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
   {
     ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
-    int64_t* int64_data_ptr = 
reinterpret_cast<int64_t*>(int64_data->mutable_data());
-    const uint32_t* uint32_data_ptr =
-        reinterpret_cast<const uint32_t*>(values->values()->data());
-    // std::copy might be faster but this is explicit on the casts)
-    for (int64_t i = 0; i < values->length(); i++) {
-      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
-    }
+    auto int64_data_ptr = 
reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    auto uint32_data_ptr = reinterpret_cast<const 
uint32_t*>(values->values()->data());
+    const auto cast_uint32_to_int64 = [](uint32_t value) {
+      return static_cast<int64_t>(value);
+    };
+    std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), 
int64_data_ptr,
+                   cast_uint32_to_int64);
   }
 
   std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), 
int64_data};
   auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), 
values->length(),
                                                        buffers, 
values->null_count());
-  ASSERT_OK(MakeArray(arr_data, &expected_values));
-  this->ReadAndCheckSingleColumnTable(expected_values);
+  std::shared_ptr<Array> expected_values = MakeArray(arr_data);
+  ASSERT_NE(expected_values, NULLPTR);
+
+  const auto& expected = static_cast<const 
::arrow::Int64Array&>(*expected_values);
+  ASSERT_GT(values->length(), 0);
+  ASSERT_EQ(values->length(), expected.length());
+
+  // TODO(phillipc): Is there a better way to compare these two arrays?
+  // AssertArraysEqual requires the same type, but we only care about values 
in this case
+  for (int i = 0; i < expected.length(); ++i) {
+    const bool value_is_valid = values->IsValid(i);
+    const bool expected_value_is_valid = expected.IsValid(i);
+
+    ASSERT_EQ(expected_value_is_valid, value_is_valid);
+
+    if (value_is_valid) {
+      uint32_t value = values->Value(i);
+      int64_t expected_value = expected.Value(i);
+      ASSERT_EQ(expected_value, static_cast<int64_t>(value));
+    }
+  }
 }
 
 using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
@@ -1432,7 +1484,7 @@ void MakeListTable(int num_rows, std::shared_ptr<Table>* 
out) {
   offset_values.push_back(total_elements);
 
   std::vector<int8_t> value_draws;
-  randint<int8_t>(total_elements, 0, 100, &value_draws);
+  randint(total_elements, 0, 100, &value_draws);
 
   std::vector<bool> is_valid;
   random_is_valid(total_elements, 0.1, &is_valid);
@@ -1889,6 +1941,61 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
   ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
 }
 
+class TestArrowReaderAdHocSpark
+    : public ::testing::TestWithParam<
+          std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+
+TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
+  std::string path(std::getenv("PARQUET_TEST_DATA"));
+
+  std::string filename;
+  std::shared_ptr<::arrow::DataType> decimal_type;
+  std::tie(filename, decimal_type) = GetParam();
+
+  path += "/" + filename;
+  ASSERT_GT(path.size(), 0);
+
+  auto pool = ::arrow::default_memory_pool();
+
+  std::unique_ptr<FileReader> arrow_reader;
+  ASSERT_NO_THROW(
+      arrow_reader.reset(new FileReader(pool, 
ParquetFileReader::OpenFile(path, false))));
+  std::shared_ptr<::arrow::Table> table;
+  ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
+
+  ASSERT_EQ(1, table->num_columns());
+
+  constexpr int32_t expected_length = 24;
+
+  auto value_column = table->column(0);
+  ASSERT_EQ(expected_length, value_column->length());
+
+  auto raw_array = value_column->data();
+  ASSERT_EQ(1, raw_array->num_chunks());
+
+  auto chunk = raw_array->chunk(0);
+
+  std::shared_ptr<Array> expected_array;
+
+  ::arrow::Decimal128Builder builder(decimal_type, pool);
+
+  for (int32_t i = 0; i < expected_length; ++i) {
+    ::arrow::Decimal128 value((i + 1) * 100);
+    ASSERT_OK(builder.Append(value));
+  }
+  ASSERT_OK(builder.Finish(&expected_array));
+
+  internal::AssertArraysEqual(*expected_array, *chunk);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    ReadDecimals, TestArrowReaderAdHocSpark,
+    ::testing::Values(
+        std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
+        std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
+        std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 
2)),
+        std::make_tuple("fixed_length_decimal_legacy.parquet", 
::arrow::decimal(13, 2))));
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/arrow-schema-test.cc 
b/src/parquet/arrow/arrow-schema-test.cc
index a7a62c57..7ed9ad83 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -51,7 +51,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
 const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
 const auto BINARY = ::arrow::binary();
-const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);
 
 class TestConvertParquetSchema : public ::testing::Test {
  public:
@@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
-                                    << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs))
+          << i << " " << lhs->ToString() << " != " << rhs->ToString();
     }
   }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 5edc8379..3ca49cb4 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -29,6 +29,7 @@
 
 #include "arrow/api.h"
 #include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/parallel.h"
 
@@ -716,7 +717,8 @@ struct supports_fast_path_impl<ArrowType, FLBAType> {
 
 template <typename ArrowType, typename ParquetType>
 using supports_fast_path =
-    typename std::enable_if<supports_fast_path_impl<ArrowType, 
ParquetType>::value>::type;
+    typename std::enable_if<supports_fast_path_impl<ArrowType, 
ParquetType>::value,
+                            ParquetType>::type;
 
 template <typename ArrowType, typename ParquetType, typename Enable = void>
 struct TransferFunctor {
@@ -868,12 +870,248 @@ struct TransferFunctor<
       // Convert from BINARY type to STRING
       auto new_data = (*out)->data()->ShallowCopy();
       new_data->type = type;
-      RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+      *out = ::arrow::MakeArray(new_data);
     }
     return Status::OK();
   }
 };
 
+static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t 
stop) {
+  using ::arrow::BitUtil::FromBigEndian;
+
+  const int32_t length = stop - start;
+
+  DCHECK_GE(length, 0);
+  DCHECK_LE(length, 8);
+
+  switch (length) {
+    case 0:
+      return 0;
+    case 1:
+      return bytes[start];
+    case 2:
+      return FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+    case 3: {
+      const uint64_t first_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_two_bytes << 8 | last_byte;
+    }
+    case 4:
+      return FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+    case 5: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 8 | last_byte;
+    }
+    case 6: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t last_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+      return first_four_bytes << 16 | last_two_bytes;
+    }
+    case 7: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+      const uint64_t second_two_bytes =
+          FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
+    }
+    case 8:
+      return FromBigEndian(*reinterpret_cast<const uint64_t*>(bytes + start));
+    default: {
+      DCHECK(false);
+      return UINT64_MAX;
+    }
+  }
+}
+
+static constexpr int32_t kMinDecimalBytes = 1;
+static constexpr int32_t kMaxDecimalBytes = 16;
+
+/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) 
and one
+/// uint64_t (low bits).
+static void BytesToIntegerPair(const uint8_t* bytes,
+                               const int32_t total_number_of_bytes_used, 
int64_t* high,
+                               uint64_t* low) {
+  DCHECK_GE(total_number_of_bytes_used, kMinDecimalBytes);
+  DCHECK_LE(total_number_of_bytes_used, kMaxDecimalBytes);
+
+  /// Bytes are coming in big-endian, so the first byte is the MSB and 
therefore holds the
+  /// sign bit.
+  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+  /// Sign extend the low bits if necessary
+  *low = UINT64_MAX * (is_negative && total_number_of_bytes_used < 8);
+  *high = -1 * (is_negative && total_number_of_bytes_used < kMaxDecimalBytes);
+
+  /// Stop byte of the high bytes
+  const int32_t high_bits_offset = std::max(0, total_number_of_bytes_used - 8);
+
+  /// Shift left enough bits to make room for the incoming int64_t
+  *high <<= high_bits_offset * CHAR_BIT;
+
+  /// Preserve the upper bits by inplace OR-ing the int64_t
+  *high |= BytesToInteger(bytes, 0, high_bits_offset);
+
+  /// Stop byte of the low bytes
+  const int32_t low_bits_offset = std::min(total_number_of_bytes_used, 8);
+
+  /// Shift left enough bits to make room for the incoming uint64_t
+  *low <<= low_bits_offset * CHAR_BIT;
+
+  /// Preserve the upper bits by inplace OR-ing the uint64_t
+  *low |= BytesToInteger(bytes, high_bits_offset, total_number_of_bytes_used);
+}
+
+static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t 
byte_width,
+                                          uint8_t* out_buf) {
+  // view the first 8 bytes as an unsigned 64-bit integer
+  auto low = reinterpret_cast<uint64_t*>(out_buf);
+
+  // view the second 8 bytes as a signed 64-bit integer
+  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
+
+  // Convert the fixed size binary array bytes into a Decimal128 compatible 
layout
+  BytesToIntegerPair(value, byte_width, high, low);
+}
+
+/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two 
integers
+///    representing the high and low bits of each decimal value.
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, FLBAType> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+    // Finish the built data into a temporary array
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(reader->builder()->Finish(&array));
+    const auto& fixed_size_binary_array =
+        static_cast<const ::arrow::FixedSizeBinaryArray&>(*array);
+
+    // Get the byte width of the values in the FixedSizeBinaryArray. Most of 
the time
+    // this will be different from the decimal array width because we write 
the minimum
+    // number of bytes necessary to represent a given precision
+    const int32_t byte_width =
+        static_cast<const 
::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+            .byte_width();
+
+    // The byte width of each decimal value
+    const int32_t type_length =
+        static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+    // number of elements in the entire array
+    const int64_t length = fixed_size_binary_array.length();
+
+    // allocate memory for the decimal array
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+    // raw bytes that we can write to
+    uint8_t* out_ptr = data->mutable_data();
+
+    // convert each FixedSizeBinary value to valid decimal bytes
+    const int64_t null_count = fixed_size_binary_array.null_count();
+    if (null_count > 0) {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        if (!fixed_size_binary_array.IsNull(i)) {
+          RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), 
byte_width,
+                                 out_ptr);
+        }
+      }
+    } else {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), 
byte_width, out_ptr);
+      }
+    }
+
+    *out = std::make_shared<::arrow::Decimal128Array>(
+        type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
+    return Status::OK();
+  }
+};
+
+/// \brief Convert an Int32 or Int64 array into a Decimal128Array
+/// The parquet spec allows systems to write decimals in int32, int64 if the 
values are
+/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
+/// This function implements the conversion from int32 and int64 arrays to 
decimal arrays.
+template <typename ParquetIntegerType,
+          typename = typename std::enable_if<
+              std::is_same<ParquetIntegerType, Int32Type>::value ||
+              std::is_same<ParquetIntegerType, Int64Type>::value>::type>
+static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
+                                     const std::shared_ptr<::arrow::DataType>& 
type,
+                                     std::shared_ptr<Array>* out) {
+  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+  const int64_t length = reader->values_written();
+
+  using ElementType = typename ParquetIntegerType::c_type;
+  static_assert(std::is_same<ElementType, int32_t>::value ||
+                    std::is_same<ElementType, int64_t>::value,
+                "ElementType must be int32_t or int64_t");
+
+  const auto values = reinterpret_cast<const ElementType*>(reader->values());
+
+  const auto& decimal_type = static_cast<const 
::arrow::Decimal128Type&>(*type);
+  const int64_t type_length = decimal_type.byte_width();
+
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+  uint8_t* out_ptr = data->mutable_data();
+
+  using ::arrow::BitUtil::FromLittleEndian;
+
+  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+    // sign/zero extend int32_t values, otherwise a no-op
+    const auto value = static_cast<int64_t>(values[i]);
+
+    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+
+    // No-op on little endian machines, byteswap on big endian
+    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
+
+    // no need to byteswap here because we're sign/zero extending exactly 8 
bytes
+    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
+  }
+
+  if (reader->nullable_values()) {
+    std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, 
is_valid,
+                                                      reader->null_count());
+  } else {
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
+  }
+  return Status::OK();
+}
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
+  }
+};
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
+  }
+};
+
 #define TRANSFER_DATA(ArrowType, ParquetType)                            \
   TransferFunctor<ArrowType, ParquetType> func;                          \
   RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
@@ -932,6 +1170,22 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, 
std::shared_ptr<Array>*
     TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
     TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
+    case ::arrow::Type::DECIMAL: {
+      switch (descr_->physical_type()) {
+        case ::parquet::Type::INT32: {
+          TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
+        } break;
+        case ::parquet::Type::INT64: {
+          TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
+        } break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+          TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
+        } break;
+        default:
+          return Status::Invalid(
+              "Physical type for decimal must be int32, int64, or fixed length 
binary");
+      }
+    } break;
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -946,8 +1200,7 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, 
std::shared_ptr<Array>*
         default:
           return Status::NotImplemented("TimeUnit not supported");
       }
-      break;
-    }
+    } break;
       TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
       TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
diff --git a/src/parquet/arrow/record_reader.cc 
b/src/parquet/arrow/record_reader.cc
index 7275d2f8..6405ee77 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -83,7 +83,7 @@ class RecordReader::RecordReaderImpl {
     Reset();
   }
 
-  virtual ~RecordReaderImpl() {}
+  virtual ~RecordReaderImpl() = default;
 
   virtual int64_t ReadRecords(int64_t num_records) = 0;
 
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index e16a1afb..41da9fb3 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -25,6 +25,7 @@
 #include "parquet/util/schema-util.h"
 
 #include "arrow/api.h"
+#include "arrow/util/logging.h"
 
 using arrow::Field;
 using arrow::Status;
@@ -49,10 +50,9 @@ const auto TIMESTAMP_MS = 
::arrow::timestamp(::arrow::TimeUnit::MILLI);
 const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
-TypePtr MakeDecimalType(const PrimitiveNode& node) {
-  int precision = node.decimal_metadata().precision;
-  int scale = node.decimal_metadata().scale;
-  return std::make_shared<::arrow::DecimalType>(precision, scale);
+TypePtr MakeDecimal128Type(const PrimitiveNode& node) {
+  const auto& metadata = node.decimal_metadata();
+  return ::arrow::decimal(metadata.precision, metadata.scale);
 }
 
 static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
@@ -61,7 +61,7 @@ static Status FromByteArray(const PrimitiveNode& node, 
TypePtr* out) {
       *out = ::arrow::utf8();
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       // BINARY
@@ -77,7 +77,7 @@ static Status FromFLBA(const PrimitiveNode& node, TypePtr* 
out) {
       *out = ::arrow::fixed_size_binary(node.type_length());
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       std::stringstream ss;
@@ -120,7 +120,7 @@ static Status FromInt32(const PrimitiveNode& node, TypePtr* 
out) {
       *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     default:
       std::stringstream ss;
@@ -144,7 +144,7 @@ static Status FromInt64(const PrimitiveNode& node, TypePtr* 
out) {
       *out = ::arrow::uint64();
       break;
     case LogicalType::DECIMAL:
-      *out = MakeDecimalType(node);
+      *out = MakeDecimal128Type(node);
       break;
     case LogicalType::TIMESTAMP_MILLIS:
       *out = TIMESTAMP_MS;
@@ -473,7 +473,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
   ParquetType::type type;
   Repetition::type repetition =
       field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
+
   int length = -1;
+  int precision = -1;
+  int scale = -1;
 
   switch (field->type()->id()) {
     case ArrowType::NA:
@@ -532,9 +535,18 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       break;
     case ArrowType::FIXED_SIZE_BINARY: {
       type = ParquetType::FIXED_LEN_BYTE_ARRAY;
-      auto fixed_size_binary_type =
-          static_cast<::arrow::FixedSizeBinaryType*>(field->type().get());
-      length = fixed_size_binary_type->byte_width();
+      const auto& fixed_size_binary_type =
+          static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
+      length = fixed_size_binary_type.byte_width();
+    } break;
+    case ArrowType::DECIMAL: {
+      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
+      logical_type = LogicalType::DECIMAL;
+      const auto& decimal_type =
+          static_cast<const ::arrow::Decimal128Type&>(*field->type());
+      precision = decimal_type.precision();
+      scale = decimal_type.scale();
+      length = DecimalSize(precision);
     } break;
     case ArrowType::DATE32:
       type = ParquetType::INT32;
@@ -565,12 +577,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       auto struct_type = 
std::static_pointer_cast<::arrow::StructType>(field->type());
       return StructToNode(struct_type, field->name(), field->nullable(), 
properties,
                           arrow_properties, out);
-    } break;
+    }
     case ArrowType::LIST: {
       auto list_type = 
std::static_pointer_cast<::arrow::ListType>(field->type());
       return ListToNode(list_type, field->name(), field->nullable(), 
properties,
                         arrow_properties, out);
-    } break;
+    }
     case ArrowType::DICTIONARY: {
       // Parquet has no Dictionary type, dictionary-encoded is handled on
       // the encoding, not the schema level.
@@ -582,14 +594,15 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       return FieldToNode(unpacked_field, properties, arrow_properties, out);
     }
     default: {
-      // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, 
VARCHAR
+      // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
       std::stringstream ss;
       ss << "Unhandled type for Arrow to Parquet schema conversion: ";
       ss << field->type()->ToString();
       return Status::NotImplemented(ss.str());
     }
   }
-  *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, 
length);
+  *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, 
length,
+                             precision, scale);
   return Status::OK();
 }
 
@@ -617,5 +630,77 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                          out);
 }
 
+/// \brief Compute the number of bytes required to represent a decimal of a
+/// given precision. Taken from the Apache Impala codebase. The comments next
+/// to the return values are the maximum value that can be represented in 2's
+/// complement with the returned number of bytes.
+int32_t DecimalSize(int32_t precision) {
+  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal 
to 1, got "
+                          << precision;
+  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 
38, got "
+                           << precision;
+
+  switch (precision) {
+    case 1:
+    case 2:
+      return 1;  // 127
+    case 3:
+    case 4:
+      return 2;  // 32,767
+    case 5:
+    case 6:
+      return 3;  // 8,388,607
+    case 7:
+    case 8:
+    case 9:
+      return 4;  // 2,147,483,427
+    case 10:
+    case 11:
+      return 5;  // 549,755,813,887
+    case 12:
+    case 13:
+    case 14:
+      return 6;  // 140,737,488,355,327
+    case 15:
+    case 16:
+      return 7;  // 36,028,797,018,963,967
+    case 17:
+    case 18:
+      return 8;  // 9,223,372,036,854,775,807
+    case 19:
+    case 20:
+    case 21:
+      return 9;  // 2,361,183,241,434,822,606,847
+    case 22:
+    case 23:
+      return 10;  // 604,462,909,807,314,587,353,087
+    case 24:
+    case 25:
+    case 26:
+      return 11;  // 154,742,504,910,672,534,362,390,527
+    case 27:
+    case 28:
+      return 12;  // 39,614,081,257,132,168,796,771,975,167
+    case 29:
+    case 30:
+    case 31:
+      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
+    case 32:
+    case 33:
+      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
+    case 34:
+    case 35:
+      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
+    case 36:
+    case 37:
+    case 38:
+      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
+    default:
+      DCHECK(false);
+      break;
+  }
+  return -1;
+}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index de153eb7..3b212da7 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -85,6 +85,8 @@ ::arrow::Status PARQUET_EXPORT ToParquetSchema(const 
::arrow::Schema* arrow_sche
                                                const WriterProperties& 
properties,
                                                
std::shared_ptr<SchemaDescriptor>* out);
 
+int32_t DecimalSize(int32_t precision);
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 954a84f3..8611a303 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <limits>
 #include <string>
 #include <vector>
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
 #include "arrow/type_traits.h"
+#include "arrow/util/decimal.h"
 
 namespace parquet {
 namespace arrow {
@@ -28,6 +30,16 @@ namespace arrow {
 using ::arrow::Array;
 using ::arrow::Status;
 
+template <int32_t PRECISION>
+struct DecimalWithPrecisionAndScale {
+  static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value");
+
+  using type = ::arrow::Decimal128Type;
+  static constexpr ::arrow::Type::type type_id = 
::arrow::Decimal128Type::type_id;
+  static constexpr int32_t precision = PRECISION;
+  static constexpr int32_t scale = PRECISION - 1;
+};
+
 template <typename ArrowType>
 using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
 
@@ -52,8 +64,10 @@ using is_arrow_bool = std::is_same<ArrowType, 
::arrow::BooleanType>;
 template <class ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type 
NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
-  std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, 
&values);
+  using c_type = typename ArrowType::c_type;
+  std::vector<c_type> values;
+  ::arrow::test::random_real(size, 0, static_cast<c_type>(0), 
static_cast<c_type>(1),
+                             &values);
   ::arrow::NumericBuilder<ArrowType> builder;
   RETURN_NOT_OK(builder.Append(values.data(), values.size()));
   return builder.Finish(out);
@@ -64,7 +78,7 @@ typename std::enable_if<
     is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, 
Status>::type
 NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
 
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
@@ -77,7 +91,7 @@ template <class ArrowType>
 typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type 
NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   for (size_t i = 0; i < size; i++) {
     values[i] *= 86400000;
   }
@@ -114,11 +128,54 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   return builder.Finish(out);
 }
 
+static inline void random_decimals(int64_t n, uint32_t seed, int32_t precision,
+                                   uint8_t* out) {
+  std::mt19937 gen(seed);
+  std::uniform_int_distribution<uint32_t> d(0, 
std::numeric_limits<uint8_t>::max());
+  const int32_t required_bytes = DecimalSize(precision);
+  constexpr int32_t byte_width = 16;
+  std::fill(out, out + byte_width * n, '\0');
+
+  for (int64_t i = 0; i < n; ++i, out += byte_width) {
+    std::generate(out, out + required_bytes,
+                  [&d, &gen] { return static_cast<uint8_t>(d(gen)); });
+
+    // sign extend if the sign bit is set for the last byte generated
+    // 0b10000000 == 0x80 == 128
+    if ((out[required_bytes - 1] & '\x80') != 0) {
+      std::fill(out + required_bytes, out + byte_width, '\xFF');
+    }
+  }
+}
+
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+    std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, 
Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = 
DecimalWithPrecisionAndScale<precision>::scale;
+
+  const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+  ::arrow::Decimal128Builder builder(type);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+  constexpr int32_t seed = 0;
+
+  std::shared_ptr<Buffer> out_buf;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * 
byte_width,
+                                        &out_buf));
+  random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
+
+  RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+  return builder.Finish(out);
+}
+
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type 
NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values;
-  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::test::randint(size, 0, 1, &values);
   ::arrow::BooleanBuilder builder;
   RETURN_NOT_OK(builder.Append(values.data(), values.size()));
   return builder.Finish(out);
@@ -128,9 +185,10 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, 
Status>::type NonNullAr
 template <typename ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type 
NullableArray(
     size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) 
{
-  std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::random_real<typename ArrowType::c_type>(size, seed, -1e10, 
1e10,
-                                                         &values);
+  using c_type = typename ArrowType::c_type;
+  std::vector<c_type> values;
+  ::arrow::test::random_real(size, seed, static_cast<c_type>(-1e10),
+                             static_cast<c_type>(1e10), &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -151,7 +209,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, 
std::shared_ptr<Arra
 
   // Seed is random in Arrow right now
   (void)seed;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -172,7 +230,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, 
Status>::type NullableA
 
   // Seed is random in Arrow right now
   (void)seed;
-  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::test::randint(size, 0, 64, &values);
   for (size_t i = 0; i < size; i++) {
     values[i] *= 86400000;
   }
@@ -246,6 +304,34 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,
   return builder.Finish(out);
 }
 
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+    std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, 
Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+              std::shared_ptr<::arrow::Array>* out) {
+  std::vector<uint8_t> valid_bytes(size, '\1');
+
+  for (size_t i = 0; i < num_nulls; ++i) {
+    valid_bytes[i * 2] = '\0';
+  }
+
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = 
DecimalWithPrecisionAndScale<precision>::scale;
+  const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+  std::shared_ptr<::arrow::Buffer> out_buf;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * 
byte_width,
+                                        &out_buf));
+
+  random_decimals(size, seed, precision, out_buf->mutable_data());
+
+  ::arrow::Decimal128Builder builder(type);
+  RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+  return builder.Finish(out);
+}
+
 // This helper function only supports (size/2) nulls yet.
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type 
NullableArray(
@@ -255,7 +341,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, 
Status>::type NullableA
   // Seed is random in Arrow right now
   (void)seed;
 
-  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::test::randint(size, 0, 1, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index b53c1cac..1f3fc7e6 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -32,6 +32,7 @@
 using arrow::Array;
 using arrow::BinaryArray;
 using arrow::FixedSizeBinaryArray;
+using arrow::Decimal128Array;
 using arrow::BooleanArray;
 using arrow::Int16Array;
 using arrow::Int16Builder;
@@ -104,7 +105,6 @@ class LevelBuilder {
 
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
-  NOT_IMPLEMENTED_VISIT(Decimal)
   NOT_IMPLEMENTED_VISIT(Dictionary)
   NOT_IMPLEMENTED_VISIT(Interval)
 
@@ -743,8 +743,6 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, 
::arrow::BinaryType>(
       buffer_ptr[i] =
           ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + 
value_offset[i]);
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   } else {
     int buffer_idx = 0;
     for (int64_t i = 0; i < data->length(); i++) {
@@ -753,9 +751,9 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, 
::arrow::BinaryType>(
             ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + 
value_offset[i]);
       }
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
@@ -765,29 +763,82 @@ Status FileWriter::Impl::TypedWriteBatch<FLBAType, 
::arrow::FixedSizeBinaryType>
     ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t 
num_levels,
     const int16_t* def_levels, const int16_t* rep_levels) {
   RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
-  auto data = static_cast<const FixedSizeBinaryArray*>(array.get());
+  const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+  const int64_t length = data.length();
+
   auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
 
   auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
 
-  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 
0)) {
+  if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) 
{
     // no nulls, just dump the data
     // todo(advancedxy): use a writeBatch to avoid this step
-    for (int64_t i = 0; i < data->length(); i++) {
-      buffer_ptr[i] = FixedLenByteArray(data->GetValue(i));
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   } else {
     int buffer_idx = 0;
-    for (int64_t i = 0; i < data->length(); i++) {
-      if (!data->IsNull(i)) {
-        buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i));
+    for (int64_t i = 0; i < length; i++) {
+      if (!data.IsNull(i)) {
+        buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
       }
     }
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t 
num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  const auto& data = static_cast<const Decimal128Array&>(*array);
+  const int64_t length = data.length();
+
+  // TODO(phillipc): This is potentially very wasteful if we have a lot of 
nulls
+  std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
+
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
+  auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+
+  auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+
+  const auto& decimal_type = static_cast<const 
::arrow::Decimal128Type&>(*data.type());
+  const int32_t offset =
+      decimal_type.byte_width() - DecimalSize(decimal_type.precision());
+
+  const bool does_not_have_nulls =
+      writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+  // TODO(phillipc): Look into whether our compilers will perform loop 
unswitching so we
+  // don't have to keep writing two loops to handle the case where we know 
there are no
+  // nulls
+  if (does_not_have_nulls) {
+    // no nulls, just dump the data
+    // todo(advancedxy): use a writeBatch to avoid this step
+    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
+      auto unsigned_64_bit = reinterpret_cast<const 
uint64_t*>(data.GetValue(i));
+      big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+      big_endian_values[j + 1] = 
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+      buffer_ptr[i] = FixedLenByteArray(
+          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+    }
+  } else {
+    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
+      if (!data.IsNull(i)) {
+        auto unsigned_64_bit = reinterpret_cast<const 
uint64_t*>(data.GetValue(i));
+        big_endian_values[j] = 
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+        big_endian_values[j + 1] = 
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+        buffer_ptr[buffer_idx++] = FixedLenByteArray(
+            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+        j += 2;
+      }
+    }
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
@@ -896,6 +947,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& 
data) {
       WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
+      WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
       WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
       WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
       WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index be387522..3284aca6 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -420,11 +420,9 @@ inline void 
DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary)
 
   PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
   uint8_t* bytes_data = byte_array_data_->mutable_data();
-  int offset = 0;
-  for (int i = 0; i < num_dictionary_values; ++i) {
+  for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += 
fixed_len) {
     memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
     dictionary_[i].ptr = bytes_data + offset;
-    offset += fixed_len;
   }
 }
 
@@ -597,7 +595,7 @@ inline int DictEncoder<DType>::Hash(const typename 
DType::c_type& value) const {
 template <>
 inline int DictEncoder<ByteArrayType>::Hash(const ByteArray& value) const {
   if (value.len > 0) {
-    DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+    DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
   }
   return HashUtil::Hash(value.ptr, value.len, 0);
 }
@@ -605,7 +603,7 @@ inline int DictEncoder<ByteArrayType>::Hash(const 
ByteArray& value) const {
 template <>
 inline int DictEncoder<FLBAType>::Hash(const FixedLenByteArray& value) const {
   if (type_length_ > 0) {
-    DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+    DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
   }
   return HashUtil::Hash(value.ptr, type_length_, 0);
 }
@@ -923,7 +921,8 @@ class DeltaByteArrayDecoder : public Decoder<ByteArrayType> 
{
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
-        suffix_decoder_(nullptr, pool) {}
+        suffix_decoder_(nullptr, pool),
+        last_value_(0, nullptr) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 9b9bde9f..4ec48a45 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -45,9 +45,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr<Contents> 
contents)
     : contents_(std::move(contents)) {}
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   const ColumnDescriptor* descr = metadata()->schema()->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -57,9 +57,9 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns())
+      << "The RowGroup only has " << metadata()->num_columns()
+      << "columns, requested column: " << i;
   return contents_->GetColumnPageReader(i);
 }
 
@@ -127,9 +127,9 @@ std::shared_ptr<FileMetaData> ParquetFileReader::metadata() 
const {
 }
 
 std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
-                                           << metadata()->num_row_groups()
-                                           << "row groups, requested reader 
for: " << i;
+  DCHECK(i < metadata()->num_row_groups())
+      << "The file only has " << metadata()->num_row_groups()
+      << "row groups, requested reader for: " << i;
   return contents_->GetRowGroup(i);
 }
 
diff --git a/src/parquet/types.h b/src/parquet/types.h
index af3a58f5..53b33d56 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <iterator>
 #include <sstream>
 #include <string>
 
@@ -136,15 +137,15 @@ struct ByteArray {
   ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
   uint32_t len;
   const uint8_t* ptr;
+};
 
-  bool operator==(const ByteArray& other) const {
-    return this->len == other.len && 0 == memcmp(this->ptr, other.ptr, 
this->len);
-  }
+inline bool operator==(const ByteArray& left, const ByteArray& right) {
+  return left.len == right.len && std::equal(left.ptr, left.ptr + left.len, 
right.ptr);
+}
 
-  bool operator!=(const ByteArray& other) const {
-    return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, 
this->len);
-  }
-};
+inline bool operator!=(const ByteArray& left, const ByteArray& right) {
+  return !(left == right);
+}
 
 struct FixedLenByteArray {
   FixedLenByteArray() : ptr(nullptr) {}
@@ -152,63 +153,47 @@ struct FixedLenByteArray {
   const uint8_t* ptr;
 };
 
-typedef FixedLenByteArray FLBA;
+using FLBA = FixedLenByteArray;
 
-MANUALLY_ALIGNED_STRUCT(1) Int96 {
-  uint32_t value[3];
+MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
+STRUCT_END(Int96, 12);
 
-  bool operator==(const Int96& other) const {
-    return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t));
-  }
+inline bool operator==(const Int96& left, const Int96& right) {
+  return std::equal(left.value, left.value + 3, right.value);
+}
 
-  bool operator!=(const Int96& other) const { return !(*this == other); }
-};
-STRUCT_END(Int96, 12);
+inline bool operator!=(const Int96& left, const Int96& right) { return !(left 
== right); }
 
 static inline std::string ByteArrayToString(const ByteArray& a) {
   return std::string(reinterpret_cast<const char*>(a.ptr), a.len);
 }
 
 static inline std::string Int96ToString(const Int96& a) {
-  std::stringstream result;
-  for (int i = 0; i < 3; i++) {
-    result << a.value[i] << " ";
-  }
+  std::ostringstream result;
+  std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " 
"));
   return result.str();
 }
 
 static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& 
a, int len) {
-  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(a.ptr);
-  std::stringstream result;
-  for (int i = 0; i < len; i++) {
-    result << (uint32_t)bytes[i] << " ";
-  }
+  std::ostringstream result;
+  std::copy(a.ptr, a.ptr + len, std::ostream_iterator<uint32_t>(result, " "));
   return result.str();
 }
 
-static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
-  uint32_t len = std::min(x1.len, x2.len);
-  int cmp = memcmp(x1.ptr, x2.ptr, len);
-  if (cmp != 0) return cmp;
-  if (len < x1.len) return 1;
-  if (len < x2.len) return -1;
-  return 0;
-}
-
-template <int TYPE>
+template <Type::type TYPE>
 struct type_traits {};
 
 template <>
 struct type_traits<Type::BOOLEAN> {
-  typedef bool value_type;
-  static constexpr int value_byte_size = 1;
+  using value_type = bool;
 
+  static constexpr int value_byte_size = 1;
   static constexpr const char* printf_code = "d";
 };
 
 template <>
 struct type_traits<Type::INT32> {
-  typedef int32_t value_type;
+  using value_type = int32_t;
 
   static constexpr int value_byte_size = 4;
   static constexpr const char* printf_code = "d";
@@ -216,7 +201,7 @@ struct type_traits<Type::INT32> {
 
 template <>
 struct type_traits<Type::INT64> {
-  typedef int64_t value_type;
+  using value_type = int64_t;
 
   static constexpr int value_byte_size = 8;
   static constexpr const char* printf_code = "ld";
@@ -224,7 +209,7 @@ struct type_traits<Type::INT64> {
 
 template <>
 struct type_traits<Type::INT96> {
-  typedef Int96 value_type;
+  using value_type = Int96;
 
   static constexpr int value_byte_size = 12;
   static constexpr const char* printf_code = "s";
@@ -232,7 +217,7 @@ struct type_traits<Type::INT96> {
 
 template <>
 struct type_traits<Type::FLOAT> {
-  typedef float value_type;
+  using value_type = float;
 
   static constexpr int value_byte_size = 4;
   static constexpr const char* printf_code = "f";
@@ -240,7 +225,7 @@ struct type_traits<Type::FLOAT> {
 
 template <>
 struct type_traits<Type::DOUBLE> {
-  typedef double value_type;
+  using value_type = double;
 
   static constexpr int value_byte_size = 8;
   static constexpr const char* printf_code = "lf";
@@ -248,7 +233,7 @@ struct type_traits<Type::DOUBLE> {
 
 template <>
 struct type_traits<Type::BYTE_ARRAY> {
-  typedef ByteArray value_type;
+  using value_type = ByteArray;
 
   static constexpr int value_byte_size = sizeof(ByteArray);
   static constexpr const char* printf_code = "s";
@@ -256,7 +241,7 @@ struct type_traits<Type::BYTE_ARRAY> {
 
 template <>
 struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
-  typedef FixedLenByteArray value_type;
+  using value_type = FixedLenByteArray;
 
   static constexpr int value_byte_size = sizeof(FixedLenByteArray);
   static constexpr const char* printf_code = "s";
@@ -264,18 +249,18 @@ struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
 
 template <Type::type TYPE>
 struct DataType {
+  using c_type = typename type_traits<TYPE>::value_type;
   static constexpr Type::type type_num = TYPE;
-  typedef typename type_traits<TYPE>::value_type c_type;
 };
 
-typedef DataType<Type::BOOLEAN> BooleanType;
-typedef DataType<Type::INT32> Int32Type;
-typedef DataType<Type::INT64> Int64Type;
-typedef DataType<Type::INT96> Int96Type;
-typedef DataType<Type::FLOAT> FloatType;
-typedef DataType<Type::DOUBLE> DoubleType;
-typedef DataType<Type::BYTE_ARRAY> ByteArrayType;
-typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType;
+using BooleanType = DataType<Type::BOOLEAN>;
+using Int32Type = DataType<Type::INT32>;
+using Int64Type = DataType<Type::INT64>;
+using Int96Type = DataType<Type::INT96>;
+using FloatType = DataType<Type::FLOAT>;
+using DoubleType = DataType<Type::DOUBLE>;
+using ByteArrayType = DataType<Type::BYTE_ARRAY>;
+using FLBAType = DataType<Type::FIXED_LEN_BYTE_ARRAY>;
 
 template <typename Type>
 inline std::string format_fwf(int width) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [C++] Read and write Arrow decimal values
> -----------------------------------------
>
>                 Key: PARQUET-1095
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1095
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: Wes McKinney
>            Assignee: Phillip Cloud
>             Fix For: cpp-1.4.0
>
>
> We now have 16-byte decimal values in Arrow which have been validated against 
> the Java implementation. We need to be able to read and write these to 
> Parquet format. 
> To make these values readable by Impala or some other Parquet readers may 
> require some work. It expects the storage size to match the decimal precision 
> exactly. So in parquet-cpp we will need to write the correct non-zero bytes 
> into a FIXED_LEN_BYTE_ARRAY of the appropriate size.
> We should validate this against Java Parquet implementations



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to