This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new dd33c750e0  ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
dd33c750e0 is described below

commit dd33c750e068baddba99589076aa12b892268640
Author:     Antoine Pitrou <anto...@python.org>
AuthorDate: Tue Jun 7 13:29:05 2022 +0200

    ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)

    In fringe cases, users may have Parquet files where deserializing exceeds
    our default Thrift size limits.

    Authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
---
 cpp/cmake_modules/SetupCxxFlags.cmake      |   1 -
 cpp/src/arrow/dataset/file_parquet.cc      |   4 +
 cpp/src/parquet/column_reader.cc           |  31 +++++--
 cpp/src/parquet/column_reader.h            |   5 ++
 cpp/src/parquet/column_writer_test.cc      |   8 +-
 cpp/src/parquet/file_reader.cc             |   4 +-
 cpp/src/parquet/metadata.cc                | 123 ++++++++++++++++---------
 cpp/src/parquet/metadata.h                 |  35 +++++++-
 cpp/src/parquet/properties.h               |  16 ++++
 cpp/src/parquet/statistics_test.cc         |   4 +-
 cpp/src/parquet/thrift_internal.h          | 138 ++++++++++++++++-------------
 python/pyarrow/_dataset_parquet.pyx        |  64 +++++++++++--
 python/pyarrow/_parquet.pxd                |   8 ++
 python/pyarrow/_parquet.pyx                |  18 +++-
 python/pyarrow/parquet/__init__.py         |  61 ++++++++++---
 python/pyarrow/tests/parquet/test_basic.py |  27 ++++++
 python/pyarrow/tests/test_dataset.py       |  11 +++
 17 files changed, 407 insertions(+), 151 deletions(-)
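For context, the new limits surface in Python as keyword arguments on the
reading APIs changed below. A minimal sketch of the usage this patch enables
(the file name and limit values here are illustrative, not part of the patch):

    import pyarrow.parquet as pq

    # Defaults are kDefaultThriftStringSizeLimit (100 * 1000 * 1000 bytes) and
    # kDefaultThriftContainerSizeLimit (1000 * 1000 elements); raise them only
    # when metadata deserialization fails with "Exceeded size limit".
    table = pq.read_table(
        "very_wide.parquet",  # hypothetical file with oversized metadata
        thrift_string_size_limit=500 * 1000 * 1000,
        thrift_container_size_limit=10 * 1000 * 1000,
    )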
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 713bc5528b..66b84dbc29 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -290,7 +290,6 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
 elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
   set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
   set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion")
-  set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
   set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion")
   set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wunused-result")
 elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index f2a3903208..0d95e18171 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -69,6 +69,10 @@ parquet::ReaderProperties MakeReaderProperties(
   properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
   properties.file_decryption_properties(
       parquet_scan_options->reader_properties->file_decryption_properties());
+  properties.set_thrift_string_size_limit(
+      parquet_scan_options->reader_properties->thrift_string_size_limit());
+  properties.set_thrift_container_size_limit(
+      parquet_scan_options->reader_properties->thrift_container_size_limit());
   return properties;
 }

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 76476c5da7..4f82992aeb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -223,14 +223,15 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
 class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
-                       Compression::type codec, ::arrow::MemoryPool* pool,
+                       Compression::type codec, const ReaderProperties& properties,
                        const CryptoContext* crypto_ctx)
-      : stream_(std::move(stream)),
-        decompression_buffer_(AllocateBuffer(pool, 0)),
+      : properties_(properties),
+        stream_(std::move(stream)),
+        decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
         page_ordinal_(0),
         seen_num_rows_(0),
         total_num_rows_(total_num_rows),
-        decryption_buffer_(AllocateBuffer(pool, 0)) {
+        decryption_buffer_(AllocateBuffer(properties_.memory_pool(), 0)) {
     if (crypto_ctx != nullptr) {
       crypto_ctx_ = *crypto_ctx;
       InitDecryption();
@@ -254,6 +255,7 @@ class SerializedPageReader : public PageReader {
                                             int compressed_len, int uncompressed_len,
                                             int levels_byte_len = 0);

+  const ReaderProperties properties_;
   std::shared_ptr<ArrowInputStream> stream_;

   format::PageHeader current_page_header_;
@@ -326,9 +328,10 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
 }

 std::shared_ptr<Page> SerializedPageReader::NextPage() {
+  ThriftDeserializer deserializer(properties_);
+
   // Loop here because there may be unhandled page types that we skip until
   // finding a page that we do know what to do with
-
   while (seen_num_rows_ < total_num_rows_) {
     uint32_t header_size = 0;
     uint32_t allowed_page_size = kDefaultPageHeaderSize;
@@ -349,8 +352,9 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
         UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
                          data_page_header_aad_);
       }
-      DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
-                           &current_page_header_, crypto_ctx_.meta_decryptor);
+      deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
+                                      &header_size, &current_page_header_,
+                                      crypto_ctx_.meta_decryptor);
       break;
     } catch (std::exception& e) {
       // Failed to deserialize. Double the allowed page header size and try again
@@ -508,13 +512,22 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(

 }  // namespace

+std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
+                                             int64_t total_num_rows,
+                                             Compression::type codec,
+                                             const ReaderProperties& properties,
+                                             const CryptoContext* ctx) {
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_rows, codec, properties, ctx));
+}
+
 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
                                              int64_t total_num_rows,
                                              Compression::type codec,
                                              ::arrow::MemoryPool* pool,
                                              const CryptoContext* ctx) {
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx));
 }

 namespace {
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 50eac31dc3..c22f9f2fc7 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -25,6 +25,7 @@
 #include "parquet/exception.h"
 #include "parquet/level_conversion.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"

@@ -106,6 +107,10 @@ class PARQUET_EXPORT PageReader {
       std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
       Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       const CryptoContext* ctx = NULLPTR);
+  static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream,
+                                          int64_t total_num_rows, Compression::type codec,
+                                          const ReaderProperties& properties,
+                                          const CryptoContext* ctx = NULLPTR);

   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
   //           containing new Page otherwise
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 61f4c47e78..2cd21628b3 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -276,8 +276,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
     // complete (no changes to the metadata buffer can be made after instantiation)
     ApplicationVersion app_version(this->writer_properties_->created_by());
-    auto metadata_accessor =
-        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
     return metadata_accessor->is_stats_set();
   }

@@ -286,8 +286,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
     // complete (no changes to the metadata buffer can be made after instantiation)
     ApplicationVersion app_version(this->writer_properties_->created_by());
-    auto metadata_accessor =
-        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
     auto encoded_stats = metadata_accessor->statistics()->Encode();
     return {encoded_stats.has_min, encoded_stats.has_max};
   }
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 3f4c2cb76a..51f90b1730 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -543,8 +543,8 @@ uint32_t SerializedFile::ParseUnencryptedFileMetadata(
   }
   uint32_t read_metadata_len = metadata_len;
   // The encrypted read path falls through to here, so pass in the decryptor
-  file_metadata_ =
-      FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_);
+  file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len,
+                                      properties_, file_decryptor_);

   return read_metadata_len;
 }
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 102fa874ad..6226c3ad09 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -176,9 +176,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
   explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column,
                                    const ColumnDescriptor* descr,
                                    int16_t row_group_ordinal, int16_t column_ordinal,
+                                   const ReaderProperties& properties,
                                    const ApplicationVersion* writer_version,
                                    std::shared_ptr<InternalFileDecryptor> file_decryptor)
-      : column_(column), descr_(descr), writer_version_(writer_version) {
+      : column_(column),
+        descr_(descr),
+        properties_(properties),
+        writer_version_(writer_version) {
     column_metadata_ = &column->meta_data;
     if (column->__isset.crypto_metadata) {  // column metadata is encrypted
       format::ColumnCryptoMetaData ccmd = column->crypto_metadata;
@@ -196,7 +200,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
         auto decryptor = file_decryptor->GetColumnMetaDecryptor(
             path->ToDotString(), key_metadata, aad_column_metadata);
         auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size());
-        DeserializeThriftMsg(
+        ThriftDeserializer deserializer(properties_);
+        deserializer.DeserializeMessage(
             reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()),
             &len, &decrypted_metadata_, decryptor);
         column_metadata_ = &decrypted_metadata_;
@@ -306,26 +311,38 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
   const format::ColumnMetaData* column_metadata_;
   format::ColumnMetaData decrypted_metadata_;
   const ColumnDescriptor* descr_;
+  const ReaderProperties properties_;
   const ApplicationVersion* writer_version_;
 };

 std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
     const void* metadata, const ColumnDescriptor* descr,
-    const ApplicationVersion* writer_version, int16_t row_group_ordinal,
-    int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+    const ReaderProperties& properties, const ApplicationVersion* writer_version,
+    int16_t row_group_ordinal, int16_t column_ordinal,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
   return std::unique_ptr<ColumnChunkMetaData>(
       new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
-                              writer_version, std::move(file_decryptor)));
+                              properties, writer_version, std::move(file_decryptor)));
+}
+
+std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
+    const void* metadata, const ColumnDescriptor* descr,
+    const ApplicationVersion* writer_version, int16_t row_group_ordinal,
+    int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData(
+      metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(),
+      writer_version, std::move(file_decryptor)));
 }

 ColumnChunkMetaData::ColumnChunkMetaData(
     const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
-    int16_t column_ordinal, const ApplicationVersion* writer_version,
+    int16_t column_ordinal, const ReaderProperties& properties,
+    const ApplicationVersion* writer_version,
     std::shared_ptr<InternalFileDecryptor> file_decryptor)
     : impl_{new ColumnChunkMetaDataImpl(
           reinterpret_cast<const format::ColumnChunk*>(metadata), descr,
-          row_group_ordinal, column_ordinal, writer_version, std::move(file_decryptor))} {
-}
+          row_group_ordinal, column_ordinal, properties, writer_version,
+          std::move(file_decryptor))} {}

 ColumnChunkMetaData::~ColumnChunkMetaData() = default;
@@ -403,10 +420,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
  public:
   explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
                                 const SchemaDescriptor* schema,
+                                const ReaderProperties& properties,
                                 const ApplicationVersion* writer_version,
                                 std::shared_ptr<InternalFileDecryptor> file_decryptor)
       : row_group_(row_group),
         schema_(schema),
+        properties_(properties),
         writer_version_(writer_version),
         file_decryptor_(std::move(file_decryptor)) {}

@@ -431,7 +450,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
   std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
     if (i < num_columns()) {
       return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
-                                       writer_version_, row_group_->ordinal,
+                                       properties_, writer_version_, row_group_->ordinal,
                                        static_cast<int16_t>(i), file_decryptor_);
     }
     throw ParquetException("The file only has ", num_columns(),
@@ -441,6 +460,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
  private:
   const format::RowGroup* row_group_;
   const SchemaDescriptor* schema_;
+  const ReaderProperties properties_;
   const ApplicationVersion* writer_version_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 };
@@ -449,16 +469,26 @@ std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
     const void* metadata, const SchemaDescriptor* schema,
     const ApplicationVersion* writer_version,
     std::shared_ptr<InternalFileDecryptor> file_decryptor) {
-  return std::unique_ptr<RowGroupMetaData>(
-      new RowGroupMetaData(metadata, schema, writer_version, std::move(file_decryptor)));
+  return std::unique_ptr<parquet::RowGroupMetaData>(
+      new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version,
+                           std::move(file_decryptor)));
+}
+
+std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
+    const void* metadata, const SchemaDescriptor* schema,
+    const ReaderProperties& properties, const ApplicationVersion* writer_version,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData(
+      metadata, schema, properties, writer_version, std::move(file_decryptor)));
 }

 RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
+                                   const ReaderProperties& properties,
                                    const ApplicationVersion* writer_version,
                                    std::shared_ptr<InternalFileDecryptor> file_decryptor)
     : impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata),
-                                     schema, writer_version, std::move(file_decryptor))} {
-}
+                                     schema, properties, writer_version,
+                                     std::move(file_decryptor))} {}

 RowGroupMetaData::~RowGroupMetaData() = default;
@@ -500,16 +530,17 @@ class FileMetaData::FileMetaDataImpl {
   FileMetaDataImpl() = default;

   explicit FileMetaDataImpl(
-      const void* metadata, uint32_t* metadata_len,
+      const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
-      : file_decryptor_(file_decryptor) {
+      : properties_(properties), file_decryptor_(file_decryptor) {
     metadata_.reset(new format::FileMetaData);

     auto footer_decryptor =
         file_decryptor_ != nullptr ? file_decryptor->GetFooterDecryptor() : nullptr;

-    DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
-                         metadata_.get(), footer_decryptor);
+    ThriftDeserializer deserializer(properties_);
+    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata),
+                                    metadata_len, metadata_.get(), footer_decryptor);
     metadata_len_ = *metadata_len;

     if (metadata_->__isset.created_by) {
@@ -622,8 +653,8 @@ class FileMetaData::FileMetaDataImpl {
          << " row groups, requested metadata for row group: " << i;
       throw ParquetException(ss.str());
     }
-    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_,
-                                  file_decryptor_);
+    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
+                                  &writer_version_, file_decryptor_);
   }

   bool Equals(const FileMetaDataImpl& other) const {
@@ -718,6 +749,7 @@ class FileMetaData::FileMetaDataImpl {
   SchemaDescriptor schema_;
   ApplicationVersion writer_version_;
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+  const ReaderProperties properties_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;

   void InitSchema() {
@@ -759,20 +791,26 @@ class FileMetaData::FileMetaDataImpl {
 };

 std::shared_ptr<FileMetaData> FileMetaData::Make(
-    const void* metadata, uint32_t* metadata_len,
+    const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
     std::shared_ptr<InternalFileDecryptor> file_decryptor) {
   // This FileMetaData ctor is private, not compatible with std::make_shared
   return std::shared_ptr<FileMetaData>(
-      new FileMetaData(metadata, metadata_len, file_decryptor));
+      new FileMetaData(metadata, metadata_len, properties, file_decryptor));
+}
+
+std::shared_ptr<FileMetaData> FileMetaData::Make(
+    const void* metadata, uint32_t* metadata_len,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::shared_ptr<FileMetaData>(new FileMetaData(
+      metadata, metadata_len, default_reader_properties(), file_decryptor));
 }

 FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len,
+                           const ReaderProperties& properties,
                            std::shared_ptr<InternalFileDecryptor> file_decryptor)
-    : impl_{std::unique_ptr<FileMetaDataImpl>(
-          new FileMetaDataImpl(metadata, metadata_len, file_decryptor))} {}
+    : impl_(new FileMetaDataImpl(metadata, metadata_len, properties, file_decryptor)) {}

-FileMetaData::FileMetaData()
-    : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {}
+FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {}

 FileMetaData::~FileMetaData() = default;
@@ -870,24 +908,27 @@ class FileCryptoMetaData::FileCryptoMetaDataImpl {
  public:
   FileCryptoMetaDataImpl() = default;

-  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
-    metadata_.reset(new format::FileCryptoMetaData);
-    DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len,
+                                  const ReaderProperties& properties) {
+    ThriftDeserializer deserializer(properties);
+    deserializer.DeserializeMessage(metadata, metadata_len, &metadata_);
     metadata_len_ = *metadata_len;
   }

-  EncryptionAlgorithm encryption_algorithm() {
-    return FromThrift(metadata_->encryption_algorithm);
+  EncryptionAlgorithm encryption_algorithm() const {
+    return FromThrift(metadata_.encryption_algorithm);
   }
-  const std::string& key_metadata() { return metadata_->key_metadata; }
+
+  const std::string& key_metadata() const { return metadata_.key_metadata; }
+
   void WriteTo(::arrow::io::OutputStream* dst) const {
     ThriftSerializer serializer;
-    serializer.Serialize(metadata_.get(), dst);
+    serializer.Serialize(&metadata_, dst);
   }

  private:
   friend FileMetaDataBuilder;
-  std::unique_ptr<format::FileCryptoMetaData> metadata_;
+  format::FileCryptoMetaData metadata_;
   uint32_t metadata_len_;
 };

@@ -900,14 +941,16 @@ const std::string& FileCryptoMetaData::key_metadata() const {
 }

 std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
-    const uint8_t* serialized_metadata, uint32_t* metadata_len) {
+    const uint8_t* serialized_metadata, uint32_t* metadata_len,
+    const ReaderProperties& properties) {
   return std::shared_ptr<FileCryptoMetaData>(
-      new FileCryptoMetaData(serialized_metadata, metadata_len));
+      new FileCryptoMetaData(serialized_metadata, metadata_len, properties));
 }

 FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata,
-                                       uint32_t* metadata_len)
-    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {}
+                                       uint32_t* metadata_len,
+                                       const ReaderProperties& properties)
+    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {}

 FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}

@@ -1754,10 +1797,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
       crypto_metadata_->__set_key_metadata(key_metadata);
     }

-    std::unique_ptr<FileCryptoMetaData> file_crypto_metadata =
-        std::unique_ptr<FileCryptoMetaData>(new FileCryptoMetaData());
-    file_crypto_metadata->impl_->metadata_ = std::move(crypto_metadata_);
-
+    std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData());
+    file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_);
     return file_crypto_metadata;
   }
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 3dd936d90e..89dca5667b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -121,8 +121,17 @@ struct PageEncodingStats {
 class PARQUET_EXPORT ColumnChunkMetaData {
  public:
   // API convenience to get a MetaData accessor
+
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
   static std::unique_ptr<ColumnChunkMetaData> Make(
       const void* metadata, const ColumnDescriptor* descr,
+      const ApplicationVersion* writer_version, int16_t row_group_ordinal = -1,
+      int16_t column_ordinal = -1,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
+  static std::unique_ptr<ColumnChunkMetaData> Make(
+      const void* metadata, const ColumnDescriptor* descr,
+      const ReaderProperties& properties = default_reader_properties(),
       const ApplicationVersion* writer_version = NULLPTR, int16_t row_group_ordinal = -1,
       int16_t column_ordinal = -1,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
@@ -164,7 +173,8 @@ class PARQUET_EXPORT ColumnChunkMetaData {
 private:
   explicit ColumnChunkMetaData(
       const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
-      int16_t column_ordinal, const ApplicationVersion* writer_version = NULLPTR,
+      int16_t column_ordinal, const ReaderProperties& properties,
+      const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
   // PIMPL Idiom
   class ColumnChunkMetaDataImpl;
@@ -174,9 +184,16 @@ class PARQUET_EXPORT ColumnChunkMetaData {
 /// \brief RowGroupMetaData is a proxy around format::RowGroupMetaData.
 class PARQUET_EXPORT RowGroupMetaData {
  public:
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+  static std::unique_ptr<RowGroupMetaData> Make(
+      const void* metadata, const SchemaDescriptor* schema,
+      const ApplicationVersion* writer_version,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
   /// \brief Create a RowGroupMetaData from a serialized thrift message.
   static std::unique_ptr<RowGroupMetaData> Make(
       const void* metadata, const SchemaDescriptor* schema,
+      const ReaderProperties& properties = default_reader_properties(),
       const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);

@@ -225,6 +242,7 @@ class PARQUET_EXPORT RowGroupMetaData {
 private:
   explicit RowGroupMetaData(
       const void* metadata, const SchemaDescriptor* schema,
+      const ReaderProperties& properties,
       const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
   // PIMPL Idiom
@@ -237,9 +255,15 @@ class FileMetaDataBuilder;
 /// \brief FileMetaData is a proxy around format::FileMetaData.
 class PARQUET_EXPORT FileMetaData {
  public:
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+  static std::shared_ptr<FileMetaData> Make(
+      const void* serialized_metadata, uint32_t* inout_metadata_len,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor);
+
   /// \brief Create a FileMetaData from a serialized thrift message.
   static std::shared_ptr<FileMetaData> Make(
       const void* serialized_metadata, uint32_t* inout_metadata_len,
+      const ReaderProperties& properties = default_reader_properties(),
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);

   ~FileMetaData();
@@ -350,6 +374,7 @@ class PARQUET_EXPORT FileMetaData {
   friend class SerializedFile;

   explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len,
+                        const ReaderProperties& properties,
                         std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);

   void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor);
@@ -363,8 +388,9 @@ class PARQUET_EXPORT FileMetaData {
 class PARQUET_EXPORT FileCryptoMetaData {
  public:
   // API convenience to get a MetaData accessor
-  static std::shared_ptr<FileCryptoMetaData> Make(const uint8_t* serialized_metadata,
-                                                  uint32_t* metadata_len);
+  static std::shared_ptr<FileCryptoMetaData> Make(
+      const uint8_t* serialized_metadata, uint32_t* metadata_len,
+      const ReaderProperties& properties = default_reader_properties());
   ~FileCryptoMetaData();

   EncryptionAlgorithm encryption_algorithm() const;
@@ -374,7 +400,8 @@ class PARQUET_EXPORT FileCryptoMetaData {

  private:
   friend FileMetaDataBuilder;
-  FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+  FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len,
+                     const ReaderProperties& properties);

   // PIMPL Idiom
   FileCryptoMetaData();
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5c81c75357..1d5c360cc1 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -49,6 +49,12 @@ enum class ParquetDataPageVersion { V1, V2 };
 /// Align the default buffer size to a small multiple of a page size.
 constexpr int64_t kDefaultBufferSize = 4096 * 4;

+constexpr int32_t kDefaultThriftStringSizeLimit = 100 * 1000 * 1000;
+// Structs in the thrift definition are relatively large (at least 300 bytes).
+// This limits total memory to the same order of magnitude as
+// kDefaultStringSizeLimit.
+constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 1000;
+
 class PARQUET_EXPORT ReaderProperties {
  public:
   explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
@@ -73,6 +79,14 @@ class PARQUET_EXPORT ReaderProperties {
   int64_t buffer_size() const { return buffer_size_; }
   void set_buffer_size(int64_t size) { buffer_size_ = size; }

+  int32_t thrift_string_size_limit() const { return thrift_string_size_limit_; }
+  void set_thrift_string_size_limit(int32_t size) { thrift_string_size_limit_ = size; }
+
+  int32_t thrift_container_size_limit() const { return thrift_container_size_limit_; }
+  void set_thrift_container_size_limit(int32_t size) {
+    thrift_container_size_limit_ = size;
+  }
+
   void file_decryption_properties(std::shared_ptr<FileDecryptionProperties> decryption) {
     file_decryption_properties_ = std::move(decryption);
   }
@@ -84,6 +98,8 @@ class PARQUET_EXPORT ReaderProperties {
  private:
   MemoryPool* pool_;
   int64_t buffer_size_ = kDefaultBufferSize;
+  int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
+  int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
   bool buffered_stream_enabled_ = false;
   std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
 };
diff --git a/cpp/src/parquet/statistics_test.cc b/cpp/src/parquet/statistics_test.cc
index e678598919..03da895380 100644
--- a/cpp/src/parquet/statistics_test.cc
+++ b/cpp/src/parquet/statistics_test.cc
@@ -595,8 +595,8 @@ void AssertStatsSet(const ApplicationVersion& version,
                     std::shared_ptr<parquet::WriterProperties> props,
                     const ColumnDescriptor* column, bool expected_is_set) {
   auto metadata_builder = ColumnChunkMetaDataBuilder::Make(props, column);
-  auto column_chunk =
-      ColumnChunkMetaData::Make(metadata_builder->contents(), column, &version);
+  auto column_chunk = ColumnChunkMetaData::Make(metadata_builder->contents(), column,
+                                                default_reader_properties(), &version);
   EncodedStatistics stats;
   stats.set_is_signed(false);
   metadata_builder->SetStatistics(stats);
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 17248f3bdc..3c74dfc07b 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -42,6 +42,7 @@
 #include "parquet/encryption/internal_file_encryptor.h"
 #include "parquet/exception.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"

@@ -350,74 +351,84 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio

 using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;

-// On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
-// limit (ARROW-13655). If we wanted to protect against huge messages, we could
-// do it ourselves since we know the message size up front.
+class ThriftDeserializer {
+ public:
+  explicit ThriftDeserializer(const ReaderProperties& properties)
+      : ThriftDeserializer(properties.thrift_string_size_limit(),
+                           properties.thrift_container_size_limit()) {}
+
+  ThriftDeserializer(int32_t string_size_limit, int32_t container_size_limit)
+      : string_size_limit_(string_size_limit),
+        container_size_limit_(container_size_limit) {}

-inline std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf,
-                                                                uint32_t len) {
+  // Deserialize a thrift message from buf/len. buf/len must at least contain
+  // all the bytes needed to store the thrift message. On return, len will be
+  // set to the actual length of the header.
+  template <class T>
+  void DeserializeMessage(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
+                          const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
+    if (decryptor == NULLPTR) {
+      // thrift message is not encrypted
+      DeserializeUnencryptedMessage(buf, len, deserialized_msg);
+    } else {
+      // thrift message is encrypted
+      uint32_t clen;
+      clen = *len;
+      // decrypt
+      auto decrypted_buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(decryptor->pool(),
+                         static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
+      const uint8_t* cipher_buf = buf;
+      uint32_t decrypted_buffer_len =
+          decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
+      if (decrypted_buffer_len <= 0) {
+        throw ParquetException("Couldn't decrypt buffer\n");
+      }
+      *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
+      DeserializeUnencryptedMessage(decrypted_buffer->data(), &decrypted_buffer_len,
+                                    deserialized_msg);
+    }
+  }
+
+ private:
+  // On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
+  // limit (ARROW-13655). If we wanted to protect against huge messages, we could
+  // do it ourselves since we know the message size up front.
+  std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf, uint32_t len) {
 #if PARQUET_THRIFT_VERSION_MAJOR > 0 || PARQUET_THRIFT_VERSION_MINOR >= 14
-  auto conf = std::make_shared<apache::thrift::TConfiguration>();
-  conf->setMaxMessageSize(std::numeric_limits<int>::max());
-  return std::shared_ptr<ThriftBuffer>(
-      new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
+    auto conf = std::make_shared<apache::thrift::TConfiguration>();
+    conf->setMaxMessageSize(std::numeric_limits<int>::max());
+    return std::shared_ptr<ThriftBuffer>(
+        new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
 #else
-  return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
+    return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
 #endif
-}
-
-template <class T>
-inline void DeserializeThriftUnencryptedMsg(const uint8_t* buf, uint32_t* len,
-                                            T* deserialized_msg) {
-  // Deserialize msg bytes into c++ thrift msg using memory transport.
-  auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
-  apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
-  // Protect against CPU and memory bombs
-  tproto_factory.setStringSizeLimit(100 * 1000 * 1000);
-  // Structs in the thrift definition are relatively large (at least 300 bytes).
-  // This limits total memory to the same order of magnitude as stringSize.
-  tproto_factory.setContainerSizeLimit(1000 * 1000);
-  std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =  //
-      tproto_factory.getProtocol(tmem_transport);
-  try {
-    deserialized_msg->read(tproto.get());
-  } catch (std::exception& e) {
-    std::stringstream ss;
-    ss << "Couldn't deserialize thrift: " << e.what() << "\n";
-    throw ParquetException(ss.str());
   }
-  uint32_t bytes_left = tmem_transport->available_read();
-  *len = *len - bytes_left;
-}

-// Deserialize a thrift message from buf/len. buf/len must at least contain
-// all the bytes needed to store the thrift message. On return, len will be
-// set to the actual length of the header.
-template <class T>
-inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
-                                 const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
-  // thrift message is not encrypted
-  if (decryptor == NULLPTR) {
-    DeserializeThriftUnencryptedMsg(buf, len, deserialized_msg);
-  } else {  // thrift message is encrypted
-    uint32_t clen;
-    clen = *len;
-    // decrypt
-    std::shared_ptr<ResizableBuffer> decrypted_buffer =
-        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
-            decryptor->pool(),
-            static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
-    const uint8_t* cipher_buf = buf;
-    uint32_t decrypted_buffer_len =
-        decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
-    if (decrypted_buffer_len <= 0) {
-      throw ParquetException("Couldn't decrypt buffer\n");
+  template <class T>
+  void DeserializeUnencryptedMessage(const uint8_t* buf, uint32_t* len,
+                                     T* deserialized_msg) {
+    // Deserialize msg bytes into c++ thrift msg using memory transport.
+    auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
+    apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
+    // Protect against CPU and memory bombs
+    tproto_factory.setStringSizeLimit(string_size_limit_);
+    tproto_factory.setContainerSizeLimit(container_size_limit_);
+    auto tproto = tproto_factory.getProtocol(tmem_transport);
+    try {
+      deserialized_msg->read(tproto.get());
+    } catch (std::exception& e) {
+      std::stringstream ss;
+      ss << "Couldn't deserialize thrift: " << e.what() << "\n";
+      throw ParquetException(ss.str());
     }
-    *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
-    DeserializeThriftMsg(decrypted_buffer->data(), &decrypted_buffer_len,
-                         deserialized_msg);
+    uint32_t bytes_left = tmem_transport->available_read();
+    *len = *len - bytes_left;
   }
-}
+
+  const int32_t string_size_limit_;
+  const int32_t container_size_limit_;
+};

 /// Utility class to serialize thrift objects to a binary format. This object
 /// should be reused if possible to reuse the underlying memory.
@@ -478,10 +489,9 @@ class ThriftSerializer {
   int64_t SerializeEncryptedObj(ArrowOutputStream* out, uint8_t* out_buffer,
                                 uint32_t out_length,
                                 const std::shared_ptr<Encryptor>& encryptor) {
-    std::shared_ptr<ResizableBuffer> cipher_buffer =
-        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
-            encryptor->pool(),
-            static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
+    auto cipher_buffer = std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
+        encryptor->pool(),
+        static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
     int cipher_buffer_len =
         encryptor->Encrypt(out_buffer, out_length, cipher_buffer->mutable_data());
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 7b91d4c2c7..684253fff3 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -598,6 +598,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         If enabled, pre-buffer the raw Parquet data instead of issuing one
         read per column chunk. This can improve performance on high-latency
         filesystems.
+    thrift_string_size_limit : int, default None
+        If not None, override the maximum total string size allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
+    thrift_container_size_limit : int, default None
+        If not None, override the maximum total size of containers allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
     """

     cdef:
@@ -606,14 +614,20 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     # Avoid mistakingly creating attributes
     __slots__ = ()

-    def __init__(self, bint use_buffered_stream=False,
+    def __init__(self, *, bint use_buffered_stream=False,
                  buffer_size=8192,
-                 bint pre_buffer=False):
+                 bint pre_buffer=False,
+                 thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         self.init(shared_ptr[CFragmentScanOptions](
             new CParquetFragmentScanOptions()))
         self.use_buffered_stream = use_buffered_stream
         self.buffer_size = buffer_size
         self.pre_buffer = pre_buffer
+        if thrift_string_size_limit is not None:
+            self.thrift_string_size_limit = thrift_string_size_limit
+        if thrift_container_size_limit is not None:
+            self.thrift_container_size_limit = thrift_container_size_limit

     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -654,17 +668,49 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     def pre_buffer(self, bint pre_buffer):
         self.arrow_reader_properties().set_pre_buffer(pre_buffer)

+    @property
+    def thrift_string_size_limit(self):
+        return self.reader_properties().thrift_string_size_limit()
+
+    @thrift_string_size_limit.setter
+    def thrift_string_size_limit(self, size):
+        if size <= 0:
+            raise ValueError("size must be larger than zero")
+        self.reader_properties().set_thrift_string_size_limit(size)
+
+    @property
+    def thrift_container_size_limit(self):
+        return self.reader_properties().thrift_container_size_limit()
+
+    @thrift_container_size_limit.setter
+    def thrift_container_size_limit(self, size):
+        if size <= 0:
+            raise ValueError("size must be larger than zero")
+        self.reader_properties().set_thrift_container_size_limit(size)
+
     def equals(self, ParquetFragmentScanOptions other):
-        return (
-            self.use_buffered_stream == other.use_buffered_stream and
-            self.buffer_size == other.buffer_size and
-            self.pre_buffer == other.pre_buffer
-        )
+        attrs = (
+            self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+            self.thrift_string_size_limit, self.thrift_container_size_limit)
+        other_attrs = (
+            other.use_buffered_stream, other.buffer_size, other.pre_buffer,
+            other.thrift_string_size_limit,
+            other.thrift_container_size_limit)
+        return attrs == other_attrs
+
+    @classmethod
+    def _reconstruct(cls, kwargs):
+        return cls(**kwargs)

     def __reduce__(self):
-        return ParquetFragmentScanOptions, (
-            self.use_buffered_stream, self.buffer_size, self.pre_buffer
+        kwargs = dict(
+            use_buffered_stream=self.use_buffered_stream,
+            buffer_size=self.buffer_size,
+            pre_buffer=self.pre_buffer,
+            thrift_string_size_limit=self.thrift_string_size_limit,
+            thrift_container_size_limit=self.thrift_container_size_limit,
         )
+        return type(self)._reconstruct, (kwargs,)


 cdef class ParquetFactoryOptions(_Weakrefable):
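For context, the fragment scan options added above plug into the dataset API;
a minimal sketch, assuming an existing dataset directory "my_dataset" (the
path and limit values are illustrative):

    import pyarrow.dataset as ds

    # Per-fragment Thrift limits flow into parquet::ReaderProperties via
    # MakeReaderProperties() in file_parquet.cc (see the C++ diff above).
    options = ds.ParquetFragmentScanOptions(
        thrift_string_size_limit=200 * 1000 * 1000,
        thrift_container_size_limit=2 * 1000 * 1000,
    )
    dataset = ds.dataset("my_dataset", format="parquet")
    table = dataset.to_table(fragment_scan_options=options)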
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 98857d5b48..29b625df50 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -359,8 +359,16 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         c_bool is_buffered_stream_enabled() const
         void enable_buffered_stream()
         void disable_buffered_stream()
+        void set_buffer_size(int64_t buf_size)
         int64_t buffer_size() const
+
+        void set_thrift_string_size_limit(int32_t size)
+        int32_t thrift_string_size_limit() const
+
+        void set_thrift_container_size_limit(int32_t size)
+        int32_t thrift_container_size_limit() const
+
         void file_decryption_properties(shared_ptr[CFileDecryptionProperties] decryption)
         shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 8812ab1059..45f89e8b91 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1167,11 +1167,13 @@ cdef class ParquetReader(_Weakrefable):
         self.pool = maybe_unbox_memory_pool(memory_pool)
         self._metadata = None

-    def open(self, object source not None, bint use_memory_map=True,
+    def open(self, object source not None, *, bint use_memory_map=True,
              read_dictionary=None, FileMetaData metadata=None,
              int buffer_size=0, bint pre_buffer=False,
             coerce_int96_timestamp_unit=None,
-             FileDecryptionProperties decryption_properties=None):
+             FileDecryptionProperties decryption_properties=None,
+             thrift_string_size_limit=None,
+             thrift_container_size_limit=None):
         cdef:
             shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
@@ -1193,6 +1195,18 @@ cdef class ParquetReader(_Weakrefable):
         else:
             raise ValueError('Buffer size must be larger than zero')

+        if thrift_string_size_limit is not None:
+            if thrift_string_size_limit <= 0:
+                raise ValueError("thrift_string_size_limit "
+                                 "must be larger than zero")
+            properties.set_thrift_string_size_limit(thrift_string_size_limit)
+        if thrift_container_size_limit is not None:
+            if thrift_container_size_limit <= 0:
+                raise ValueError("thrift_container_size_limit "
+                                 "must be larger than zero")
+            properties.set_thrift_container_size_limit(
+                thrift_container_size_limit)
+
         if decryption_properties is not None:
             properties.file_decryption_properties(
                 decryption_properties.unwrap())
diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 7d1c8a4308..cbdf51b238 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -226,6 +226,14 @@ class ParquetFile:
         in nanoseconds.
     decryption_properties : FileDecryptionProperties, default None
         File decryption properties for Parquet Modular Encryption.
+    thrift_string_size_limit : int, default None
+        If not None, override the maximum total string size allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
+    thrift_container_size_limit : int, default None
+        If not None, override the maximum total size of containers allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.

     Examples
     --------
@@ -269,17 +277,20 @@ class ParquetFile:
     [0,1,2,3,4,5]]
     """

-    def __init__(self, source, metadata=None, common_metadata=None,
+    def __init__(self, source, *, metadata=None, common_metadata=None,
                  read_dictionary=None, memory_map=False, buffer_size=0,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
-                 decryption_properties=None):
+                 decryption_properties=None, thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         self.reader = ParquetReader()
         self.reader.open(
             source, use_memory_map=memory_map,
             buffer_size=buffer_size, pre_buffer=pre_buffer,
             read_dictionary=read_dictionary, metadata=metadata,
             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
-            decryption_properties=decryption_properties
+            decryption_properties=decryption_properties,
+            thrift_string_size_limit=thrift_string_size_limit,
+            thrift_container_size_limit=thrift_container_size_limit,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1650,7 +1661,9 @@ Examples
                 filters=None, metadata_nthreads=None, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
                 use_legacy_dataset=None, pre_buffer=True,
-                coerce_int96_timestamp_unit=None):
+                coerce_int96_timestamp_unit=None,
+                thrift_string_size_limit=None,
+                thrift_container_size_limit=None):
         if use_legacy_dataset is None:
             # if a new filesystem is passed -> default to new implementation
             if isinstance(filesystem, FileSystem):
@@ -1673,7 +1686,9 @@ Examples
                 schema=schema, metadata=metadata,
                 split_row_groups=split_row_groups,
                 validate_schema=validate_schema,
-                metadata_nthreads=metadata_nthreads
+                metadata_nthreads=metadata_nthreads,
+                thrift_string_size_limit=thrift_string_size_limit,
+                thrift_container_size_limit=thrift_container_size_limit,
             )
         self = object.__new__(cls)
         return self
@@ -1683,7 +1698,9 @@ Examples
                  filters=None, metadata_nthreads=None, read_dictionary=None,
                  memory_map=False, buffer_size=0, partitioning="hive",
                  use_legacy_dataset=True, pre_buffer=True,
-                 coerce_int96_timestamp_unit=None):
+                 coerce_int96_timestamp_unit=None,
+                 thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -2258,11 +2275,13 @@ class _ParquetDatasetV2:
     1       4     Horse   2022
     """

-    def __init__(self, path_or_paths, filesystem=None, filters=None,
+    def __init__(self, path_or_paths, filesystem=None, *, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
                  memory_map=False, ignore_prefixes=None, pre_buffer=True,
                  coerce_int96_timestamp_unit=None, schema=None,
-                 decryption_properties=None, **kwargs):
+                 decryption_properties=None, thrift_string_size_limit=None,
+                 thrift_container_size_limit=None,
+                 **kwargs):
         import pyarrow.dataset as ds

         # Raise error for not supported keywords
@@ -2277,7 +2296,9 @@ class _ParquetDatasetV2:
         # map format arguments
         read_options = {
             "pre_buffer": pre_buffer,
-            "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
+            "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
+            "thrift_string_size_limit": thrift_string_size_limit,
+            "thrift_container_size_limit": thrift_container_size_limit,
         }
         if buffer_size:
             read_options.update(use_buffered_stream=True,
@@ -2636,6 +2657,15 @@ decryption_properties : FileDecryptionProperties or None
     File-level decryption properties.
     The decryption properties can be created using
     ``CryptoFactory.file_decryption_properties()``.
+thrift_string_size_limit : int, default None
+    If not None, override the maximum total string size allocated
+    when decoding Thrift structures. The default limit should be
+    sufficient for most Parquet files.
+thrift_container_size_limit : int, default None
+    If not None, override the maximum total size of containers allocated
+    when decoding Thrift structures. The default limit should be
+    sufficient for most Parquet files.
+
 Returns
 -------
 {2}
@@ -2722,13 +2752,14 @@ Read data from a single Parquet file:
 """

-def read_table(source, columns=None, use_threads=True, metadata=None,
+def read_table(source, *, columns=None, use_threads=True, metadata=None,
                schema=None, use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
                ignore_prefixes=None, pre_buffer=True,
                coerce_int96_timestamp_unit=None,
-               decryption_properties=None):
+               decryption_properties=None, thrift_string_size_limit=None,
+               thrift_container_size_limit=None):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -2749,7 +2780,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                 filters=filters,
                 ignore_prefixes=ignore_prefixes,
                 pre_buffer=pre_buffer,
-                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
+                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
+                thrift_string_size_limit=thrift_string_size_limit,
+                thrift_container_size_limit=thrift_container_size_limit,
             )
         except ImportError:
             # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -2778,7 +2811,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
             memory_map=memory_map, buffer_size=buffer_size,
             pre_buffer=pre_buffer,
             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
-            decryption_properties=decryption_properties
+            decryption_properties=decryption_properties,
+            thrift_string_size_limit=thrift_string_size_limit,
+            thrift_container_size_limit=thrift_container_size_limit,
         )

     return dataset.read(columns=columns, use_threads=use_threads,
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 9d31bdeb60..62ea19d422 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -802,3 +802,30 @@ def test_read_table_legacy_deprecated(tempdir):
         FutureWarning, match="Passing 'use_legacy_dataset=True'"
     ):
         pq.read_table(path, use_legacy_dataset=True)
+
+
+def test_thrift_size_limits(tempdir):
+    path = tempdir / 'largethrift.parquet'
+
+    array = pa.array(list(range(10)))
+    num_cols = 1000
+    table = pa.table(
+        [array] * num_cols,
+        names=[f'some_long_column_name_{i}' for i in range(num_cols)])
+    pq.write_table(table, path)
+
+    with pytest.raises(
+            OSError,
+            match="Couldn't deserialize thrift:.*Exceeded size limit"):
+        pq.read_table(path, thrift_string_size_limit=50 * num_cols)
+    with pytest.raises(
+            OSError,
+            match="Couldn't deserialize thrift:.*Exceeded size limit"):
+        pq.read_table(path, thrift_container_size_limit=num_cols)
+
+    got = pq.read_table(path, thrift_string_size_limit=100 * num_cols)
+    assert got == table
+    got = pq.read_table(path, thrift_container_size_limit=2 * num_cols)
+    assert got == table
+    got = pq.read_table(path)
+    assert got == table
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index d2210c4b6c..81fa7d1245 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -748,10 +748,15 @@ def test_parquet_scan_options():
     opts3 = ds.ParquetFragmentScanOptions(
         buffer_size=2**13, use_buffered_stream=True)
     opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
+    opts5 = ds.ParquetFragmentScanOptions(
+        thrift_string_size_limit=123456,
+        thrift_container_size_limit=987654,)

     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is False
+    assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
+    assert opts1.thrift_container_size_limit == 1_000_000  # default in C++

     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
@@ -765,10 +770,14 @@ def test_parquet_scan_options():
     assert opts4.buffer_size == 2**13
     assert opts4.pre_buffer is True

+    assert opts5.thrift_string_size_limit == 123456
+    assert opts5.thrift_container_size_limit == 987654
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
+    assert opts5 != opts1


 def test_file_format_pickling():
@@ -795,6 +804,8 @@ def test_file_format_pickling():
         ds.ParquetFileFormat(
             use_buffered_stream=True,
             buffer_size=4096,
+            thrift_string_size_limit=123,
+            thrift_container_size_limit=456,
         ),
     ])
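Likewise, the same limits are plumbed through ParquetFile's constructor via
ParquetReader.open(); a minimal sketch (the path and values are illustrative):

    import pyarrow.parquet as pq

    # ParquetFile forwards both limits to ParquetReader.open(), which applies
    # them to the C++ ReaderProperties before the footer is deserialized.
    pf = pq.ParquetFile(
        "very_wide.parquet",
        thrift_string_size_limit=500 * 1000 * 1000,
        thrift_container_size_limit=10 * 1000 * 1000,
    )
    print(pf.metadata.num_columns)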