This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dbd39c71d GH-34633: [C++][Parquet] Fix StreamReader to read decimals 
(#34720)
2dbd39c71d is described below

commit 2dbd39c71d774f4a14562d30cf161d0791dbe66b
Author: Gang Wu <[email protected]>
AuthorDate: Tue Mar 28 07:30:19 2023 +0800

    GH-34633: [C++][Parquet] Fix StreamReader to read decimals (#34720)
    
    ### Rationale for this change
    
    StreamReader only supports reading decimals from INT32 and INT64 columns. Users cannot read decimals stored as FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY.
    
    ### What changes are included in this PR?
    
    Add StreamReader support for reading decimals via the arrow::Decimal128 type.
    
    ### Are these changes tested?
    
    Added two test cases that read decimals from FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY Parquet files.
    
    ### Are there any user-facing changes?
    
    Yes, users can now use arrow::Decimal128 directly to read decimal values from a Parquet file.
    * Closes: #34633
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/parquet/stream_reader.cc      | 42 ++++++++++++++++++++++++++++++++++-
 cpp/src/parquet/stream_reader.h       |  4 ++++
 cpp/src/parquet/stream_reader_test.cc | 39 ++++++++++++++++++++++++++++++--
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/cpp/src/parquet/stream_reader.cc b/cpp/src/parquet/stream_reader.cc
index 9a7cc8cdf8..0fecb1bf24 100644
--- a/cpp/src/parquet/stream_reader.cc
+++ b/cpp/src/parquet/stream_reader.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "parquet/stream_reader.h"
+#include "arrow/util/decimal.h"
 
 #include <set>
 #include <utility>
@@ -35,7 +36,7 @@ constexpr int64_t StreamReader::kBatchSizeOne;
 // then it will allow the Parquet file to use the converted type
 // NONE.
 //
-static const std::set<std::pair<ConvertedType::type, ConvertedType::type> >
+static const std::set<std::pair<ConvertedType::type, ConvertedType::type>>
     converted_type_exceptions = {{ConvertedType::INT_32, ConvertedType::NONE},
                                  {ConvertedType::INT_64, ConvertedType::NONE},
                                  {ConvertedType::INT_32, 
ConvertedType::DECIMAL},
@@ -275,6 +276,45 @@ StreamReader& 
StreamReader::operator>>(optional<std::string>& v) {
   return *this;
 }
 
+StreamReader& StreamReader::operator>>(optional<::arrow::Decimal128>& v) {
+  const auto& node = nodes_[column_index_];
+  if (node->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+    const int type_length = node->type_length();
+    CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::DECIMAL, 
type_length);
+
+    FixedLenByteArray flba;
+    if (ReadOptional(&flba)) {
+      PARQUET_ASSIGN_OR_THROW(v,
+                              ::arrow::Decimal128::FromBigEndian(flba.ptr, 
type_length));
+    } else {
+      v.reset();
+    }
+  } else if (node->physical_type() == Type::BYTE_ARRAY) {
+    CheckColumn(Type::BYTE_ARRAY, ConvertedType::DECIMAL);
+
+    ByteArray ba;
+    if (ReadOptional(&ba)) {
+      PARQUET_ASSIGN_OR_THROW(v, ::arrow::Decimal128::FromBigEndian(ba.ptr, 
ba.len));
+    } else {
+      v.reset();
+    }
+  } else {
+    ParquetException::NYI("Decimal128 is not implemented for non-binary 
types");
+  }
+  return *this;
+}
+
+StreamReader& StreamReader::operator>>(::arrow::Decimal128& v) {
+  const auto& node = nodes_[column_index_];
+  std::optional<::arrow::Decimal128> maybe_v;
+  *this >> maybe_v;
+  if (!maybe_v.has_value()) {
+    ThrowReadFailedException(node);
+  }
+  v = std::move(maybe_v.value());
+  return *this;
+}
+
 void StreamReader::ReadFixedLength(char* ptr, int len) {
   CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, len);
   FixedLenByteArray flba;
diff --git a/cpp/src/parquet/stream_reader.h b/cpp/src/parquet/stream_reader.h
index e16f8ee694..a7dadac92c 100644
--- a/cpp/src/parquet/stream_reader.h
+++ b/cpp/src/parquet/stream_reader.h
@@ -134,6 +134,8 @@ class PARQUET_EXPORT StreamReader {
 
   StreamReader& operator>>(std::string& v);
 
+  StreamReader& operator>>(::arrow::Decimal128& v);
+
   // Input operators for optional fields.
 
   StreamReader& operator>>(optional<bool>& v);
@@ -166,6 +168,8 @@ class PARQUET_EXPORT StreamReader {
 
   StreamReader& operator>>(optional<std::string>& v);
 
+  StreamReader& operator>>(optional<::arrow::Decimal128>& v);
+
   template <std::size_t N>
   StreamReader& operator>>(optional<std::array<char, N>>& v) {
     CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
diff --git a/cpp/src/parquet/stream_reader_test.cc 
b/cpp/src/parquet/stream_reader_test.cc
index aa0ff25b10..fed036bca5 100644
--- a/cpp/src/parquet/stream_reader_test.cc
+++ b/cpp/src/parquet/stream_reader_test.cc
@@ -26,6 +26,7 @@
 #include <utility>
 
 #include "arrow/io/file.h"
+#include "arrow/util/decimal.h"
 #include "parquet/exception.h"
 #include "parquet/test_util.h"
 
@@ -886,7 +887,7 @@ TEST_F(TestReadingDataFiles, Int32Decimal) {
   auto reader = StreamReader{std::move(file_reader)};
 
   int32_t x;
-  int i;
+  int i = 0;
 
   for (i = 1; !reader.eof(); ++i) {
     reader >> x >> EndRow;
@@ -903,7 +904,7 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
   auto reader = StreamReader{std::move(file_reader)};
 
   int64_t x;
-  int i;
+  int i = 0;
 
   for (i = 1; !reader.eof(); ++i) {
     reader >> x >> EndRow;
@@ -912,5 +913,39 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
   EXPECT_EQ(i, 25);
 }
 
+TEST_F(TestReadingDataFiles, FLBADecimal) {
+  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
+                                           
GetDataFile("fixed_length_decimal.parquet")));
+
+  auto file_reader = ParquetFileReader::Open(infile);
+  auto reader = StreamReader{std::move(file_reader)};
+
+  ::arrow::Decimal128 x;
+  int i = 0;
+
+  for (i = 1; !reader.eof(); ++i) {
+    reader >> x >> EndRow;
+    EXPECT_EQ(x, ::arrow::Decimal128(i * 100));
+  }
+  EXPECT_EQ(i, 25);
+}
+
+TEST_F(TestReadingDataFiles, ByteArrayDecimal) {
+  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
+                                           
GetDataFile("byte_array_decimal.parquet")));
+
+  auto file_reader = ParquetFileReader::Open(infile);
+  auto reader = StreamReader{std::move(file_reader)};
+
+  ::arrow::Decimal128 x;
+  int i = 0;
+
+  for (i = 1; !reader.eof(); ++i) {
+    reader >> x >> EndRow;
+    EXPECT_EQ(x, ::arrow::Decimal128(i * 100));
+  }
+  EXPECT_EQ(i, 25);
+}
+
 }  // namespace test
 }  // namespace parquet

Reply via email to