wgtmac commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1339556198


##########
cpp/src/parquet/properties.h:
##########
@@ -139,6 +139,15 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::UNKNOWN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {
+  /// The number of distinct values to expect to be inserted into the bloom.

Review Comment:
   ```suggestion
     /// Expected number of distinct values to be inserted into the bloom 
filter.
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -532,6 +559,43 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default disabled.
+    Builder* disable_bloom_filter(const std::string& path) {
+      bloom_filter_options_[path] = std::nullopt;
+      return this;
+    }
+
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default enabled.
+    Builder* disable_bloom_filter(const std::shared_ptr<schema::ColumnPath>& 
path) {
+      return this->disable_bloom_filter(path->ToDotString());
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Default disabled.
+    ///
+    /// Note: Currently we don't support bloom filter for boolean columns,
+    /// so if enable bloom filter for boolean columns, it will be ignored.
+    Builder* enable_bloom_filter_options(BloomFilterOptions 
bloom_filter_options,
+                                         const std::string& path) {
+      bloom_filter_options_[path] = bloom_filter_options;
+      return this;
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Default disabled.
+    ///
+    /// Note: Currently we don't support bloom filter for boolean columns,
+    /// so if enable bloom filter for boolean columns, it will be ignored.

Review Comment:
   ```suggestion
       /// so it will be ignored if the column is of boolean type
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -139,6 +139,15 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::UNKNOWN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {
+  /// The number of distinct values to expect to be inserted into the bloom.
+  int32_t ndv = DEFAULT_BLOOM_FILTER_NDV;
+  /// The false positive probability expected from the bloom.

Review Comment:
   ```suggestion
     /// False-positive probability of the bloom filter.
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -532,6 +559,43 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default disabled.
+    Builder* disable_bloom_filter(const std::string& path) {
+      bloom_filter_options_[path] = std::nullopt;
+      return this;
+    }
+
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default enabled.
+    Builder* disable_bloom_filter(const std::shared_ptr<schema::ColumnPath>& 
path) {
+      return this->disable_bloom_filter(path->ToDotString());
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Default disabled.
+    ///
+    /// Note: Currently we don't support bloom filter for boolean columns,
+    /// so if enable bloom filter for boolean columns, it will be ignored.

Review Comment:
   ```suggestion
       /// so it will be ignored if the column is of boolean type
   ```



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1614,5 +1615,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(

Review Comment:
   The default values seem not necessary?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1614,5 +1615,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties());
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  wp_builder.max_statistics_size(column_properties.max_statistics_size());
+  auto path = this->schema_.Column(0)->path();
+  if (column_properties.bloom_filter_enabled()) {
+    wp_builder.enable_bloom_filter_options(
+        column_properties.bloom_filter_options().value(), path);
+  } else {
+    wp_builder.disable_bloom_filter(path);
+  }
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ = BloomFilterBuilder::Make(&this->schema_, 
*this->writer_properties_);
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                         this->writer_properties_.get(), bloom_filter_);
+  return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+}
+
+// Note: BooleanType is Excluded.
+using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, 
FloatType,
+                                              DoubleType, ByteArrayType, 
FLBAType>;
+
+TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes);
+
+TYPED_TEST(TestBloomFilterWriter, Basic) {
+  this->GenerateData(SMALL_SIZE);
+  ColumnProperties column_properties;
+  BloomFilterOptions options;
+  options.ndv = 10;

Review Comment:
   Should we replace 10 with SMALL_SIZE?



##########
cpp/src/parquet/properties.h:
##########
@@ -186,6 +195,17 @@ class PARQUET_EXPORT ColumnProperties {
     page_index_enabled_ = page_index_enabled;
   }
 
+  void set_bloom_filter_options(std::optional<BloomFilterOptions> 
bloom_filter_options) {
+    if (bloom_filter_options) {
+      if (bloom_filter_options->fpp > 1.0 || bloom_filter_options->fpp < 0.0) {
+        throw ParquetException(
+            "Bloom Filter False positive probability must be between 0.0 and 
1.0, got " +

Review Comment:
   ```suggestion
               "Bloom filter false-positive probability must fall in [0.0, 
1.0], got " +
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2322,12 +2347,133 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), 
hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);

Review Comment:
   Should we throw an Unreachable here?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1614,5 +1615,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties());
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||

Review Comment:
   Do we actually need lines between 1641 and 1648?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"

Review Comment:
   Move this to below line 30?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2322,12 +2347,133 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), 
hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position,
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);

Review Comment:
   ditto



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2322,12 +2347,133 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), 
hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position,
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& 
array) {
+  PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+      *array.data(),
+      [&](const std::string_view& view) {
+        bloom_filter->InsertHash(bloom_filter->Hash(view));
+        return Status::OK();
+      },
+      []() { return Status::OK(); }));
+}
+
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values) {
+  if (bloom_filter_) {
+    if (!::arrow::is_base_binary_like(values.type_id())) {
+      throw ParquetException("Only BaseBinaryArray and subclasses supported");
+    }
+
+    if (::arrow::is_binary_like(values.type_id())) {
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const 
::arrow::BinaryArray&>(values));
+    } else {
+      DCHECK(::arrow::is_large_binary_like(values.type_id()));
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const 
::arrow::LargeBinaryArray&>(values));
+    }
+  }
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterArray(const 
::arrow::Array& values) {
+  // Only ByteArray type would write ::arrow::Array directly.
+  ParquetException::NYI("Unreachable");

Review Comment:
   Use ::arrow::Unreachable()?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,95 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto read_properties = default_arrow_reader_properties();

Review Comment:
   read_properties is not used.



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,95 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {

Review Comment:
   It seems that expect_columns_without_filter is not necessary if there is 
only one test case. 



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,95 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(bloom_filter, nullptr);
+        } else {
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void VerifyBloomFilter(const BloomFilter* bloom_filter,
+                         const ::arrow::ChunkedArray& chunked_array) {
+    for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
+      if (value == std::nullopt) {
+        continue;
+      }
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
+    }
+  }
+
+ protected:
+  std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
+};
+
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
+  BloomFilterOptions options;
+  options.ndv = 100;
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_bloom_filter_options(options, "c0")
+                               ->enable_bloom_filter_options(options, "c1")
+                               ->max_row_group_length(4)
+                               ->build();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+        [1,     "a"],
+        [2,     "b"],
+        [3,     "c"],
+        [null,  "d"],
+        [5,     null],
+        [6,     "f"]
+  ])"});
+  WriteFile(writer_properties, table);
+
+  ReadBloomFilters(/*expect_num_row_groups=*/2);
+  ASSERT_EQ(4, bloom_filters_.size());

Review Comment:
   These lines have repeated a lot. Could we put all sliced arrays in a single 
place and then use a for loop to verify them?



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/testing/gtest_util.h>
+#include <gtest/gtest.h>
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::string dir_string(parquet::test::get_data_dir());

Review Comment:
   ```suggestion
     std::string dir_string(get_data_dir());
   ```



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/testing/gtest_util.h>

Review Comment:
   ```suggestion
   #include "arrow/testing/gtest_util.h"
   ```



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/testing/gtest_util.h>
+#include <gtest/gtest.h>
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::string dir_string(parquet::test::get_data_dir());
+  std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_NE(nullptr, bloom_filter);
+  EXPECT_THROW(row_group_0->GetColumnBloomFilter(1), ParquetException);
+
+  // assert exists
+  {
+    std::string_view sv = "Hello";
+    ByteArray ba{sv};
+    EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+  }
+
+  // no exists
+  {
+    std::string_view sv = "NOT_EXISTS";
+    ByteArray ba{sv};
+    EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(parquet::test::get_data_dir());

Review Comment:
   ```suggestion
     std::string dir_string(get_data_dir());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to