This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eeb74edaf GH-15164: [C++][Parquet] Implement current version of BloomFilter spec (#33776)
7eeb74edaf is described below

commit 7eeb74edaf29031ee1c9b0547c03a46a94323477
Author: mwish <maplewish...@gmail.com>
AuthorDate: Thu Feb 2 02:08:50 2023 +0800

    GH-15164: [C++][Parquet] Implement current version of BloomFilter spec (#33776)
    
    The original Parquet Bloom Filter spec, added in 2018 (PARQUET-41), was
    based on the Murmur3 hash function. The spec was later changed to use the
    XXH64 hash function from xxHash (PARQUET-1609); however, Parquet C++ wasn't
    updated for this and kept implementing the original spec.
    
    This PR switches the bloom filter implementation to the current version of
    the Bloom Filter spec (as of
    https://github.com/apache/parquet-format/commit/3fb10e00c2204bf1c6cc91e094c59e84cefcee33).
    Conformance is tested using a dedicated test file in the parquet-testing
    repository (`bloom_filter.xxhash.bin`).
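
    A minimal round-trip sketch of the updated API, distilled from the tests
    added in this PR (the size and values are arbitrary; error handling is
    elided):

        parquet::BlockSplitBloomFilter bloom_filter;
        bloom_filter.Init(1024);  // bitset size in bytes
        bloom_filter.InsertHash(bloom_filter.Hash(42));

        // WriteTo() now emits a Thrift-serialized header followed by the
        // raw bitset.
        auto sink = parquet::CreateOutputStream();
        bloom_filter.WriteTo(sink.get());

        // Deserialize() now takes ReaderProperties in addition to the stream.
        PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
        ::arrow::io::BufferReader source(buffer);
        parquet::ReaderProperties properties;
        parquet::BlockSplitBloomFilter read_back =
            parquet::BlockSplitBloomFilter::Deserialize(properties, &source);
        bool maybe_present = read_back.FindHash(read_back.Hash(42));  // true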
    
    Lead-authored-by: mwish <maplewish...@gmail.com>
    Co-authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
---
 cpp/src/parquet/CMakeLists.txt            |   2 +-
 cpp/src/parquet/bloom_filter.cc           | 140 ++++++++++++++-----
 cpp/src/parquet/bloom_filter.h            |  29 ++--
 cpp/src/parquet/bloom_filter_test.cc      | 153 +++++++++++++++-----
 cpp/src/parquet/murmur3.cc                | 222 ------------------------------
 cpp/src/parquet/xxhasher.cc               |  62 +++++++++
 cpp/src/parquet/{murmur3.h => xxhasher.h} |  17 +--
 cpp/submodules/parquet-testing            |   2 +-
 docs/source/cpp/parquet.rst               |   5 +
 9 files changed, 314 insertions(+), 318 deletions(-)

diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 773ef2a6dd..4b910d5c43 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -165,7 +165,7 @@ set(PARQUET_SRCS
     level_comparison.cc
     level_conversion.cc
     metadata.cc
-    murmur3.cc
+    xxhasher.cc
     page_index.cc
     "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp"
     "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp"
diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 302698836b..730cce435e 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -17,20 +17,24 @@
 
 #include <cstdint>
 #include <cstring>
+#include <memory>
 
 #include "arrow/result.h"
 #include "arrow/util/logging.h"
+#include "generated/parquet_types.h"
 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
-#include "parquet/murmur3.h"
+#include "parquet/thrift_internal.h"
+#include "parquet/xxhasher.h"
 
 namespace parquet {
 constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
 
 BlockSplitBloomFilter::BlockSplitBloomFilter()
     : pool_(::arrow::default_memory_pool()),
-      hash_strategy_(HashStrategy::MURMUR3_X64_128),
-      algorithm_(Algorithm::BLOCK) {}
+      hash_strategy_(HashStrategy::XXHASH),
+      algorithm_(Algorithm::BLOCK),
+      compression_strategy_(CompressionStrategy::UNCOMPRESSED) {}
 
 void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
   if (num_bytes < kMinimumBloomFilterBytes) {
@@ -50,7 +54,7 @@ void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memset(data_->mutable_data(), 0, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
 void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
@@ -65,51 +69,115 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);
 
-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }
 
-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
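+// Upper-bound guess for the size of the Thrift-serialized bloom filter header.
+// The actual header is much smaller (the compatibility test in
+// bloom_filter_test.cc observes a 16-byte header).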
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+static ::arrow::Status ValidateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter algorithm: ", header.algorithm, ".");
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    return ::arrow::Status::Invalid("Unsupported Bloom filter hash: ", header.hash, ".");
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter compression: ", header.compression, ".");
   }
 
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
-    throw ParquetException("Unsupported Bloom filter algorithm");
+
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
+    const ReaderProperties& properties, ArrowInputStream* input) {
+  // NOTE: we don't know the bloom filter header size upfront, and we can't rely on
+  // InputStream::Peek() which isn't always implemented. Therefore, we must first
+  // Read() with an upper bound estimate of the header size, then once we know
+  // the bloom filter data size, we can Read() the exact number of remaining data bytes.
+  ThriftDeserializer deserializer(properties);
+  format::BloomFilterHeader header;
+
+  // Read and deserialize bloom filter header
+  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess));
+  // In/out argument to DeserializeMessage below: it passes in the number of
+  // available bytes and receives back the actual Thrift header size.
+  uint32_t header_size = static_cast<uint32_t>(header_buf->size());
+  try {
+    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(header_buf->data()),
+                                    &header_size, &header);
+    DCHECK_LE(header_size, header_buf->size());
+  } catch (std::exception& e) {
+    std::stringstream ss;
+    ss << "Deserializing bloom filter header failed.\n" << e.what();
+    throw ParquetException(ss.str());
+  }
+  PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
+
+  const int32_t bloom_filter_size = header.numBytes;
+  if (bloom_filter_size + header_size <= header_buf->size()) {
+    // The bloom filter data is entirely contained in the buffer we just read
+    // => just return it.
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(header_buf->data() + header_size, bloom_filter_size);
+    return bloom_filter;
+  }
+  // We have read a part of the bloom filter already, copy it to the target buffer
+  // and read the remaining part from the InputStream.
+  auto buffer = AllocateBuffer(properties.memory_pool(), bloom_filter_size);
+
+  const auto bloom_filter_bytes_in_header = header_buf->size() - header_size;
+  if (bloom_filter_bytes_in_header > 0) {
+    std::memcpy(buffer->mutable_data(), header_buf->data() + header_size,
+                bloom_filter_bytes_in_header);
   }
 
+  const auto required_read_size = bloom_filter_size - bloom_filter_bytes_in_header;
+  PARQUET_ASSIGN_OR_THROW(
+      auto read_size, input->Read(required_read_size,
+                                  buffer->mutable_data() + bloom_filter_bytes_in_header));
+  if (ARROW_PREDICT_FALSE(read_size < required_read_size)) {
+    throw ParquetException("Bloom Filter read failed: not enough data");
+  }
   BlockSplitBloomFilter bloom_filter;
-
-  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
-  bloom_filter.Init(buffer->data(), len);
+  bloom_filter.Init(buffer->data(), bloom_filter_size);
   return bloom_filter;
 }
 
 void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
   DCHECK(sink != nullptr);
 
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
-  PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
-                                   sizeof(hash_strategy_)));
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
-  PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+  format::BloomFilterHeader header;
+  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+    throw ParquetException("BloomFilter does not support Algorithm other than BLOCK");
+  }
+  header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+    throw ParquetException("BloomFilter does not support Hash other than XXHASH");
+  }
+  header.hash.__set_XXHASH(format::XxHash());
+  if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
+    throw ParquetException(
+        "BloomFilter does not support Compression other than UNCOMPRESSED");
+  }
+  header.compression.__set_UNCOMPRESSED(format::Uncompressed());
+  header.__set_numBytes(num_bytes_);
+
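+  // Write the Thrift-serialized header first, immediately followed by the raw bitset.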
+  ThriftSerializer serializer;
+  serializer.Serialize(&header, sink);
+
+  PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
 void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
@@ -128,9 +196,9 @@ void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
 
 bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
   const uint32_t bucket_index =
-      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
-  uint32_t key = static_cast<uint32_t>(hash);
-  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
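+  // The multiply-shift above maps the top 32 bits of the hash onto
+  // [0, num_blocks); the old code instead masked with (num_blocks - 1),
+  // which assumed a power-of-two block count.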
+  const uint32_t key = static_cast<uint32_t>(hash);
+  const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
 
   // Calculate mask for bucket.
   BlockMask block_mask;
@@ -146,8 +214,8 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
 
 void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
   const uint32_t bucket_index =
-      static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
-  uint32_t key = static_cast<uint32_t>(hash);
+      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
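+  // Same multiply-shift block index computation as in FindHash above.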
+  const uint32_t key = static_cast<uint32_t>(hash);
   uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
 
   // Calculate mask for bucket.
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index 4d12a7c63b..9a1f802961 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -105,21 +105,23 @@ class PARQUET_EXPORT BloomFilter {
 
  protected:
   // Hash strategy available for Bloom filter.
-  enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+  enum class HashStrategy : uint32_t { XXHASH = 0 };
 
   // Bloom filter algorithm.
   enum class Algorithm : uint32_t { BLOCK = 0 };
+
+  enum class CompressionStrategy : uint32_t { UNCOMPRESSED = 0 };
 };
 
-// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
-// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
-// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
-//
-// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
-// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+/// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+/// Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom Filters". The basic idea
+/// is to hash the item to a tiny Bloom filter whose size fits a single cache line
+/// or smaller.
+///
+/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
 class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
  public:
-  /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+  /// The constructor of BlockSplitBloomFilter. It uses XXH64 as the hash function.
   BlockSplitBloomFilter();
 
   /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
@@ -140,7 +142,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// @param num_bytes  The number of bytes of given bitset.
   void Init(const uint8_t* bitset, uint32_t num_bytes);
 
-  // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  /// Minimum Bloom filter size; set to 32 bytes to fit a tiny Bloom filter.
   static constexpr uint32_t kMinimumBloomFilterBytes = 32;
 
   /// Calculate optimal size according to the number of distinct values and false
@@ -198,9 +200,11 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
   /// a Bloom filter from a parquet filter.
   ///
-  /// @param input_stream The input stream from which to construct the Bloom filter
+  /// @param properties The parquet reader properties.
+  /// @param input_stream The input stream from which to construct the Bloom filter.
   /// @return The BlockSplitBloomFilter.
-  static BlockSplitBloomFilter Deserialize(ArrowInputStream* input_stream);
+  static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties,
+                                           ArrowInputStream* input_stream);
 
  private:
   // Bytes in a tiny Bloom filter block.
@@ -240,6 +244,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
   // Algorithm used in this Bloom filter.
   Algorithm algorithm_;
 
+  // Compression used in this Bloom filter.
+  CompressionStrategy compression_strategy_;
+
   // The hash pointer points to actual hash class used.
   std::unique_ptr<Hasher> hasher_;
 };
diff --git a/cpp/src/parquet/bloom_filter_test.cc b/cpp/src/parquet/bloom_filter_test.cc
index 23aa4a5801..f161c1455f 100644
--- a/cpp/src/parquet/bloom_filter_test.cc
+++ b/cpp/src/parquet/bloom_filter_test.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <random>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "arrow/buffer.h"
@@ -31,23 +32,14 @@
 
 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
-#include "parquet/murmur3.h"
 #include "parquet/platform.h"
 #include "parquet/test_util.h"
 #include "parquet/types.h"
+#include "parquet/xxhasher.h"
 
 namespace parquet {
 namespace test {
 
-TEST(Murmur3Test, TestBloomFilter) {
-  uint64_t result;
-  const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
-  ByteArray byteArray(8, bitset);
-  MurmurHash3 murmur3;
-  result = murmur3.Hash(&byteArray);
-  EXPECT_EQ(result, UINT64_C(913737700387071329));
-}
-
 TEST(ConstructorTest, TestBloomFilter) {
   BlockSplitBloomFilter bloom_filter;
   EXPECT_NO_THROW(bloom_filter.Init(1000));
@@ -64,29 +56,74 @@ TEST(ConstructorTest, TestBloomFilter) {
 // The BasicTest is used to test basic operations including InsertHash, FindHash and
 // serializing and de-serializing.
 TEST(BasicTest, TestBloomFilter) {
-  BlockSplitBloomFilter bloom_filter;
-  bloom_filter.Init(1024);
+  const std::vector<uint32_t> kBloomFilterSizes = {32, 64, 128, 256, 512, 1024, 2048};
+  const std::vector<int32_t> kIntInserts = {1, 2,  3,  5,  6,       7,      8,
+                                            9, 10, 42, -1, 1 << 29, 1 << 30};
+  const std::vector<double> kFloatInserts = {1.5,     -1.5, 3.0, 6.0, 0.0,
+                                             123.456, 1e6,  1e7, 1e8};
+  const std::vector<int32_t> kNegativeIntLookups = {0,  11, 12,      13,      -2,
+                                                    -3, 43, 1 << 27, 1 << 28};
+
+  for (const auto bloom_filter_bytes : kBloomFilterSizes) {
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(bloom_filter_bytes);
+
+    // Empty bloom filter deterministically returns false
+    for (const auto v : kIntInserts) {
+      EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
 
-  for (int i = 0; i < 10; i++) {
-    bloom_filter.InsertHash(bloom_filter.Hash(i));
-  }
+    // Insert all values
+    for (const auto v : kIntInserts) {
+      bloom_filter.InsertHash(bloom_filter.Hash(v));
+    }
+    for (const auto v : kFloatInserts) {
+      bloom_filter.InsertHash(bloom_filter.Hash(v));
+    }
 
-  for (int i = 0; i < 10; i++) {
-    EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
-  }
+    // They should always look up successfully
+    for (const auto v : kIntInserts) {
+      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
 
-  // Serialize Bloom filter to memory output stream
-  auto sink = CreateOutputStream();
-  bloom_filter.WriteTo(sink.get());
+    // Values not inserted in the filter should only rarely look up successfully
+    int false_positives = 0;
+    for (const auto v : kNegativeIntLookups) {
+      false_positives += bloom_filter.FindHash(bloom_filter.Hash(v));
+    }
+    // (this is a crude check, see FPPTest below for a more rigorous formula)
+    EXPECT_LE(false_positives, 2);
 
-  // Deserialize Bloom filter from memory
-  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
-  ::arrow::io::BufferReader source(buffer);
+    // Serialize Bloom filter to memory output stream
+    auto sink = CreateOutputStream();
+    bloom_filter.WriteTo(sink.get());
 
-  BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source);
+    // Deserialize Bloom filter from memory
+    ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+    ::arrow::io::BufferReader source(buffer);
 
-  for (int i = 0; i < 10; i++) {
-    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+    ReaderProperties reader_properties;
+    BlockSplitBloomFilter de_bloom =
+        BlockSplitBloomFilter::Deserialize(reader_properties, &source);
+
+    // Lookup previously inserted values
+    for (const auto v : kIntInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    false_positives = 0;
+    for (const auto v : kNegativeIntLookups) {
+      false_positives += de_bloom.FindHash(de_bloom.Hash(v));
+    }
+    EXPECT_LE(false_positives, 2);
   }
 }
 
@@ -159,20 +196,22 @@ TEST(FPPTest, TestBloomFilter) {
 TEST(CompatibilityTest, TestBloomFilter) {
   const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
   const std::string bloom_filter_test_binary =
-      std::string(test::get_data_dir()) + "/bloom_filter.bin";
+      std::string(test::get_data_dir()) + "/bloom_filter.xxhash.bin";
 
   PARQUET_ASSIGN_OR_THROW(auto handle,
                           ::arrow::io::ReadableFile::Open(bloom_filter_test_binary));
   PARQUET_ASSIGN_OR_THROW(int64_t size, handle->GetSize());
 
-  // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
-  EXPECT_EQ(size, 1036);
+  // 16 bytes (thrift header) + 1024 bytes (bitset)
+  EXPECT_EQ(size, 1040);
 
   std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
   PARQUET_ASSIGN_OR_THROW(auto buffer, handle->Read(size));
 
   ::arrow::io::BufferReader source(buffer);
-  BlockSplitBloomFilter bloom_filter1 = BlockSplitBloomFilter::Deserialize(&source);
+  ReaderProperties reader_properties;
+  BlockSplitBloomFilter bloom_filter1 =
+      BlockSplitBloomFilter::Deserialize(reader_properties, &source);
 
   for (int i = 0; i < 4; i++) {
     const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
@@ -242,6 +281,56 @@ TEST(OptimalValueTest, TestBloomFilter) {
       UINT32_C(1073741824));
 }
 
-}  // namespace test
+// The test below is plainly copied from parquet-mr and serves as a basic sanity
+// check of our XXH64 wrapper.
+const int64_t HASHES_OF_LOOPING_BYTES_WITH_SEED_0[32] = {
+    -1205034819632174695L, -1642502924627794072L, 5216751715308240086L,
+    -1889335612763511331L, -13835840860730338L,   -2521325055659080948L,
+    4867868962443297827L,  1498682999415010002L,  -8626056615231480947L,
+    7482827008138251355L,  -617731006306969209L,  7289733825183505098L,
+    4776896707697368229L,  1428059224718910376L,  6690813482653982021L,
+    -6248474067697161171L, 4951407828574235127L,  6198050452789369270L,
+    5776283192552877204L,  -626480755095427154L,  -6637184445929957204L,
+    8370873622748562952L,  -1705978583731280501L, -7898818752540221055L,
+    -2516210193198301541L, 8356900479849653862L,  -4413748141896466000L,
+    -6040072975510680789L, 1451490609699316991L,  -7948005844616396060L,
+    8567048088357095527L,  -4375578310507393311L};
+
+/**
+ * Test data is output of the following program with xxHash implementation
+ * from https://github.com/Cyan4973/xxHash with commit
+ * c8c4cc0f812719ce1f5b2c291159658980e7c255
+ *
+ * #define XXH_INLINE_ALL
+ * #include "xxhash.h"
+ * #include <stdlib.h>
+ * #include <stdio.h>
+ * int main()
+ * {
+ *     char* src = (char*) malloc(32);
+ *     const int N = 32;
+ *     for (int i = 0; i < N; i++) {
+ *         src[i] = (char) i;
+ *     }
+ *
+ *     printf("without seed\n");
+ *     for (int i = 0; i <= N; i++) {
+ *        printf("%lldL,\n", (long long) XXH64(src, i, 0));
+ *     }
+ * }
+ */
+TEST(XxHashTest, TestBloomFilter) {
+  uint8_t bytes[32] = {};
+
+  for (int i = 0; i < 32; i++) {
+    ByteArray byteArray(i, bytes);
+    bytes[i] = i;
+
+    auto hasher_seed_0 = std::make_unique<XxHasher>();
+    EXPECT_EQ(HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], hasher_seed_0->Hash(&byteArray))
+        << "Hash with seed 0 Error: " << i;
+  }
+}
 
+}  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/murmur3.cc b/cpp/src/parquet/murmur3.cc
deleted file mode 100644
index 07a936e041..0000000000
--- a/cpp/src/parquet/murmur3.cc
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//-----------------------------------------------------------------------------
-// MurmurHash3 was written by Austin Appleby, and is placed in the public
-// domain. The author hereby disclaims copyright to this source code.
-
-// Note - The x86 and x64 versions do _not_ produce the same results, as the
-// algorithms are optimized for their respective platforms. You can still
-// compile and run any of them on any platform, but your performance with the
-// non-native version will be less than optimal.
-
-#include "parquet/murmur3.h"
-
-namespace parquet {
-
-#if defined(_MSC_VER)
-
-#define FORCE_INLINE __forceinline
-#define ROTL64(x, y) _rotl64(x, y)
-
-#else  // defined(_MSC_VER)
-
-#define FORCE_INLINE inline __attribute__((always_inline))
-inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
-#define ROTL64(x, y) rotl64(x, y)
-
-#endif  // !defined(_MSC_VER)
-
-#define BIG_CONSTANT(x) (x##LLU)
-
-//-----------------------------------------------------------------------------
-// Block read - if your platform needs to do endian-swapping or can only
-// handle aligned reads, do the conversion here
-
-FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
-
-FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
-
-//-----------------------------------------------------------------------------
-// Finalization mix - force all bits of a hash block to avalanche
-
-FORCE_INLINE uint32_t fmix32(uint32_t h) {
-  h ^= h >> 16;
-  h *= 0x85ebca6b;
-  h ^= h >> 13;
-  h *= 0xc2b2ae35;
-  h ^= h >> 16;
-
-  return h;
-}
-
-//----------
-
-FORCE_INLINE uint64_t fmix64(uint64_t k) {
-  k ^= k >> 33;
-  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
-  k ^= k >> 33;
-  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
-  k ^= k >> 33;
-
-  return k;
-}
-
-//-----------------------------------------------------------------------------
-
-void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
-  const uint8_t* data = (const uint8_t*)key;
-  const int nblocks = len / 16;
-
-  uint64_t h1 = seed;
-  uint64_t h2 = seed;
-
-  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
-  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
-
-  //----------
-  // body
-
-  const uint64_t* blocks = (const uint64_t*)(data);
-
-  for (int i = 0; i < nblocks; i++) {
-    uint64_t k1 = getblock64(blocks, i * 2 + 0);
-    uint64_t k2 = getblock64(blocks, i * 2 + 1);
-
-    k1 *= c1;
-    k1 = ROTL64(k1, 31);
-    k1 *= c2;
-    h1 ^= k1;
-
-    h1 = ROTL64(h1, 27);
-    h1 += h2;
-    h1 = h1 * 5 + 0x52dce729;
-
-    k2 *= c2;
-    k2 = ROTL64(k2, 33);
-    k2 *= c1;
-    h2 ^= k2;
-
-    h2 = ROTL64(h2, 31);
-    h2 += h1;
-    h2 = h2 * 5 + 0x38495ab5;
-  }
-
-  //----------
-  // tail
-
-  const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
-
-  uint64_t k1 = 0;
-  uint64_t k2 = 0;
-
-  switch (len & 15) {
-    case 15:
-      k2 ^= ((uint64_t)tail[14]) << 48;  // fall through
-    case 14:
-      k2 ^= ((uint64_t)tail[13]) << 40;  // fall through
-    case 13:
-      k2 ^= ((uint64_t)tail[12]) << 32;  // fall through
-    case 12:
-      k2 ^= ((uint64_t)tail[11]) << 24;  // fall through
-    case 11:
-      k2 ^= ((uint64_t)tail[10]) << 16;  // fall through
-    case 10:
-      k2 ^= ((uint64_t)tail[9]) << 8;  // fall through
-    case 9:
-      k2 ^= ((uint64_t)tail[8]) << 0;
-      k2 *= c2;
-      k2 = ROTL64(k2, 33);
-      k2 *= c1;
-      h2 ^= k2;  // fall through
-
-    case 8:
-      k1 ^= ((uint64_t)tail[7]) << 56;  // fall through
-    case 7:
-      k1 ^= ((uint64_t)tail[6]) << 48;  // fall through
-    case 6:
-      k1 ^= ((uint64_t)tail[5]) << 40;  // fall through
-    case 5:
-      k1 ^= ((uint64_t)tail[4]) << 32;  // fall through
-    case 4:
-      k1 ^= ((uint64_t)tail[3]) << 24;  // fall through
-    case 3:
-      k1 ^= ((uint64_t)tail[2]) << 16;  // fall through
-    case 2:
-      k1 ^= ((uint64_t)tail[1]) << 8;  // fall through
-    case 1:
-      k1 ^= ((uint64_t)tail[0]) << 0;
-      k1 *= c1;
-      k1 = ROTL64(k1, 31);
-      k1 *= c2;
-      h1 ^= k1;
-  }
-
-  //----------
-  // finalization
-
-  h1 ^= len;
-  h2 ^= len;
-
-  h1 += h2;
-  h2 += h1;
-
-  h1 = fmix64(h1);
-  h2 = fmix64(h2);
-
-  h1 += h2;
-  h2 += h1;
-
-  reinterpret_cast<uint64_t*>(out)[0] = h1;
-  reinterpret_cast<uint64_t*>(out)[1] = h2;
-}
-
-template <typename T>
-uint64_t HashHelper(T value, uint32_t seed) {
-  uint64_t output[2];
-  Hash_x64_128(reinterpret_cast<void*>(&value), sizeof(T), seed, output);
-  return output[0];
-}
-
-uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
-  return out[0];
-}
-
-uint64_t MurmurHash3::Hash(const Int96* value) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
-               out);
-  return out[0];
-}
-
-uint64_t MurmurHash3::Hash(const ByteArray* value) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
-  return out[0];
-}
-
-}  // namespace parquet
diff --git a/cpp/src/parquet/xxhasher.cc b/cpp/src/parquet/xxhasher.cc
new file mode 100644
index 0000000000..18dfae3d98
--- /dev/null
+++ b/cpp/src/parquet/xxhasher.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/xxhasher.h"
+
+#define XXH_INLINE_ALL
+#include "arrow/vendored/xxhash/xxhash.h"
+
+namespace parquet {
+
+namespace {
+template <typename T>
+uint64_t XxHashHelper(T value, uint32_t seed) {
+  return XXH64(reinterpret_cast<const void*>(&value), sizeof(T), seed);
+}
+}  // namespace
+
+uint64_t XxHasher::Hash(int32_t value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(int64_t value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(float value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(double value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(const FLBA* value, uint32_t len) const {
+  return XXH64(reinterpret_cast<const void*>(value->ptr), len, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(const Int96* value) const {
+  return XXH64(reinterpret_cast<const void*>(value->value), sizeof(value->value),
+               kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::Hash(const ByteArray* value) const {
+  return XXH64(reinterpret_cast<const void*>(value->ptr), value->len,
+               kParquetBloomXxHashSeed);
+}
+
+}  // namespace parquet
diff --git a/cpp/src/parquet/murmur3.h b/cpp/src/parquet/xxhasher.h
similarity index 65%
rename from cpp/src/parquet/murmur3.h
rename to cpp/src/parquet/xxhasher.h
index acf7088e44..4a21f14eb2 100644
--- a/cpp/src/parquet/murmur3.h
+++ b/cpp/src/parquet/xxhasher.h
@@ -15,10 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//-----------------------------------------------------------------------------
-// MurmurHash3 was written by Austin Appleby, and is placed in the public
-// domain. The author hereby disclaims copyright to this source code.
-
 #pragma once
 
 #include <cstdint>
@@ -29,12 +25,8 @@
 
 namespace parquet {
 
-/// Source:
-/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
-/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class)
-class PARQUET_EXPORT MurmurHash3 : public Hasher {
+class PARQUET_EXPORT XxHasher : public Hasher {
  public:
-  MurmurHash3() : seed_(DEFAULT_SEED) {}
   uint64_t Hash(int32_t value) const override;
   uint64_t Hash(int64_t value) const override;
   uint64_t Hash(float value) const override;
@@ -43,12 +35,7 @@ class PARQUET_EXPORT MurmurHash3 : public Hasher {
   uint64_t Hash(const ByteArray* value) const override;
   uint64_t Hash(const FLBA* val, uint32_t len) const override;
 
- private:
-  // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
-  // by System.nanoTime() of java.
-  static constexpr int DEFAULT_SEED = 1361930890;
-
-  uint32_t seed_;
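+  // The Parquet Bloom filter spec prescribes XXH64 with seed 0.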
+  static constexpr int kParquetBloomXxHashSeed = 0;
 };
 
 }  // namespace parquet
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 5b82793ef7..e2d244ab9a 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 5b82793ef7196f7b3583e85669ced211cd8b5ff2
+Subproject commit e2d244ab9a84d382e3a50f55db41f362e450428b
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index edc42d54cf..e56fd33c35 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -578,6 +578,11 @@ Miscellaneous
 +--------------------------+----------+----------+---------+
 | Offset Index             | ✓        |          | \(1)    |
 +--------------------------+----------+----------+---------+
+| Bloom Filter             | ✓        | ✓        | \(2)    |
++--------------------------+----------+----------+---------+
 
 * \(1) Access to the Column and Offset Index structures is provided, but
   data read APIs do not currently make any use of them.
+
+* \(2) APIs are provided for creating, serializing and deserializing Bloom
+  Filters, but they are not integrated into data read APIs.

