wgtmac commented on code in PR #33776:
URL: https://github.com/apache/arrow/pull/33776#discussion_r1088407732


##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(

Review Comment:
   ```suggestion
   static ::arrow::Status ValidateBloomFilterHeader(
   ```



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+    return ::arrow::Status::Invalid(ss.str());
+  }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter compression: " << header.compression << 
".";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+  ReaderProperties readerProperties;

Review Comment:
   This should be an input parameter.
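    For example, a possible shape (the parameter name and const-ref passing here are only an illustrative sketch, not a concrete proposal):
    ```cpp
    // Hypothetical signature: the caller supplies ReaderProperties instead of
    // Deserialize() default-constructing them internally.
    BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
        const ReaderProperties& properties, ArrowInputStream* input) {
      ThriftDeserializer deserializer(properties);
      // ... rest of the deserialization unchanged ...
    }
    ```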



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;

Review Comment:
   ```suggestion
   static constexpr uint32_t kMaxBloomFilterHeaderSize = 1024;
   ```



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+    return ::arrow::Status::Invalid(ss.str());
+  }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter compression: " << header.compression << 
".";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+  ReaderProperties readerProperties;
+  uint32_t length = HEADER_SIZE_GUESS;
+

Review Comment:
   Remove blank line



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;

Review Comment:
   ```suggestion
   static constexpr uint32_t kHeaderSizeGuess = 32;
   ```



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+    return ::arrow::Status::Invalid(ss.str());
+  }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter compression: " << header.compression << 
".";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+  ReaderProperties readerProperties;
+  uint32_t length = HEADER_SIZE_GUESS;
+
+  uint32_t headerSize = 0;
+
+  ThriftDeserializer deserializer(readerProperties);
+  format::BloomFilterHeader header;
+
+  // Read and deserialize bloom filter header
+  while (true) {
+    PARQUET_ASSIGN_OR_THROW(auto sv, input->Peek(length));
+    // This gets used, then set by DeserializeThriftMsg
+    headerSize = static_cast<uint32_t>(sv.size());
+    try {
+      deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(sv.data()),
+                                      &headerSize, &header);
+      break;
+    } catch (std::exception& e) {
+      // Failed to deserialize. Double the allowed page header size and try again
+      length *= 2;
+      if (length > MAX_BLOOM_FILTER_HEADER_SIZE) {
+        std::stringstream ss;
+        ss << "Deserializing bloom filter header failed.\n" << e.what();
+        throw ParquetException(ss.str());
+      }
+    }
   }
-  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
-    throw ParquetException("Unsupported Bloom filter algorithm");
+
+  // Throw if the header is invalid
+  auto status = validateBloomFilterHeader(header);
+  if (!status.ok()) {
+    throw ParquetException(status.ToString());
   }
 
-  BlockSplitBloomFilter bloom_filter;
+  // Read remaining data of bitset
+  PARQUET_THROW_NOT_OK(input->Advance(headerSize));
+  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(header.numBytes));
 
-  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
-  bloom_filter.Init(buffer->data(), len);
-  return bloom_filter;
+  BlockSplitBloomFilter bloomFilter;
+  bloomFilter.Init(buffer->data(), header.numBytes);
+  return bloomFilter;
 }
 
 void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
   DCHECK(sink != nullptr);
 
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
-  PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
-                                   sizeof(hash_strategy_)));
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
-  PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+  format::BloomFilterHeader header;
+  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+    throw ParquetException("BloomFilter not support Algorithm other than 
BLOCK");
+  }
+  header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+    throw ParquetException("BloomFilter not support Hash other than XXHASH");

Review Comment:
   ```suggestion
       throw ParquetException("BloomFilter does not support Hash other than 
XXHASH");
   ```



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+    return ::arrow::Status::Invalid(ss.str());
+  }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter compression: " << header.compression << 
".";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+  ReaderProperties readerProperties;
+  uint32_t length = HEADER_SIZE_GUESS;
+
+  uint32_t headerSize = 0;
+
+  ThriftDeserializer deserializer(readerProperties);
+  format::BloomFilterHeader header;
+
+  // Read and deserialize bloom filter header
+  while (true) {
+    PARQUET_ASSIGN_OR_THROW(auto sv, input->Peek(length));
+    // This gets used, then set by DeserializeThriftMsg
+    headerSize = static_cast<uint32_t>(sv.size());
+    try {
+      deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(sv.data()),
+                                      &headerSize, &header);
+      break;
+    } catch (std::exception& e) {
+      // Failed to deserialize. Double the allowed page header size and try again
+      length *= 2;
+      if (length > MAX_BLOOM_FILTER_HEADER_SIZE) {
+        std::stringstream ss;
+        ss << "Deserializing bloom filter header failed.\n" << e.what();
+        throw ParquetException(ss.str());
+      }
+    }
   }
-  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
-    throw ParquetException("Unsupported Bloom filter algorithm");
+
+  // Throw if the header is invalid
+  auto status = validateBloomFilterHeader(header);
+  if (!status.ok()) {
+    throw ParquetException(status.ToString());
   }
 
-  BlockSplitBloomFilter bloom_filter;
+  // Read remaining data of bitset
+  PARQUET_THROW_NOT_OK(input->Advance(headerSize));
+  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(header.numBytes));
 
-  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
-  bloom_filter.Init(buffer->data(), len);
-  return bloom_filter;
+  BlockSplitBloomFilter bloomFilter;
+  bloomFilter.Init(buffer->data(), header.numBytes);
+  return bloomFilter;
 }
 
 void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
   DCHECK(sink != nullptr);
 
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
-  PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
-                                   sizeof(hash_strategy_)));
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
-  PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+  format::BloomFilterHeader header;
+  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+    throw ParquetException("BloomFilter not support Algorithm other than 
BLOCK");
+  }
+  header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+    throw ParquetException("BloomFilter not support Hash other than XXHASH");
+  }
+  header.hash.__set_XXHASH(format::XxHash());
+  if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
+    throw ParquetException("BloomFilter not support Compression other than UNCOMPRESSED");

Review Comment:
   ```suggestion
       throw ParquetException("BloomFilter does not support Compression other 
than UNCOMPRESSED");
   ```



##########
cpp/src/parquet/xxhasher.h:
##########
@@ -44,9 +38,7 @@ class PARQUET_EXPORT MurmurHash3 : public Hasher {
   uint64_t Hash(const FLBA* val, uint32_t len) const override;
 
  private:
-  // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
-  // by System.nanoTime() of java.
-  static constexpr int DEFAULT_SEED = 1361930890;
+  static constexpr int DEFAULT_SEED = 0;

Review Comment:
   ```suggestion
     static constexpr int kDefaultSeed = 0;
   ```



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(

Review Comment:
   Usually we use capitalized initials.
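    For example, matching the rename suggested earlier in this review:
    ```suggestion
    static ::arrow::Status ValidateBloomFilterHeader(
    ```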



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -65,51 +69,109 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t HEADER_SIZE_GUESS = 32;
+static constexpr uint32_t MAX_BLOOM_FILTER_HEADER_SIZE = 1024;
+
+static ::arrow::Status validateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter algorithm: " << header.algorithm << ".";
+    return ::arrow::Status::Invalid(ss.str());
+  }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter hash: " << header.hash << ".";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    std::stringstream ss;
+    ss << "Unsupported Bloom filter compression: " << header.compression << 
".";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
+  ReaderProperties readerProperties;
+  uint32_t length = HEADER_SIZE_GUESS;
+
+  uint32_t headerSize = 0;

Review Comment:
   ```suggestion
     uint32_t header_size = 0;
   ```



##########
cpp/src/parquet/parquet.thrift:
##########
@@ -585,15 +585,15 @@ union BloomFilterAlgorithm {
 /** Hash strategy type annotation. xxHash is an extremely fast non-cryptographic hash
  * algorithm. It uses 64 bits version of xxHash. 
  **/
-struct XxHash {}
+struct XxHasher {}

Review Comment:
    Should we update the generated code as well? And is it a good time to sync the latest thrift file from upstream parquet-format? @wjones127 @pitrou



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to