This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2dbd39c71d GH-34633: [C++][Parquet] Fix StreamReader to read decimals
(#34720)
2dbd39c71d is described below
commit 2dbd39c71d774f4a14562d30cf161d0791dbe66b
Author: Gang Wu <[email protected]>
AuthorDate: Tue Mar 28 07:30:19 2023 +0800
GH-34633: [C++][Parquet] Fix StreamReader to read decimals (#34720)
### Rationale for this change
StreamReader only supports reading decimals stored as INT32 or INT64. Users
cannot read decimals stored as FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY.
### What changes are included in this PR?
Add support to StreamReader for reading decimals into the arrow::Decimal128 type.
### Are these changes tested?
Yes. Two test cases were added that read decimals from FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY columns.
### Are there any user-facing changes?
Yes, users can now read decimals from a Parquet file directly into arrow::Decimal128.
* Closes: #34633
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/parquet/stream_reader.cc | 42 ++++++++++++++++++++++++++++++++++-
cpp/src/parquet/stream_reader.h | 4 ++++
cpp/src/parquet/stream_reader_test.cc | 39 ++++++++++++++++++++++++++++++--
3 files changed, 82 insertions(+), 3 deletions(-)
diff --git a/cpp/src/parquet/stream_reader.cc b/cpp/src/parquet/stream_reader.cc
index 9a7cc8cdf8..0fecb1bf24 100644
--- a/cpp/src/parquet/stream_reader.cc
+++ b/cpp/src/parquet/stream_reader.cc
@@ -16,6 +16,7 @@
// under the License.
#include "parquet/stream_reader.h"
+#include "arrow/util/decimal.h"
#include <set>
#include <utility>
@@ -35,7 +36,7 @@ constexpr int64_t StreamReader::kBatchSizeOne;
// then it will allow the Parquet file to use the converted type
// NONE.
//
-static const std::set<std::pair<ConvertedType::type, ConvertedType::type> >
+static const std::set<std::pair<ConvertedType::type, ConvertedType::type>>
converted_type_exceptions = {{ConvertedType::INT_32, ConvertedType::NONE},
{ConvertedType::INT_64, ConvertedType::NONE},
{ConvertedType::INT_32,
ConvertedType::DECIMAL},
@@ -275,6 +276,45 @@ StreamReader&
StreamReader::operator>>(optional<std::string>& v) {
return *this;
}
+StreamReader& StreamReader::operator>>(optional<::arrow::Decimal128>& v) {  // Reads one DECIMAL field into v; v is reset when ReadOptional yields no value.
+  const auto& node = nodes_[column_index_];  // NOTE(review): nodes_ is indexed before CheckColumn runs -- confirm column_index_ is always in range here.
+  if (node->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+    const int type_length = node->type_length();
+    CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::DECIMAL, type_length);
+    // If ReadOptional returns true, decode type_length big-endian bytes; otherwise clear v.
+    FixedLenByteArray flba;
+    if (ReadOptional(&flba)) {
+      PARQUET_ASSIGN_OR_THROW(v,
+                              ::arrow::Decimal128::FromBigEndian(flba.ptr, type_length));
+    } else {
+      v.reset();
+    }
+  } else if (node->physical_type() == Type::BYTE_ARRAY) {
+    CheckColumn(Type::BYTE_ARRAY, ConvertedType::DECIMAL);
+    // Same flow as the fixed-length branch, but the byte count is ba.len of the value read.
+    ByteArray ba;
+    if (ReadOptional(&ba)) {
+      PARQUET_ASSIGN_OR_THROW(v, ::arrow::Decimal128::FromBigEndian(ba.ptr, ba.len));
+    } else {
+      v.reset();
+    }
+  } else {  // Any other physical type is handed to ParquetException::NYI.
+    ParquetException::NYI("Decimal128 is not implemented for non-binary types");
+  }
+  return *this;
+}
+
+StreamReader& StreamReader::operator>>(::arrow::Decimal128& v) {  // Reads one Decimal128 field into v; an absent value is reported via ThrowReadFailedException.
+ const auto& node = nodes_[column_index_];  // Looked up before the read; only used on the failure path below.
+ std::optional<::arrow::Decimal128> maybe_v;
+ *this >> maybe_v;  // Presumably resolves to the optional<Decimal128> overload (assumes optional aliases std::optional -- confirm).
+ if (!maybe_v.has_value()) {
+ ThrowReadFailedException(node);  // No value was produced for this field.
+ }
+ v = std::move(maybe_v.value());
+ return *this;  // Returns the reader itself so reads can be chained.
+}
+
void StreamReader::ReadFixedLength(char* ptr, int len) {
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, len);
FixedLenByteArray flba;
diff --git a/cpp/src/parquet/stream_reader.h b/cpp/src/parquet/stream_reader.h
index e16f8ee694..a7dadac92c 100644
--- a/cpp/src/parquet/stream_reader.h
+++ b/cpp/src/parquet/stream_reader.h
@@ -134,6 +134,8 @@ class PARQUET_EXPORT StreamReader {
StreamReader& operator>>(std::string& v);
+ StreamReader& operator>>(::arrow::Decimal128& v);
+
// Input operators for optional fields.
StreamReader& operator>>(optional<bool>& v);
@@ -166,6 +168,8 @@ class PARQUET_EXPORT StreamReader {
StreamReader& operator>>(optional<std::string>& v);
+ StreamReader& operator>>(optional<::arrow::Decimal128>& v);
+
template <std::size_t N>
StreamReader& operator>>(optional<std::array<char, N>>& v) {
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
diff --git a/cpp/src/parquet/stream_reader_test.cc
b/cpp/src/parquet/stream_reader_test.cc
index aa0ff25b10..fed036bca5 100644
--- a/cpp/src/parquet/stream_reader_test.cc
+++ b/cpp/src/parquet/stream_reader_test.cc
@@ -26,6 +26,7 @@
#include <utility>
#include "arrow/io/file.h"
+#include "arrow/util/decimal.h"
#include "parquet/exception.h"
#include "parquet/test_util.h"
@@ -886,7 +887,7 @@ TEST_F(TestReadingDataFiles, Int32Decimal) {
auto reader = StreamReader{std::move(file_reader)};
int32_t x;
- int i;
+ int i = 0;
for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
@@ -903,7 +904,7 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
auto reader = StreamReader{std::move(file_reader)};
int64_t x;
- int i;
+ int i = 0;
for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
@@ -912,5 +913,39 @@ TEST_F(TestReadingDataFiles, Int64Decimal) {
EXPECT_EQ(i, 25);
}
+TEST_F(TestReadingDataFiles, FLBADecimal) {  // Streams Decimal128 values out of fixed_length_decimal.parquet.
+  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
+                                           GetDataFile("fixed_length_decimal.parquet")));
+
+  auto file_reader = ParquetFileReader::Open(infile);
+  auto reader = StreamReader{std::move(file_reader)};
+
+  ::arrow::Decimal128 x;
+  int i = 0;
+  // Row i (counting from 1) is expected to hold the value i * 100.
+  for (i = 1; !reader.eof(); ++i) {
+    reader >> x >> EndRow;
+    EXPECT_EQ(x, ::arrow::Decimal128(i * 100));
+  }
+  EXPECT_EQ(i, 25);  // i ends one past the number of rows read, i.e. 24 rows.
+}
+}
+
+TEST_F(TestReadingDataFiles, ByteArrayDecimal) {  // Streams Decimal128 values out of byte_array_decimal.parquet.
+  PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(
+                                           GetDataFile("byte_array_decimal.parquet")));
+
+  auto file_reader = ParquetFileReader::Open(infile);
+  auto reader = StreamReader{std::move(file_reader)};
+
+  ::arrow::Decimal128 x;
+  int i = 0;
+  // Row i (counting from 1) is expected to hold the value i * 100.
+  for (i = 1; !reader.eof(); ++i) {
+    reader >> x >> EndRow;
+    EXPECT_EQ(x, ::arrow::Decimal128(i * 100));
+  }
+  EXPECT_EQ(i, 25);  // i ends one past the number of rows read, i.e. 24 rows.
+}
+}
+
} // namespace test
} // namespace parquet