mapleFU commented on code in PR #33776:
URL: https://github.com/apache/arrow/pull/33776#discussion_r1093381119
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,113 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset,
uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream*
input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t),
&len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+static ::arrow::Status ValidateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ return ::arrow::Status::Invalid(
+ "Unsupported Bloom filter algorithm: ", header.algorithm, ".");
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t),
&hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
- }
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+ if (!header.hash.__isset.XXHASH) {
+ return ::arrow::Status::Invalid("Unsupported Bloom filter hash: ",
header.hash, ".");
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t),
&algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ return ::arrow::Status::Invalid(
+ "Unsupported Bloom filter compression: ", header.compression, ".");
}
- if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
- throw ParquetException("Unsupported Bloom filter algorithm");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) >
BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be
in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- BlockSplitBloomFilter bloom_filter;
+ return ::arrow::Status::OK();
+}
- PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
- bloom_filter.Init(buffer->data(), len);
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
+ const ReaderProperties& properties, ArrowInputStream* input) {
+ // NOTE: we don't know the bloom filter header size upfront, and we can't
rely on
+ // InputStream::Peek() which isn't always implemented. Therefore, we must
first
+ // Read() with an upper bound estimate of the header size, then once we know
+ // the bloom filter data size, we can Read() the exact number of remaining
data bytes.
+ ThriftDeserializer deserializer(properties);
+ format::BloomFilterHeader header;
+
+ // Read and deserialize bloom filter header
+ PARQUET_ASSIGN_OR_THROW(auto header_buf,
input->Read(kBloomFilterHeaderSizeGuess));
+ // This gets used, then set by DeserializeThriftMsg
+ uint32_t header_size = static_cast<uint32_t>(header_buf->size());
+ try {
+ deserializer.DeserializeMessage(reinterpret_cast<const
uint8_t*>(header_buf->data()),
+ &header_size, &header);
+ DCHECK_LE(header_size, header_buf->size());
+ } catch (std::exception& e) {
+ std::stringstream ss;
+ ss << "Deserializing bloom filter header failed.\n" << e.what();
+ throw ParquetException(ss.str());
+ }
+ PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
+
+ const int32_t bloom_filter_size = header.numBytes;
+ if (bloom_filter_size + header_size <= header_buf->size()) {
+ // The bloom filter data is entirely contained in the buffer we just read
+ // => just return it.
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(header_buf->data() + header_size, bloom_filter_size);
+ return bloom_filter;
+ }
+ // We have read a part of the bloom filter already, copy it to the target
buffer
+ // and read the remaining part from the InputStream.
+ auto buffer = AllocateBuffer(properties.memory_pool(), bloom_filter_size);
+
+ const auto bloom_filter_bytes_in_header = header_buf->size() - header_size;
+ std::memcpy(buffer->mutable_data(), header_buf->data() + header_size,
Review Comment:
If `header_size == header_buf->size()`, this would call `memcpy` with `len = 0`.
A zero-length `memcpy` is only valid if both `src` and `dest` are valid pointers.
So I'm afraid this could call `memcpy` with a length of 0 on an invalid address...
@pitrou
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]