This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 7eeb74edaf GH-15164: [C++][Parquet] Implement current version of BloomFilter spec (#33776)
7eeb74edaf is described below

commit 7eeb74edaf29031ee1c9b0547c03a46a94323477
Author:     mwish <maplewish...@gmail.com>
AuthorDate: Thu Feb 2 02:08:50 2023 +0800

    GH-15164: [C++][Parquet] Implement current version of BloomFilter spec (#33776)

    The original Parquet Bloom Filter spec, added in 2018 (PARQUET-41), was based
    on the Murmur3 hash function. The spec was later changed to use the XXH64
    hash function from xxHash (PARQUET-1609); however, Parquet C++ wasn't updated
    for this and kept implementing the original spec.

    This PR switches the bloom filter implementation to the current version of
    the Bloom Filter spec (as of
    https://github.com/apache/parquet-format/commit/3fb10e00c2204bf1c6cc91e094c59e84cefcee33).
    Conformance is tested using a dedicated test file in the parquet-testing
    repository (`bloom_filter.xxhash.bin`).

    Lead-authored-by: mwish <maplewish...@gmail.com>
    Co-authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: Antoine Pitrou <anto...@python.org>
---
 cpp/src/parquet/CMakeLists.txt            |   2 +-
 cpp/src/parquet/bloom_filter.cc           | 140 ++++++++++++++-----
 cpp/src/parquet/bloom_filter.h            |  29 ++--
 cpp/src/parquet/bloom_filter_test.cc      | 153 +++++++++++++++-----
 cpp/src/parquet/murmur3.cc                | 222 ------------------------------
 cpp/src/parquet/xxhasher.cc               |  62 +++++++++
 cpp/src/parquet/{murmur3.h => xxhasher.h} |  17 +--
 cpp/submodules/parquet-testing            |   2 +-
 docs/source/cpp/parquet.rst               |   5 +
 9 files changed, 314 insertions(+), 318 deletions(-)

diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 773ef2a6dd..4b910d5c43 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -165,7 +165,7 @@ set(PARQUET_SRCS
   level_comparison.cc
   level_conversion.cc
   metadata.cc
-  murmur3.cc
+  xxhasher.cc
   page_index.cc
   "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp"
   "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp"
diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 302698836b..730cce435e 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -17,20 +17,24 @@

 #include <cstdint>
 #include <cstring>
+#include <memory>

 #include "arrow/result.h"
 #include "arrow/util/logging.h"
+#include "generated/parquet_types.h"
 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
-#include "parquet/murmur3.h"
+#include "parquet/thrift_internal.h"
+#include "parquet/xxhasher.h"

 namespace parquet {

 constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];

 BlockSplitBloomFilter::BlockSplitBloomFilter()
     : pool_(::arrow::default_memory_pool()),
-      hash_strategy_(HashStrategy::MURMUR3_X64_128),
-      algorithm_(Algorithm::BLOCK) {}
+      hash_strategy_(HashStrategy::XXHASH),
+      algorithm_(Algorithm::BLOCK),
+      compression_strategy_(CompressionStrategy::UNCOMPRESSED) {}

 void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
   if (num_bytes < kMinimumBloomFilterBytes) {
@@ -50,7 +54,7 @@ void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memset(data_->mutable_data(), 0, num_bytes_);

-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }

 void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
@@ -65,51 +69,115 @@
   PARQUET_ASSIGN_OR_THROW(data_, ::arrow::AllocateBuffer(num_bytes_, pool_));
   memcpy(data_->mutable_data(), bitset, num_bytes_);

-  this->hasher_.reset(new MurmurHash3());
+  this->hasher_ = std::make_unique<XxHasher>();
 }

-BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(ArrowInputStream* input) {
-  uint32_t len, hash, algorithm;
-  int64_t bytes_available;
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;

-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &len));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+static ::arrow::Status ValidateBloomFilterHeader(
+    const format::BloomFilterHeader& header) {
+  if (!header.algorithm.__isset.BLOCK) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter algorithm: ", header.algorithm, ".");
   }
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &hash));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (!header.hash.__isset.XXHASH) {
+    return ::arrow::Status::Invalid("Unsupported Bloom filter hash: ", header.hash, ".");
   }
-  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
-    throw ParquetException("Unsupported hash strategy");
+
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    return ::arrow::Status::Invalid(
+        "Unsupported Bloom filter compression: ", header.compression, ".");
   }
-  PARQUET_ASSIGN_OR_THROW(bytes_available, input->Read(sizeof(uint32_t), &algorithm));
-  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t)) {
-    throw ParquetException("Failed to deserialize from input stream");
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) > BloomFilter::kMaximumBloomFilterBytes) {
+    std::stringstream ss;
+    ss << "Bloom filter size is incorrect: " << header.numBytes << ". Must be in range ("
+       << 0 << ", " << BloomFilter::kMaximumBloomFilterBytes << "].";
+    return ::arrow::Status::Invalid(ss.str());
   }
-  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
-    throw ParquetException("Unsupported Bloom filter algorithm");
+
+  return ::arrow::Status::OK();
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
+    const ReaderProperties& properties, ArrowInputStream* input) {
+  // NOTE: we don't know the bloom filter header size upfront, and we can't rely on
+  // InputStream::Peek() which isn't always implemented. Therefore, we must first
+  // Read() with an upper bound estimate of the header size, then once we know
+  // the bloom filter data size, we can Read() the exact number of remaining data bytes.
+  ThriftDeserializer deserializer(properties);
+  format::BloomFilterHeader header;
+
+  // Read and deserialize bloom filter header
+  PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess));
+  // This gets used, then set by DeserializeThriftMsg
+  uint32_t header_size = static_cast<uint32_t>(header_buf->size());
+  try {
+    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(header_buf->data()),
+                                    &header_size, &header);
+    DCHECK_LE(header_size, header_buf->size());
+  } catch (std::exception& e) {
+    std::stringstream ss;
+    ss << "Deserializing bloom filter header failed.\n" << e.what();
+    throw ParquetException(ss.str());
+  }
+  PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
+
+  const int32_t bloom_filter_size = header.numBytes;
+  if (bloom_filter_size + header_size <= header_buf->size()) {
+    // The bloom filter data is entirely contained in the buffer we just read
+    // => just return it.
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(header_buf->data() + header_size, bloom_filter_size);
+    return bloom_filter;
+  }
+  // We have read a part of the bloom filter already, copy it to the target buffer
+  // and read the remaining part from the InputStream.
+  auto buffer = AllocateBuffer(properties.memory_pool(), bloom_filter_size);
+
+  const auto bloom_filter_bytes_in_header = header_buf->size() - header_size;
+  if (bloom_filter_bytes_in_header > 0) {
+    std::memcpy(buffer->mutable_data(), header_buf->data() + header_size,
+                bloom_filter_bytes_in_header);
   }
+  const auto required_read_size = bloom_filter_size - bloom_filter_bytes_in_header;
+  PARQUET_ASSIGN_OR_THROW(
+      auto read_size, input->Read(required_read_size,
+                                  buffer->mutable_data() + bloom_filter_bytes_in_header));
+  if (ARROW_PREDICT_FALSE(read_size < required_read_size)) {
+    throw ParquetException("Bloom Filter read failed: not enough data");
+  }
   BlockSplitBloomFilter bloom_filter;
-
-  PARQUET_ASSIGN_OR_THROW(auto buffer, input->Read(len));
-  bloom_filter.Init(buffer->data(), len);
+  bloom_filter.Init(buffer->data(), bloom_filter_size);
   return bloom_filter;
 }

 void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
   DCHECK(sink != nullptr);
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_)));
-  PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_),
-                                   sizeof(hash_strategy_)));
-  PARQUET_THROW_NOT_OK(
-      sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_)));
-  PARQUET_THROW_NOT_OK(sink->Write(data_->mutable_data(), num_bytes_));
+  format::BloomFilterHeader header;
+  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
+    throw ParquetException("BloomFilter does not support Algorithm other than BLOCK");
+  }
+  header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
+  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
+    throw ParquetException("BloomFilter does not support Hash other than XXHASH");
+  }
+  header.hash.__set_XXHASH(format::XxHash());
+  if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
+    throw ParquetException(
+        "BloomFilter does not support Compression other than UNCOMPRESSED");
+  }
+  header.compression.__set_UNCOMPRESSED(format::Uncompressed());
+  header.__set_numBytes(num_bytes_);
+
+  ThriftSerializer serializer;
+  serializer.Serialize(&header, sink);
+
+  PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }

 void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
@@ -128,9 +196,9 @@ void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {

 bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
   const uint32_t bucket_index =
-      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
-  uint32_t key = static_cast<uint32_t>(hash);
-  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
+  const uint32_t key = static_cast<uint32_t>(hash);
+  const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());

   // Calculate mask for bucket.
   BlockMask block_mask;
@@ -146,8 +214,8 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {

 void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
   const uint32_t bucket_index =
-      static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
-  uint32_t key = static_cast<uint32_t>(hash);
+      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
+  const uint32_t key = static_cast<uint32_t>(hash);
   uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());

   // Calculate mask for bucket.
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index 4d12a7c63b..9a1f802961 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -105,21 +105,23 @@ class PARQUET_EXPORT BloomFilter {
  protected:
   // Hash strategy available for Bloom filter.
-  enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+  enum class HashStrategy : uint32_t { XXHASH = 0 };

   // Bloom filter algorithm.
   enum class Algorithm : uint32_t { BLOCK = 0 };
+
+  enum class CompressionStrategy : uint32_t { UNCOMPRESSED = 0 };
 };

-// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
-// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
-// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
-//
-// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
-// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+/// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+/// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
+/// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
+///
+/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
 class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
  public:
-  /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+  /// The constructor of BlockSplitBloomFilter. It uses XXH64 as hash function.
   BlockSplitBloomFilter();

   /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
@@ -140,7 +142,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// @param num_bytes The number of bytes of given bitset.
   void Init(const uint8_t* bitset, uint32_t num_bytes);

-  // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  /// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
   static constexpr uint32_t kMinimumBloomFilterBytes = 32;

   /// Calculate optimal size according to the number of distinct values and false
@@ -198,9 +200,11 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
   /// a Bloom filter from a parquet filter.
   ///
-  /// @param input_stream The input stream from which to construct the Bloom filter
+  /// @param properties The parquet reader properties.
+  /// @param input_stream The input stream from which to construct the Bloom filter.
   /// @return The BlockSplitBloomFilter.
-  static BlockSplitBloomFilter Deserialize(ArrowInputStream* input_stream);
+  static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties,
+                                           ArrowInputStream* input_stream);

  private:
   // Bytes in a tiny Bloom filter block.
@@ -240,6 +244,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   // Algorithm used in this Bloom filter.
   Algorithm algorithm_;

+  // Compression used in this Bloom filter.
+  CompressionStrategy compression_strategy_;
+
   // The hash pointer points to actual hash class used.
   std::unique_ptr<Hasher> hasher_;
 };
diff --git a/cpp/src/parquet/bloom_filter_test.cc b/cpp/src/parquet/bloom_filter_test.cc
index 23aa4a5801..f161c1455f 100644
--- a/cpp/src/parquet/bloom_filter_test.cc
+++ b/cpp/src/parquet/bloom_filter_test.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <random>
 #include <string>
+#include <utility>
 #include <vector>

 #include "arrow/buffer.h"
@@ -31,23 +32,14 @@

 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
-#include "parquet/murmur3.h"
 #include "parquet/platform.h"
 #include "parquet/test_util.h"
 #include "parquet/types.h"
+#include "parquet/xxhasher.h"

 namespace parquet {
 namespace test {

-TEST(Murmur3Test, TestBloomFilter) {
-  uint64_t result;
-  const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
-  ByteArray byteArray(8, bitset);
-  MurmurHash3 murmur3;
-  result = murmur3.Hash(&byteArray);
-  EXPECT_EQ(result, UINT64_C(913737700387071329));
-}
-
 TEST(ConstructorTest, TestBloomFilter) {
   BlockSplitBloomFilter bloom_filter;
   EXPECT_NO_THROW(bloom_filter.Init(1000));
@@ -64,29 +56,74 @@ TEST(ConstructorTest, TestBloomFilter) {

 // The BasicTest is used to test basic operations including InsertHash, FindHash and
 // serializing and de-serializing.
 TEST(BasicTest, TestBloomFilter) {
-  BlockSplitBloomFilter bloom_filter;
-  bloom_filter.Init(1024);
+  const std::vector<uint32_t> kBloomFilterSizes = {32, 64, 128, 256, 512, 1024, 2048};
+  const std::vector<int32_t> kIntInserts = {1, 2, 3, 5, 6, 7, 8,
+                                            9, 10, 42, -1, 1 << 29, 1 << 30};
+  const std::vector<double> kFloatInserts = {1.5, -1.5, 3.0, 6.0, 0.0,
+                                             123.456, 1e6, 1e7, 1e8};
+  const std::vector<int32_t> kNegativeIntLookups = {0, 11, 12, 13, -2,
+                                                    -3, 43, 1 << 27, 1 << 28};
+
+  for (const auto bloom_filter_bytes : kBloomFilterSizes) {
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(bloom_filter_bytes);
+
+    // Empty bloom filter deterministically returns false
+    for (const auto v : kIntInserts) {
+      EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }

-  for (int i = 0; i < 10; i++) {
-    bloom_filter.InsertHash(bloom_filter.Hash(i));
-  }
+    // Insert all values
+    for (const auto v : kIntInserts) {
+      bloom_filter.InsertHash(bloom_filter.Hash(v));
+    }
+    for (const auto v : kFloatInserts) {
+      bloom_filter.InsertHash(bloom_filter.Hash(v));
+    }

-  for (int i = 0; i < 10; i++) {
-    EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
-  }
+    // They should always lookup successfully
+    for (const auto v : kIntInserts) {
+      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v)));
+    }

-  // Serialize Bloom filter to memory output stream
-  auto sink = CreateOutputStream();
-  bloom_filter.WriteTo(sink.get());
+    // Values not inserted in the filter should only rarely lookup successfully
+    int false_positives = 0;
+    for (const auto v : kNegativeIntLookups) {
+      false_positives += bloom_filter.FindHash(bloom_filter.Hash(v));
+    }
+    // (this is a crude check, see FPPTest below for a more rigourous formula)
+    EXPECT_LE(false_positives, 2);

-  // Deserialize Bloom filter from memory
-  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
-  ::arrow::io::BufferReader source(buffer);
+    // Serialize Bloom filter to memory output stream
+    auto sink = CreateOutputStream();
+    bloom_filter.WriteTo(sink.get());

-  BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source);
+    // Deserialize Bloom filter from memory
+    ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+    ::arrow::io::BufferReader source(buffer);

-  for (int i = 0; i < 10; i++) {
-    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+    ReaderProperties reader_properties;
+    BlockSplitBloomFilter de_bloom =
+        BlockSplitBloomFilter::Deserialize(reader_properties, &source);
+
+    // Lookup previously inserted values
+    for (const auto v : kIntInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    for (const auto v : kFloatInserts) {
+      EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(v)));
+    }
+    false_positives = 0;
+    for (const auto v : kNegativeIntLookups) {
+      false_positives += de_bloom.FindHash(de_bloom.Hash(v));
+    }
+    EXPECT_LE(false_positives, 2);
   }
 }

@@ -159,20 +196,22 @@ TEST(FPPTest, TestBloomFilter) {
 TEST(CompatibilityTest, TestBloomFilter) {
   const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
   const std::string bloom_filter_test_binary =
-      std::string(test::get_data_dir()) + "/bloom_filter.bin";
+      std::string(test::get_data_dir()) + "/bloom_filter.xxhash.bin";

   PARQUET_ASSIGN_OR_THROW(auto handle,
                           ::arrow::io::ReadableFile::Open(bloom_filter_test_binary));
   PARQUET_ASSIGN_OR_THROW(int64_t size, handle->GetSize());
-  // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
-  EXPECT_EQ(size, 1036);
+  // 16 bytes (thrift header) + 1024 bytes (bitset)
+  EXPECT_EQ(size, 1040);
   std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
   PARQUET_ASSIGN_OR_THROW(auto buffer, handle->Read(size));
   ::arrow::io::BufferReader source(buffer);

-  BlockSplitBloomFilter bloom_filter1 = BlockSplitBloomFilter::Deserialize(&source);
+  ReaderProperties reader_properties;
+  BlockSplitBloomFilter bloom_filter1 =
+      BlockSplitBloomFilter::Deserialize(reader_properties, &source);

   for (int i = 0; i < 4; i++) {
     const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
@@ -242,6 +281,56 @@ TEST(OptimalValueTest, TestBloomFilter) {
                       UINT32_C(1073741824));
 }

-}  // namespace test
+// The test below is plainly copied from parquet-mr and serves as a basic sanity
+// check of our XXH64 wrapper.
+const int64_t HASHES_OF_LOOPING_BYTES_WITH_SEED_0[32] = {
+    -1205034819632174695L, -1642502924627794072L, 5216751715308240086L,
+    -1889335612763511331L, -13835840860730338L,   -2521325055659080948L,
+    4867868962443297827L,  1498682999415010002L,  -8626056615231480947L,
+    7482827008138251355L,  -617731006306969209L,  7289733825183505098L,
+    4776896707697368229L,  1428059224718910376L,  6690813482653982021L,
+    -6248474067697161171L, 4951407828574235127L,  6198050452789369270L,
+    5776283192552877204L,  -626480755095427154L,  -6637184445929957204L,
+    8370873622748562952L,  -1705978583731280501L, -7898818752540221055L,
+    -2516210193198301541L, 8356900479849653862L,  -4413748141896466000L,
+    -6040072975510680789L, 1451490609699316991L,  -7948005844616396060L,
+    8567048088357095527L,  -4375578310507393311L};
+
+/**
+ * Test data is output of the following program with xxHash implementation
+ * from https://github.com/Cyan4973/xxHash with commit
+ * c8c4cc0f812719ce1f5b2c291159658980e7c255
+ *
+ * #define XXH_INLINE_ALL
+ * #include "xxhash.h"
+ * #include <stdlib.h>
+ * #include <stdio.h>
+ * int main()
+ * {
+ *     char* src = (char*) malloc(32);
+ *     const int N = 32;
+ *     for (int i = 0; i < N; i++) {
+ *         src[i] = (char) i;
+ *     }
+ *
+ *     printf("without seed\n");
+ *     for (int i = 0; i <= N; i++) {
+ *         printf("%lldL,\n", (long long) XXH64(src, i, 0));
+ *     }
+ * }
+ */
+TEST(XxHashTest, TestBloomFilter) {
+  uint8_t bytes[32] = {};
+
+  for (int i = 0; i < 32; i++) {
+    ByteArray byteArray(i, bytes);
+    bytes[i] = i;
+
+    auto hasher_seed_0 = std::make_unique<XxHasher>();
+    EXPECT_EQ(HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], hasher_seed_0->Hash(&byteArray))
+        << "Hash with seed 0 Error: " << i;
+  }
+}
+
+}  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/murmur3.cc b/cpp/src/parquet/murmur3.cc
deleted file mode 100644
index 07a936e041..0000000000
--- a/cpp/src/parquet/murmur3.cc
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//-----------------------------------------------------------------------------
-// MurmurHash3 was written by Austin Appleby, and is placed in the public
-// domain. The author hereby disclaims copyright to this source code.
-
-// Note - The x86 and x64 versions do _not_ produce the same results, as the
-// algorithms are optimized for their respective platforms. You can still
-// compile and run any of them on any platform, but your performance with the
-// non-native version will be less than optimal.
-
-#include "parquet/murmur3.h"
-
-namespace parquet {
-
-#if defined(_MSC_VER)
-
-#define FORCE_INLINE __forceinline
-#define ROTL64(x, y) _rotl64(x, y)
-
-#else  // defined(_MSC_VER)
-
-#define FORCE_INLINE inline __attribute__((always_inline))
-inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
-#define ROTL64(x, y) rotl64(x, y)
-
-#endif  // !defined(_MSC_VER)
-
-#define BIG_CONSTANT(x) (x##LLU)
-
-//-----------------------------------------------------------------------------
-// Block read - if your platform needs to do endian-swapping or can only
-// handle aligned reads, do the conversion here
-
-FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
-
-FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
-
-//-----------------------------------------------------------------------------
-// Finalization mix - force all bits of a hash block to avalanche
-
-FORCE_INLINE uint32_t fmix32(uint32_t h) {
-  h ^= h >> 16;
-  h *= 0x85ebca6b;
-  h ^= h >> 13;
-  h *= 0xc2b2ae35;
-  h ^= h >> 16;
-
-  return h;
-}
-
-//----------
-
-FORCE_INLINE uint64_t fmix64(uint64_t k) {
-  k ^= k >> 33;
-  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
-  k ^= k >> 33;
-  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
-  k ^= k >> 33;
-
-  return k;
-}
-
-//-----------------------------------------------------------------------------
-
-void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
-  const uint8_t* data = (const uint8_t*)key;
-  const int nblocks = len / 16;
-
-  uint64_t h1 = seed;
-  uint64_t h2 = seed;
-
-  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
-  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
-
-  //----------
-  // body
-
-  const uint64_t* blocks = (const uint64_t*)(data);
-
-  for (int i = 0; i < nblocks; i++) {
-    uint64_t k1 = getblock64(blocks, i * 2 + 0);
-    uint64_t k2 = getblock64(blocks, i * 2 + 1);
-
-    k1 *= c1;
-    k1 = ROTL64(k1, 31);
-    k1 *= c2;
-    h1 ^= k1;
-
-    h1 = ROTL64(h1, 27);
-    h1 += h2;
-    h1 = h1 * 5 + 0x52dce729;
-
-    k2 *= c2;
-    k2 = ROTL64(k2, 33);
-    k2 *= c1;
-    h2 ^= k2;
-
-    h2 = ROTL64(h2, 31);
-    h2 += h1;
-    h2 = h2 * 5 + 0x38495ab5;
-  }
-
-  //----------
-  // tail
-
-  const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
-
-  uint64_t k1 = 0;
-  uint64_t k2 = 0;
-
-  switch (len & 15) {
-    case 15:
-      k2 ^= ((uint64_t)tail[14]) << 48;  // fall through
-    case 14:
-      k2 ^= ((uint64_t)tail[13]) << 40;  // fall through
-    case 13:
-      k2 ^= ((uint64_t)tail[12]) << 32;  // fall through
-    case 12:
-      k2 ^= ((uint64_t)tail[11]) << 24;  // fall through
-    case 11:
-      k2 ^= ((uint64_t)tail[10]) << 16;  // fall through
-    case 10:
-      k2 ^= ((uint64_t)tail[9]) << 8;  // fall through
-    case 9:
-      k2 ^= ((uint64_t)tail[8]) << 0;
-      k2 *= c2;
-      k2 = ROTL64(k2, 33);
-      k2 *= c1;
-      h2 ^= k2;  // fall through
-
-    case 8:
-      k1 ^= ((uint64_t)tail[7]) << 56;  // fall through
-    case 7:
-      k1 ^= ((uint64_t)tail[6]) << 48;  // fall through
-    case 6:
-      k1 ^= ((uint64_t)tail[5]) << 40;  // fall through
-    case 5:
-      k1 ^= ((uint64_t)tail[4]) << 32;  // fall through
-    case 4:
-      k1 ^= ((uint64_t)tail[3]) << 24;  // fall through
-    case 3:
-      k1 ^= ((uint64_t)tail[2]) << 16;  // fall through
-    case 2:
-      k1 ^= ((uint64_t)tail[1]) << 8;  // fall through
-    case 1:
-      k1 ^= ((uint64_t)tail[0]) << 0;
-      k1 *= c1;
-      k1 = ROTL64(k1, 31);
-      k1 *= c2;
-      h1 ^= k1;
-  }
-
-  //----------
-  // finalization
-
-  h1 ^= len;
-  h2 ^= len;
-
-  h1 += h2;
-  h2 += h1;
-
-  h1 = fmix64(h1);
-  h2 = fmix64(h2);
-
-  h1 += h2;
-  h2 += h1;
-
-  reinterpret_cast<uint64_t*>(out)[0] = h1;
-  reinterpret_cast<uint64_t*>(out)[1] = h2;
-}
-
-template <typename T>
-uint64_t HashHelper(T value, uint32_t seed) {
-  uint64_t output[2];
-  Hash_x64_128(reinterpret_cast<void*>(&value), sizeof(T), seed, output);
-  return output[0];
-}
-
-uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
-
-uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
-  return out[0];
-}
-
-uint64_t MurmurHash3::Hash(const Int96* value) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
-               out);
-  return out[0];
-}
-
-uint64_t MurmurHash3::Hash(const ByteArray* value) const {
-  uint64_t out[2];
-  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
-  return out[0];
-}
-
-}  // namespace parquet
diff --git a/cpp/src/parquet/xxhasher.cc b/cpp/src/parquet/xxhasher.cc
new file mode 100644
index 0000000000..18dfae3d98
--- /dev/null
+++ b/cpp/src/parquet/xxhasher.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "parquet/xxhasher.h" + +#define XXH_INLINE_ALL +#include "arrow/vendored/xxhash/xxhash.h" + +namespace parquet { + +namespace { +template <typename T> +uint64_t XxHashHelper(T value, uint32_t seed) { + return XXH64(reinterpret_cast<const void*>(&value), sizeof(T), seed); +} +} // namespace + +uint64_t XxHasher::Hash(int32_t value) const { + return XxHashHelper(value, kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(int64_t value) const { + return XxHashHelper(value, kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(float value) const { + return XxHashHelper(value, kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(double value) const { + return XxHashHelper(value, kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(const FLBA* value, uint32_t len) const { + return XXH64(reinterpret_cast<const void*>(value->ptr), len, kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(const Int96* value) const { + return XXH64(reinterpret_cast<const void*>(value->value), sizeof(value->value), + kParquetBloomXxHashSeed); +} + +uint64_t XxHasher::Hash(const ByteArray* value) const { + return XXH64(reinterpret_cast<const void*>(value->ptr), value->len, + kParquetBloomXxHashSeed); +} + +} // namespace parquet diff --git a/cpp/src/parquet/murmur3.h b/cpp/src/parquet/xxhasher.h similarity index 65% rename from cpp/src/parquet/murmur3.h rename to cpp/src/parquet/xxhasher.h index acf7088e44..4a21f14eb2 100644 --- a/cpp/src/parquet/murmur3.h +++ b/cpp/src/parquet/xxhasher.h @@ -15,10 +15,6 @@ // specific language governing permissions and limitations // under the License. -//----------------------------------------------------------------------------- -// MurmurHash3 was written by Austin Appleby, and is placed in the public -// domain. The author hereby disclaims copyright to this source code. - #pragma once #include <cstdint> @@ -29,12 +25,8 @@ namespace parquet { -/// Source: -/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp -/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class) -class PARQUET_EXPORT MurmurHash3 : public Hasher { +class PARQUET_EXPORT XxHasher : public Hasher { public: - MurmurHash3() : seed_(DEFAULT_SEED) {} uint64_t Hash(int32_t value) const override; uint64_t Hash(int64_t value) const override; uint64_t Hash(float value) const override; @@ -43,12 +35,7 @@ class PARQUET_EXPORT MurmurHash3 : public Hasher { uint64_t Hash(const ByteArray* value) const override; uint64_t Hash(const FLBA* val, uint32_t len) const override; - private: - // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated - // by System.nanoTime() of java. 
-  static constexpr int DEFAULT_SEED = 1361930890;
-
-  uint32_t seed_;
+  static constexpr int kParquetBloomXxHashSeed = 0;
 };

 }  // namespace parquet
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 5b82793ef7..e2d244ab9a 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 5b82793ef7196f7b3583e85669ced211cd8b5ff2
+Subproject commit e2d244ab9a84d382e3a50f55db41f362e450428b
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index edc42d54cf..e56fd33c35 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -578,6 +578,11 @@ Miscellaneous
 +--------------------------+----------+----------+---------+
 | Offset Index             | ✓        |          | \(1)    |
 +--------------------------+----------+----------+---------+
+| Bloom Filter             | ✓        | ✓        | \(2)    |
++--------------------------+----------+----------+---------+

 * \(1) Access to the Column and Offset Index structures is provided, but
   data read APIs do not currently make any use of them.
+
+* \(2) APIs are provided for creating, serializing and deserializing Bloom
+  Filters, but they are not integrated into data read APIs.
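[Editor's note: for readers who want to exercise the new Bloom filter APIs end
to end, below is a minimal round trip distilled from the BasicTest code in this
diff. It is a sketch, not part of the commit. The include paths, the
parquet::CreateOutputStream helper, and PARQUET_ASSIGN_OR_THROW are taken from
the files shown above (parquet/platform.h and the test file); the function name
BloomFilterRoundTrip is ours, purely for illustration.

    #include <cstdint>

    #include "arrow/io/memory.h"       // ::arrow::io::BufferReader
    #include "parquet/bloom_filter.h"  // BlockSplitBloomFilter
    #include "parquet/platform.h"      // CreateOutputStream, PARQUET_ASSIGN_OR_THROW
    #include "parquet/properties.h"    // ReaderProperties

    void BloomFilterRoundTrip() {
      // Build a filter over a 1024-byte bitset; per this commit, the
      // constructor defaults to XXH64 hashing and the BLOCK algorithm.
      parquet::BlockSplitBloomFilter bloom_filter;
      bloom_filter.Init(1024);
      bloom_filter.InsertHash(bloom_filter.Hash(int32_t{42}));

      // Serialize: a Thrift BloomFilterHeader followed by the raw bitset.
      auto sink = parquet::CreateOutputStream();
      bloom_filter.WriteTo(sink.get());
      PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());

      // Deserialize from any ArrowInputStream and probe.
      ::arrow::io::BufferReader source(buffer);
      parquet::ReaderProperties properties;
      parquet::BlockSplitBloomFilter de_bloom =
          parquet::BlockSplitBloomFilter::Deserialize(properties, &source);
      bool inserted = de_bloom.FindHash(de_bloom.Hash(int32_t{42}));  // true
      bool other = de_bloom.FindHash(de_bloom.Hash(int32_t{43}));     // usually false
      (void)inserted;
      (void)other;
    }

As with any Bloom filter, FindHash can return false positives (FPPTest above
quantifies the rate) but never false negatives, which is why the last lookup is
only expected, not guaranteed, to return false.]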