This is an automated email from the ASF dual-hosted git repository. gangwu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push: new 19f3b85d16 GH-47189: [C++][Parquet] Fix ParquetFilePrinter to produce valid JSON statistics (#47190) 19f3b85d16 is described below commit 19f3b85d168b69f524e813848aefb57941261272 Author: Gang Wu <ust...@gmail.com> AuthorDate: Mon Aug 4 23:42:14 2025 +0800 GH-47189: [C++][Parquet] Fix ParquetFilePrinter to produce valid JSON statistics (#47190) ### Rationale for this change ParquetFilePrinter uses FormatStatValue to print min/max stats. If the stats contain non-UTF8 data, the produced JSON is invalid. ### What changes are included in this PR? Make FormatStatValue aware of the logical type and print hex values for binary data. ### Are these changes tested? Added a test case to validate the JSON output. ### Are there any user-facing changes? Yes, users may now see hex values for binary data instead of the raw character values that were printed in the past. * GitHub Issue: #47189 Authored-by: Gang Wu <ust...@gmail.com> Signed-off-by: Gang Wu <ust...@gmail.com> --- cpp/src/parquet/printer.cc | 13 +++-- cpp/src/parquet/reader_test.cc | 80 ++++++++++++++++++++++---- cpp/src/parquet/types.cc | 127 ++++++++++++++++++++++++++++++++++------- cpp/src/parquet/types.h | 5 +- cpp/src/parquet/types_test.cc | 43 ++++++++++++-- 5 files changed, 224 insertions(+), 44 deletions(-) diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 59e8935e14..dfd1d85809 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -168,8 +168,10 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte std::string min = stats->min(), max = stats->max(); stream << ", Null Values: " << stats->null_count << ", Distinct Values: " << stats->distinct_count << std::endl - << " Max: " << FormatStatValue(descr->physical_type(), max) - << ", Min: " << FormatStatValue(descr->physical_type(), min); + << " Max: " + << FormatStatValue(descr->physical_type(), max, descr->logical_type()) + << ", Min: " + << 
FormatStatValue(descr->physical_type(), min, descr->logical_type()); } else { stream << " Statistics Not Set"; } @@ -334,9 +336,12 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected if (stats->HasMinMax()) { std::string min = stats->EncodeMin(), max = stats->EncodeMax(); stream << ", " - << R"("Max": ")" << FormatStatValue(descr->physical_type(), max) + << R"("Max": ")" + << FormatStatValue(descr->physical_type(), max, descr->logical_type()) << "\", " - << R"("Min": ")" << FormatStatValue(descr->physical_type(), min) << "\""; + << R"("Min": ")" + << FormatStatValue(descr->physical_type(), min, descr->logical_type()) + << "\""; } stream << " },"; } else { diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index b2ecef702f..7093a5c116 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -1196,19 +1196,24 @@ TEST_F(TestJSONWithLocalFile, JSONOutputSortColumns) { EXPECT_THAT(json_content, testing::HasSubstr(json_contains)); } +namespace { + +::arrow::Status CheckJsonValid(std::string_view json_string) { + rj::Document json_doc; + constexpr auto kParseFlags = rj::kParseFullPrecisionFlag | rj::kParseNanAndInfFlag; + json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); + if (json_doc.HasParseError()) { + return ::arrow::Status::Invalid("JSON parse error at offset ", + json_doc.GetErrorOffset(), ": ", + rj::GetParseError_En(json_doc.GetParseError())); + } + return ::arrow::Status::OK(); +} + +} // namespace + // GH-44101: Test that JSON output is valid JSON TEST_F(TestJSONWithLocalFile, ValidJsonOutput) { - auto check_json_valid = [](std::string_view json_string) -> ::arrow::Status { - rj::Document json_doc; - constexpr auto kParseFlags = rj::kParseFullPrecisionFlag | rj::kParseNanAndInfFlag; - json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); - if (json_doc.HasParseError()) { - return ::arrow::Status::Invalid("JSON parse error at offset ", - 
json_doc.GetErrorOffset(), ": ", - rj::GetParseError_En(json_doc.GetParseError())); - } - return ::arrow::Status::OK(); - }; std::vector<std::string_view> check_file_lists = { "data_index_bloom_encoding_with_length.parquet", "data_index_bloom_encoding_stats.parquet", @@ -1218,11 +1223,62 @@ TEST_F(TestJSONWithLocalFile, ValidJsonOutput) { "sort_columns.parquet"}; for (const auto& file : check_file_lists) { std::string json_content = ReadFromLocalFile(file); - ASSERT_OK(check_json_valid(json_content)) + ASSERT_OK(CheckJsonValid(json_content)) << "Invalid JSON output for file: " << file << ", content:" << json_content; } } +TEST(TestJSONWithMemoryFile, ValidJsonOutput) { + using ::arrow::internal::checked_cast; + auto schema = std::static_pointer_cast<GroupNode>(GroupNode::Make( + "schema", Repetition::REQUIRED, + schema::NodeVector{PrimitiveNode::Make("string_field", Repetition::REQUIRED, + LogicalType::String(), Type::BYTE_ARRAY), + PrimitiveNode::Make("binary_field", Repetition::REQUIRED, + LogicalType::None(), Type::BYTE_ARRAY)})); + + ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create()); + auto file_writer = ParquetFileWriter::Open(out_file, schema); + auto row_group_writer = file_writer->AppendRowGroup(); + + // Write string column with valid UTF8 data + auto string_writer = checked_cast<ByteArrayWriter*>(row_group_writer->NextColumn()); + std::vector<std::string> utf8_strings = {"Hello", "World", "UTF8 测试", "🌟"}; + std::vector<ByteArray> string_values; + for (const auto& str : utf8_strings) { + string_values.emplace_back(std::string_view(str)); + } + string_writer->WriteBatch(string_values.size(), nullptr, nullptr, string_values.data()); + + // Write binary column with non-UTF8 data + auto binary_writer = checked_cast<ByteArrayWriter*>(row_group_writer->NextColumn()); + std::vector<std::vector<uint8_t>> binary_data = {{0x00, 0x01, 0x02, 0x03}, + {0xFF, 0xFE, 0xFD, 0xFC}, + {0x80, 0x81, 0x82, 0x83}, + {0xC0, 0xC1, 0xF5, 0xF6}}; + 
std::vector<ByteArray> binary_values; + for (const auto& data : binary_data) { + binary_values.emplace_back( + std::string_view(reinterpret_cast<const char*>(data.data()), data.size())); + } + binary_writer->WriteBatch(binary_values.size(), nullptr, nullptr, binary_values.data()); + + row_group_writer->Close(); + file_writer->Close(); + + // Read the file back and print as JSON + ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish()); + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(file_buf)); + ParquetFilePrinter printer(reader.get()); + + // Verify the output is valid JSON + std::stringstream json_output; + printer.JSONPrint(json_output, {}); + std::string json_content = json_output.str(); + ASSERT_OK(CheckJsonValid(json_content)) << "Invalid JSON output: " << json_content; +} + TEST(TestFileReader, BufferedReadsWithDictionary) { const int num_rows = 1000; diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 1387c222a7..e54683084c 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -18,6 +18,7 @@ #include <array> #include <cmath> #include <cstdint> +#include <iomanip> #include <memory> #include <sstream> #include <string> @@ -25,6 +26,8 @@ #include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" +#include "arrow/util/decimal.h" +#include "arrow/util/float16.h" #include "arrow/util/logging_internal.h" #include <rapidjson/document.h> @@ -96,9 +99,87 @@ bool PageCanUseChecksum(PageType::type pageType) { } } -std::string FormatStatValue(Type::type parquet_type, ::std::string_view val) { +namespace { + +template <typename T> +std::enable_if_t<std::is_arithmetic_v<T>, std::string> FormatNumericValue( + ::std::string_view val) { + std::stringstream result; + T value{}; + std::memcpy(&value, val.data(), sizeof(T)); + result << value; + return result.str(); +} + +std::string FormatDecimalValue(Type::type parquet_type, 
::std::string_view val, + const std::shared_ptr<const LogicalType>& logical_type) { + ARROW_DCHECK(logical_type != nullptr && logical_type->is_decimal()); + + const auto& decimal_type = + ::arrow::internal::checked_cast<const DecimalLogicalType&>(*logical_type); + const int32_t scale = decimal_type.scale(); + + std::stringstream result; + switch (parquet_type) { + case Type::INT32: { + int32_t int_value{}; + std::memcpy(&int_value, val.data(), sizeof(int32_t)); + ::arrow::Decimal128 decimal_value(int_value); + result << decimal_value.ToString(scale); + break; + } + case Type::INT64: { + int64_t long_value{}; + std::memcpy(&long_value, val.data(), sizeof(int64_t)); + ::arrow::Decimal128 decimal_value(long_value); + result << decimal_value.ToString(scale); + break; + } + case Type::FIXED_LEN_BYTE_ARRAY: + case Type::BYTE_ARRAY: { + auto decimal_result = ::arrow::Decimal128::FromBigEndian( + reinterpret_cast<const uint8_t*>(val.data()), static_cast<int32_t>(val.size())); + if (!decimal_result.ok()) { + throw ParquetException("Failed to parse decimal value: ", + decimal_result.status().message()); + } + result << decimal_result.ValueUnsafe().ToString(scale); + break; + } + default: + throw ParquetException("Unsupported decimal type: ", TypeToString(parquet_type)); + } + + return result.str(); +} + +std::string FormatNonUTF8Value(::std::string_view val) { + if (val.empty()) { + return ""; + } + std::stringstream result; + result << "0x" << std::hex; + for (const auto& c : val) { + result << std::setw(2) << std::setfill('0') + << static_cast<int>(static_cast<unsigned char>(c)); + } + return result.str(); +} +std::string FormatFloat16Value(::std::string_view val) { + std::stringstream result; + auto float16 = ::arrow::util::Float16::FromLittleEndian( + reinterpret_cast<const uint8_t*>(val.data())); + result << float16.ToFloat(); + return result.str(); +} + +} // namespace + +std::string FormatStatValue(Type::type parquet_type, ::std::string_view val, + const 
std::shared_ptr<const LogicalType>& logical_type) { + std::stringstream result; const char* bytes = val.data(); switch (parquet_type) { case Type::BOOLEAN: { @@ -108,28 +189,22 @@ std::string FormatStatValue(Type::type parquet_type, ::std::string_view val) { break; } case Type::INT32: { - int32_t value{}; - std::memcpy(&value, bytes, sizeof(int32_t)); - result << value; - break; + if (logical_type != nullptr && logical_type->is_decimal()) { + return FormatDecimalValue(parquet_type, val, logical_type); + } + return FormatNumericValue<int32_t>(val); } case Type::INT64: { - int64_t value{}; - std::memcpy(&value, bytes, sizeof(int64_t)); - result << value; - break; + if (logical_type != nullptr && logical_type->is_decimal()) { + return FormatDecimalValue(parquet_type, val, logical_type); + } + return FormatNumericValue<int64_t>(val); } case Type::DOUBLE: { - double value{}; - std::memcpy(&value, bytes, sizeof(double)); - result << value; - break; + return FormatNumericValue<double>(val); } case Type::FLOAT: { - float value{}; - std::memcpy(&value, bytes, sizeof(float)); - result << value; - break; + return FormatNumericValue<float>(val); } case Type::INT96: { std::array<int32_t, 3> values{}; @@ -139,8 +214,18 @@ std::string FormatStatValue(Type::type parquet_type, ::std::string_view val) { } case Type::BYTE_ARRAY: case Type::FIXED_LEN_BYTE_ARRAY: { - result << val; - break; + if (logical_type != nullptr) { + if (logical_type->is_decimal()) { + return FormatDecimalValue(parquet_type, val, logical_type); + } + if (logical_type->is_string()) { + return std::string(val); + } + if (logical_type->is_float16()) { + return FormatFloat16Value(val); + } + } + return FormatNonUTF8Value(val); } case Type::UNDEFINED: default: @@ -1730,7 +1815,7 @@ format::LogicalType LogicalType::Impl::Geometry::ToThrift() const { format::LogicalType type; format::GeometryType geometry_type; - // Canonially export crs of "" as an unset CRS + // Canonically export crs of "" as an unset CRS if 
(!crs_.empty()) { geometry_type.__set_crs(crs_); } @@ -1825,7 +1910,7 @@ format::LogicalType LogicalType::Impl::Geography::ToThrift() const { format::LogicalType type; format::GeographyType geography_type; - // Canonially export crs of "" as an unset CRS + // Canonically export crs of "" as an unset CRS if (!crs_.empty()) { geography_type.__set_crs(crs_); } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 80b0002faa..c2040e555f 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -853,8 +853,9 @@ PARQUET_EXPORT std::string TypeToString(Type::type t); PARQUET_EXPORT std::string TypeToString(Type::type t, int type_length); -PARQUET_EXPORT std::string FormatStatValue(Type::type parquet_type, - ::std::string_view val); +PARQUET_EXPORT std::string FormatStatValue( + Type::type parquet_type, ::std::string_view val, + const std::shared_ptr<const LogicalType>& logical_type = NULLPTR); PARQUET_EXPORT int GetTypeByteSize(Type::type t); diff --git a/cpp/src/parquet/types_test.cc b/cpp/src/parquet/types_test.cc index fdcaed5c81..4381127469 100644 --- a/cpp/src/parquet/types_test.cc +++ b/cpp/src/parquet/types_test.cc @@ -117,17 +117,50 @@ TEST(TypePrinter, StatisticsTypes) { smin = std::string("abcdef"); smax = std::string("ijklmnop"); - ASSERT_STREQ("abcdef", FormatStatValue(Type::BYTE_ARRAY, smin).c_str()); - ASSERT_STREQ("ijklmnop", FormatStatValue(Type::BYTE_ARRAY, smax).c_str()); + ASSERT_EQ(smin, FormatStatValue(Type::BYTE_ARRAY, smin, LogicalType::String())); + ASSERT_EQ(smax, FormatStatValue(Type::BYTE_ARRAY, smax, LogicalType::String())); + ASSERT_EQ("0x616263646566", FormatStatValue(Type::BYTE_ARRAY, smin)); + ASSERT_EQ("0x696a6b6c6d6e6f70", FormatStatValue(Type::BYTE_ARRAY, smax)); // PARQUET-1357: FormatStatValue truncates binary statistics on zero character smax.push_back('\0'); - ASSERT_EQ(smax, FormatStatValue(Type::BYTE_ARRAY, smax)); + ASSERT_EQ(smax, FormatStatValue(Type::BYTE_ARRAY, smax, LogicalType::String())); + 
ASSERT_EQ("0x696a6b6c6d6e6f7000", FormatStatValue(Type::BYTE_ARRAY, smax)); + // String smin = std::string("abcdefgh"); smax = std::string("ijklmnop"); - ASSERT_STREQ("abcdefgh", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin).c_str()); - ASSERT_STREQ("ijklmnop", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smax).c_str()); + ASSERT_EQ(smin, + FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin, LogicalType::String())); + ASSERT_EQ(smax, + FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smax, LogicalType::String())); + ASSERT_EQ("0x6162636465666768", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin)); + ASSERT_EQ("0x696a6b6c6d6e6f70", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smax)); + + // Decimal + int32_t int32_decimal = 1024; + smin = std::string(reinterpret_cast<char*>(&int32_decimal), sizeof(int32_t)); + ASSERT_EQ("10.24", FormatStatValue(Type::INT32, smin, LogicalType::Decimal(6, 2))); + + int64_t int64_decimal = 102'400'000'000; + smin = std::string(reinterpret_cast<char*>(&int64_decimal), sizeof(int64_t)); + ASSERT_EQ("10240000.0000", + FormatStatValue(Type::INT64, smin, LogicalType::Decimal(18, 4))); + + std::vector<char> bytes = {0x11, 0x22, 0x33, 0x44}; + smin = std::string(bytes.begin(), bytes.end()); + ASSERT_EQ("28745.4020", + FormatStatValue(Type::BYTE_ARRAY, smin, LogicalType::Decimal(10, 4))); + ASSERT_EQ("28745.4020", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin, + LogicalType::Decimal(10, 4))); + ASSERT_EQ("0x11223344", FormatStatValue(Type::BYTE_ARRAY, smin)); + ASSERT_EQ("0x11223344", FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin)); + + // Float16 + bytes = {0x1c, 0x50}; + smin = std::string(bytes.begin(), bytes.end()); + ASSERT_EQ("32.875", + FormatStatValue(Type::FIXED_LEN_BYTE_ARRAY, smin, LogicalType::Float16())); } TEST(TestInt96Timestamp, Decoding) {