wgtmac commented on code in PR #33776:
URL: https://github.com/apache/arrow/pull/33776#discussion_r1088407732
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
Review Comment:
```suggestion
static ::arrow::Status ValidateBloomFilterHeader(
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+ return ::arrow::Status::Invalid(ss.str());
+ }
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.hash.__isset.XXHASH) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter compression: " << header.compression <<
".";
+ return ::arrow::Status::Invalid(ss.str());
}
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+ ReaderProperties readerProperties;
Review Comment:
This should be an input parameter.
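For illustration only, the signature could accept the properties from the caller instead of default-constructing them here; this is just a sketch, and the parameter name and placement are assumptions, not the final API of this PR:
```cpp
// Sketch: let the caller supply ReaderProperties rather than creating a local default.
BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(const ReaderProperties& properties,
                                                         ArrowInputStream* input) {
  uint32_t length = HEADER_SIZE_GUESS;
  uint32_t headerSize = 0;
  // The Thrift deserializer then honors the caller's configured limits.
  ThriftDeserializer deserializer(properties);
  format::BloomFilterHeader header;
  // ... header and bitset handling unchanged ...
}
```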
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
Review Comment:
```suggestion
static constexpr uint32_t kMaxBloomFilterHeaderSize = 1024;
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+ return ::arrow::Status::Invalid(ss.str());
+ }
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.hash.__isset.XXHASH) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter compression: " << header.compression <<
".";
+ return ::arrow::Status::Invalid(ss.str());
}
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+ ReaderProperties readerProperties;
+ uint32_t length = HEADER_SIZE_GUESS;
+
Review Comment:
Remove blank line
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
Review Comment:
```suggestion
static constexpr uint32_t kHeaderSizeGuess = 32;
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+ return ::arrow::Status::Invalid(ss.str());
+ }
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.hash.__isset.XXHASH) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter compression: " << header.compression <<
".";
+ return ::arrow::Status::Invalid(ss.str());
}
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+ ReaderProperties readerProperties;
+ uint32_t length = HEADER_SIZE_GUESS;
+
+ uint32_t headerSize = 0;
+
+ ThriftDeserializer deserializer(readerProperties);
+ format::BloomFilterHeader header;
+
+ // Read and deserialize bloom filter header
+ while (true) {
+ PARQUET_ASSIGN_OR_THROW(auto sv, input->Peek(length));
+ // This gets used, then set by DeserializeThriftMsg
+ headerSize = static_cast<uint32_t>(sv.size());
+ try {
+ deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(sv.data()),
+ &headerSize, &header);
+ break;
+ } catch (std::exception& e) {
+ // Failed to deserialize. Double the allowed page header size and try again
+ length *= 2;
+ if (length > MAX_BLOOM_FILTER_HEADER_SIZE) {
+ std::stringstream ss;
+ ss << "Deserializing bloom filter header failed.\n" << e.what();
+ throw ParquetException(ss.str());
+ }
+ }
}
- if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
- throw ParquetException("Unsupported Bloom filter algorithm");
+
+ // Throw if the header is invalid
+ auto status = validateBloomFilterHeader(header);
+ if (!status.ok()) {
+ throw ParquetException(status.ToString());
}
- BlockSplitBloomFilter bloom_filter;
+ // Read remaining data of bitset
+ PARQUET_THROW_NOT_OK(input->Advance(headerSize));
+ PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(header.numBytes));
- PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
- bloom_filter.Init(buffer->data(), len);
- return bloom_filter;
+ BlockSplitBloomFilter bloomFilter;
+ bloomFilter.Init(buffer->data(), header.numBytes);
+ return bloomFilter;
}
void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
DCHECK(sink != nullptr);
- PARQUET_THROW_NOT_OK(
- sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
- PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
- sizeof(hash_strategy_)));
- PARQUET_THROW_NOT_OK(
- sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
- PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+ format::BloomFilterHeader header;
+ if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+ throw ParquetException("BloomFilter not support Algorithm other than
BLOCK");
+ }
+ header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+ if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+ throw ParquetException("BloomFilter not support Hash other than XXHASH");
Review Comment:
```suggestion
    throw ParquetException("BloomFilter does not support Hash other than XXHASH");
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+ return ::arrow::Status::Invalid(ss.str());
+ }
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.hash.__isset.XXHASH) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter compression: " << header.compression <<
".";
+ return ::arrow::Status::Invalid(ss.str());
}
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+ ReaderProperties readerProperties;
+ uint32_t length = HEADER_SIZE_GUESS;
+
+ uint32_t headerSize = 0;
+
+ ThriftDeserializer deserializer(readerProperties);
+ format::BloomFilterHeader header;
+
+ // Read and deserialize bloom filter header
+ while (true) {
+ PARQUET_ASSIGN_OR_THROW(auto sv, input->Peek(length));
+ // This gets used, then set by DeserializeThriftMsg
+ headerSize = static_cast<uint32_t>(sv.size());
+ try {
+ deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(sv.data()),
+ &headerSize, &header);
+ break;
+ } catch (std::exception& e) {
+ // Failed to deserialize. Double the allowed page header size and try again
+ length *= 2;
+ if (length > MAX_BLOOM_FILTER_HEADER_SIZE) {
+ std::stringstream ss;
+ ss << "Deserializing bloom filter header failed.\n" << e.what();
+ throw ParquetException(ss.str());
+ }
+ }
}
- if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
- throw ParquetException("Unsupported Bloom filter algorithm");
+
+ // Throw if the header is invalid
+ auto status = validateBloomFilterHeader(header);
+ if (!status.ok()) {
+ throw ParquetException(status.ToString());
}
- BlockSplitBloomFilter bloom_filter;
+ // Read remaining data of bitset
+ PARQUET_THROW_NOT_OK(input->Advance(headerSize));
+ PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(header.numBytes));
- PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
- bloom_filter.Init(buffer->data(), len);
- return bloom_filter;
+ BlockSplitBloomFilter bloomFilter;
+ bloomFilter.Init(buffer->data(), header.numBytes);
+ return bloomFilter;
}
void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
DCHECK(sink != nullptr);
- PARQUET_THROW_NOT_OK(
- sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
- PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
- sizeof(hash_strategy_)));
- PARQUET_THROW_NOT_OK(
- sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
- PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+ format::BloomFilterHeader header;
+ if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+ throw ParquetException("BloomFilter not support Algorithm other than
BLOCK");
+ }
+ header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+ if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+ throw ParquetException("BloomFilter not support Hash other than XXHASH");
+ }
+ header.hash.__set_XXHASH(format::XxHash());
+ if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
+ throw ParquetException("BloomFilter not support Compression other than UNCOMPRESSED");
Review Comment:
```suggestion
    throw ParquetException("BloomFilter does not support Compression other than UNCOMPRESSED");
```
##########
cpp/src/parquet/xxhasher.h:
##########
@@ -44,9 +38,7 @@ class PARQUET_EXPORT MurmurHash3 : public Hasher {
uint64_t Hash(const FLBA* val, uint32_t len) const override;
private:
- // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
- // by System.nanoTime() of java.
- static constexpr int DEFAULT_SEED = 1361930890;
+ static constexpr int DEFAULT_SEED = 0;
Review Comment:
```suggestion
static constexpr int kDefaultSeed = 0;
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
Review Comment:
Usually we use capitalized initials.
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
memcpy(data_->mutable_data(), bitset, num_bytes_);
- this->hasher_.reset(new MurmurHash3());
+ this->hasher_ = std::make_unique<XxHasher>();
}
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
- uint32_t len, hash, algorithm;
- int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+ const format::BloomFilterHeader& header) {
+ if (!header.algorithm.__isset.BLOCK) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+ return ::arrow::Status::Invalid(ss.str());
+ }
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.hash.__isset.XXHASH) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ if (!header.compression.__isset.UNCOMPRESSED) {
+ std::stringstream ss;
+ ss << "Unsupported Bloom filter compression: " << header.compression <<
".";
+ return ::arrow::Status::Invalid(ss.str());
}
- if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
- throw ParquetException("Unsupported hash strategy");
+
+ if (header.numBytes <= 0 ||
+ static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+ std::stringstream ss;
+ ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+ << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+ return ::arrow::Status::Invalid(ss.str());
}
- PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
- if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
- throw ParquetException("Failed to deserialize from input stream");
+ return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+ ReaderProperties readerProperties;
+ uint32_t length = HEADER_SIZE_GUESS;
+
+ uint32_t headerSize = 0;
Review Comment:
```suggestion
uint32_t header_size = 0;
```
##########
cpp/src/parquet/parquet.thrift:
##########
@@ -585,15 +585,15 @@ union BloomFilterAlgorithm {
/** Hash strategy type annotation. xxHash is an extremely fast non-cryptographic hash
* algorithm. It uses 64 bits version of xxHash.
**/
-struct XxHash {}
+struct XxHasher {}
Review Comment:
Should we update the generated code as well? And is it a good time to sync the latest thrift file from upstream parquet-format? @wjones127 @pitrou