pitrou commented on code in PR #50008:
URL: https://github.com/apache/arrow/pull/50008#discussion_r3340956945
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +348,88 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+ const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+ if (num_folds > 0) {
+ Fold(num_folds);
+ }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+ const uint32_t num_blocks = NumBlocks();
+ if (num_blocks < 2) {
+ return 0;
+ }
+ DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+ // Estimate the fill rate after folding from the current average fill rate.
+ // Folding ORs block groups together, so each fold changes the estimated
fill rate
+ // from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits,
making
+ // the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
+ //
+ // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+ // https://hal.science/hal-01126174v1
+ uint64_t total_set_bits = 0;
+ const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+ const uint32_t num_words = num_bytes_ /
static_cast<uint32_t>(sizeof(uint32_t));
+ for (uint32_t i = 0; i < num_words; ++i) {
+ total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+ }
+
+ const double avg_fill = static_cast<double>(total_set_bits) /
+ (static_cast<double>(num_blocks) *
kBytesPerFilterBlock * 8);
+ const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+ if (avg_fill == 0.0) {
+ return max_folds;
+ }
+
+ uint32_t num_folds = 0;
+ double unset_probability_after_folds = 1.0 - avg_fill;
+ for (uint32_t i = 0; i < max_folds; ++i) {
+ unset_probability_after_folds *= unset_probability_after_folds;
+ const double folded_fill_rate = 1.0 - unset_probability_after_folds;
+ const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
+ if (estimated_fpp > target_fpp) {
+ break;
+ }
+ ++num_folds;
+ }
+ return num_folds;
+}
+
+void BlockSplitBloomFilter::Fold(uint32_t num_folds) {
+ DCHECK_GT(num_folds, 0);
+
+ const uint32_t num_blocks = NumBlocks();
+ const uint32_t group_size = UINT32_C(1) << num_folds;
Review Comment:
Can you add comments? It's not obvious what a "group size" is.
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +348,88 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+ const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+ if (num_folds > 0) {
+ Fold(num_folds);
+ }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+ const uint32_t num_blocks = NumBlocks();
+ if (num_blocks < 2) {
+ return 0;
+ }
+ DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+ // Estimate the fill rate after folding from the current average fill rate.
+ // Folding ORs block groups together, so each fold changes the estimated
fill rate
+ // from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits,
making
+ // the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
+ //
+ // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+ // https://hal.science/hal-01126174v1
+ uint64_t total_set_bits = 0;
+ const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+ const uint32_t num_words = num_bytes_ /
static_cast<uint32_t>(sizeof(uint32_t));
+ for (uint32_t i = 0; i < num_words; ++i) {
+ total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+ }
+
+ const double avg_fill = static_cast<double>(total_set_bits) /
+ (static_cast<double>(num_blocks) *
kBytesPerFilterBlock * 8);
Review Comment:
More simply:
```suggestion
const double avg_fill = static_cast<double>(total_set_bits) / (num_bytes_ * 8);
```
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +348,88 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+ const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+ if (num_folds > 0) {
+ Fold(num_folds);
+ }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+ const uint32_t num_blocks = NumBlocks();
+ if (num_blocks < 2) {
+ return 0;
+ }
+ DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+ // Estimate the fill rate after folding from the current average fill rate.
+ // Folding ORs block groups together, so each fold changes the estimated
fill rate
+ // from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits,
making
+ // the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
+ //
+ // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+ // https://hal.science/hal-01126174v1
+ uint64_t total_set_bits = 0;
+ const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+ const uint32_t num_words = num_bytes_ /
static_cast<uint32_t>(sizeof(uint32_t));
+ for (uint32_t i = 0; i < num_words; ++i) {
+ total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+ }
+
+ const double avg_fill = static_cast<double>(total_set_bits) /
+ (static_cast<double>(num_blocks) *
kBytesPerFilterBlock * 8);
+ const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+ if (avg_fill == 0.0) {
+ return max_folds;
+ }
+
+ uint32_t num_folds = 0;
+ double unset_probability_after_folds = 1.0 - avg_fill;
+ for (uint32_t i = 0; i < max_folds; ++i) {
+ unset_probability_after_folds *= unset_probability_after_folds;
+ const double folded_fill_rate = 1.0 - unset_probability_after_folds;
+ const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
+ if (estimated_fpp > target_fpp) {
+ break;
+ }
+ ++num_folds;
+ }
+ return num_folds;
Review Comment:
With this algorithm the actual size reduction will always be a power of 2
(`group_size = UINT32_C(1) << num_folds`). Why aren't we trying to be more
granular?
##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -150,6 +150,84 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
}
}
+namespace {
+
+struct BloomFilterBuilderFoldingTestCase {
+ int64_t ndv;
+ bool fold;
+ int32_t inserted_count;
+ int64_t expected_bitset_ndv;
+};
+
+class BloomFilterBuilderFoldingTest
+ : public ::testing::TestWithParam<BloomFilterBuilderFoldingTestCase> {};
+
+} // namespace
+
+TEST_P(BloomFilterBuilderFoldingTest, RespectsOption) {
+ const auto& test_case = GetParam();
+
+ SchemaDescriptor schema;
+ schema::NodePtr root =
+ schema::GroupNode::Make("schema", Repetition::REPEATED,
{schema::ByteArray("c1")});
+ schema.Init(root);
+
+ constexpr double kFpp = 0.05;
+ BloomFilterOptions bloom_filter_options{
+ .ndv = test_case.ndv, .fpp = kFpp, .fold = test_case.fold};
+ const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
+ bloom_filter_options.ndv.value(), bloom_filter_options.fpp);
+ WriterProperties::Builder properties_builder;
+ properties_builder.enable_bloom_filter("c1", bloom_filter_options);
+ auto writer_properties = properties_builder.build();
+ auto bloom_filter_builder = BloomFilterBuilder::Make(&schema,
writer_properties.get());
+
+ bloom_filter_builder->AppendRowGroup();
+ auto bloom_filter =
bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0);
+ ASSERT_NE(bloom_filter, nullptr);
+ ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize());
+
+ std::vector<uint64_t> hashes;
+ hashes.reserve(test_case.inserted_count);
+ for (int32_t i = 0; i < test_case.inserted_count; ++i) {
+ const auto hash = bloom_filter->Hash(i);
+ hashes.push_back(hash);
+ bloom_filter->InsertHash(hash);
+ }
+
+ auto sink = CreateOutputStream();
+ auto locations = bloom_filter_builder->WriteTo(sink.get());
+ ASSERT_EQ(locations.size(), 1);
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+ const auto& location = locations.front().second;
+ ReaderProperties reader_properties;
+ ::arrow::io::BufferReader reader(
+ ::arrow::SliceBuffer(buffer, location.offset, location.length));
+ auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties,
&reader);
+
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(test_case.expected_bitset_ndv, kFpp),
+ filter.GetBitsetSize());
+ for (uint64_t hash : hashes) {
+ EXPECT_TRUE(filter.FindHash(hash));
+ }
Review Comment:
Should we check that most non-inserted values are not found, with an actual
FPP value below `kFpp`?
##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +348,88 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+ const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+ if (num_folds > 0) {
+ Fold(num_folds);
+ }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+ const uint32_t num_blocks = NumBlocks();
+ if (num_blocks < 2) {
+ return 0;
+ }
+ DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+ // Estimate the fill rate after folding from the current average fill rate.
+ // Folding ORs block groups together, so each fold changes the estimated
fill rate
+ // from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits,
making
+ // the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
+ //
+ // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+ // https://hal.science/hal-01126174v1
+ uint64_t total_set_bits = 0;
+ const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+ const uint32_t num_words = num_bytes_ /
static_cast<uint32_t>(sizeof(uint32_t));
+ for (uint32_t i = 0; i < num_words; ++i) {
+ total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+ }
+
+ const double avg_fill = static_cast<double>(total_set_bits) /
+ (static_cast<double>(num_blocks) *
kBytesPerFilterBlock * 8);
+ const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+ if (avg_fill == 0.0) {
+ return max_folds;
+ }
+
+ uint32_t num_folds = 0;
+ double unset_probability_after_folds = 1.0 - avg_fill;
+ for (uint32_t i = 0; i < max_folds; ++i) {
+ unset_probability_after_folds *= unset_probability_after_folds;
+ const double folded_fill_rate = 1.0 - unset_probability_after_folds;
+ const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
+ if (estimated_fpp > target_fpp) {
+ break;
+ }
+ ++num_folds;
+ }
+ return num_folds;
+}
+
+void BlockSplitBloomFilter::Fold(uint32_t num_folds) {
+ DCHECK_GT(num_folds, 0);
+
+ const uint32_t num_blocks = NumBlocks();
+ const uint32_t group_size = UINT32_C(1) << num_folds;
+ DCHECK_LE(group_size, num_blocks);
+
+ const uint32_t new_num_blocks = num_blocks / group_size;
+ auto* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+ for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) {
+ uint32_t* dst = bitset32 + dst_block * kBitsSetPerBlock;
+
+ const uint32_t src_block = dst_block * group_size;
+ const uint32_t* src = bitset32 + src_block * kBitsSetPerBlock;
+ if (dst != src) {
+ std::copy_n(src, kBitsSetPerBlock, dst);
+ }
+
+ for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) {
+ src = bitset32 + (src_block + fold_block) * kBitsSetPerBlock;
+ for (int word = 0; word < kBitsSetPerBlock; ++word) {
+ dst[word] |= src[word];
+ }
+ }
+ }
+
+ num_bytes_ = new_num_blocks * kBytesPerFilterBlock;
Review Comment:
`data_` is now oversized. Would it be useful to shrink it here?
##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -185,8 +185,21 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
const WriterProperties* properties_;
bool finished_ = false;
- using RowGroupBloomFilters =
- std::map</*column_id=*/int32_t, std::shared_ptr<BloomFilter>>;
+ struct RowGroupBloomFilters {
+ RowGroupBloomFilters() = default;
+ RowGroupBloomFilters(RowGroupBloomFilters&&) noexcept = default;
Review Comment:
Or we could just keep using `std::shared_ptr` instead of `std::unique_ptr`. Is
the change important here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]