mapleFU commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1352267359


##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API convenience to create a BloomFilterBuilder.

Review Comment:
   Yes. It's a static factory
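   For readers unfamiliar with the pattern, a minimal self-contained sketch of a static factory (generic `Widget` names here, not the actual parquet classes):

```cpp
#include <memory>

// The interface declares a static Make() that hides the concrete
// implementation behind the abstract type, like BloomFilterBuilder::Make does.
class Widget {
 public:
  virtual ~Widget() = default;
  virtual void Run() = 0;
  static std::unique_ptr<Widget> Make();  // static factory
};

class WidgetImpl : public Widget {
 public:
  void Run() override {}
};

std::unique_ptr<Widget> Widget::Make() { return std::make_unique<WidgetImpl>(); }
```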



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row group and
+  /// column id. Column encryption is not implemented yet. The side effect is that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override;
+
+  void Finish() override { finished_ = true; }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::vector<std::unique_ptr<BloomFilter>>;
+  std::vector<RowGroupBloomFilters> file_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
+}
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to finished BloomFilterBuilder.");
+  }
+  RowGroupBloomFilters row_group_bloom_filters(schema_->num_columns());
+  file_bloom_filters_.emplace_back(std::move(row_group_bloom_filters));
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordinal) {
+  CheckState(column_ordinal);
+  if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+    return nullptr;
+  }
+  auto bloom_filter_options_opt =
+      properties_.bloom_filter_options(schema_->Column(column_ordinal)->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions& bloom_filter_options = bloom_filter_options_opt.value();
+  std::unique_ptr<BloomFilter>& bloom_filter = file_bloom_filters_.back()[column_ordinal];
+  if (bloom_filter == nullptr) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    bloom_filter = std::move(block_split_bloom_filter);
+  }
+  return bloom_filter.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (!finished_) {
+    throw ParquetException("Cannot call WriteTo() to unfinished 
PageIndexBuilder.");
+  }
+  if (file_bloom_filters_.empty()) {

Review Comment:
   Sounds reasonable



##########
cpp/src/parquet/properties.h:
##########
@@ -532,6 +559,43 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default disabled.
+    Builder* disable_bloom_filter(const std::string& path) {
+      bloom_filter_options_[path] = std::nullopt;
+      return this;
+    }
+
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default enabled.
+    Builder* disable_bloom_filter(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->disable_bloom_filter(path->ToDotString());
+    }
+
+    /// Enable bloom filter options for the column specified by `path`.
+    ///
+    /// Default disabled.
+    ///
+    /// Note: Currently we don't support bloom filter for boolean columns,
+    /// so if enable bloom filter for boolean columns, it will be ignored.

Review Comment:
   I keep the behavior the same as `use_dictionary` for `BOOLEAN` columns. Also, we don't have the schema in the builder.
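   A hedged usage sketch of the per-column configuration discussed here, using only the `disable_bloom_filter()` overload shown in this hunk (the column path is a placeholder); per the doc comment, enabling a bloom filter on a BOOLEAN column would simply be ignored at write time, mirroring the dictionary-encoding behavior:

```cpp
#include <memory>
#include <string>

#include "parquet/properties.h"

// Sketch only: building WriterProperties with the bloom filter explicitly
// disabled for one column. "col_a" is a hypothetical column path.
std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  parquet::WriterProperties::Builder builder;
  builder.disable_bloom_filter("col_a");
  return builder.build();
}
```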



##########
cpp/src/parquet/properties.h:
##########
@@ -757,6 +825,19 @@ class PARQUET_EXPORT WriterProperties {
     return false;
   }
 
+  bool bloom_filter_enabled() const {
+    // Note: We do not encourage enabling bloom filter for all columns. So
+    // default_column_properties_.bloom_filter_enabled is always false and
+    // cannot be altered by user. Thus we can safely skip checking it here.
+    return std::any_of(column_properties_.begin(), column_properties_.end(),
+                       [](auto& p) { return p.second.bloom_filter_enabled(); });

Review Comment:
   Because `column_properties_` is a mapping, should I use `decltype` or spell out the type?
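   For comparison, both spellings on a stand-in map type (`ColumnProperties` here is a hypothetical stand-in for the real per-column properties class):

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>

struct ColumnProperties {
  bool bloom_filter_enabled() const { return enabled; }
  bool enabled = false;
};

bool AnyBloomFilterEnabled(
    const std::unordered_map<std::string, ColumnProperties>& column_properties) {
  // Option 1: generic lambda, as in the diff above.
  bool generic =
      std::any_of(column_properties.begin(), column_properties.end(),
                  [](const auto& p) { return p.second.bloom_filter_enabled(); });
  // Option 2: spell out the map's value_type explicitly.
  using Entry = std::unordered_map<std::string, ColumnProperties>::value_type;
  bool spelled =
      std::any_of(column_properties.begin(), column_properties.end(),
                  [](const Entry& p) { return p.second.bloom_filter_enabled(); });
  return generic && spelled;
}
```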



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row group and
+  /// column id. Column encryption is not implemented yet. The side effect is that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) override;
+
+  void Finish() override { finished_ = true; }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::vector<std::unique_ptr<BloomFilter>>;

Review Comment:
   Okay, though a `map` or `unordered_map` would need to hold a `shared_ptr`; let me change it.
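   A sketch of the alternative layout being discussed, assuming a map keyed by column ordinal (the alias names are illustrative, not the final ones):

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

namespace parquet {
class BloomFilter;  // forward declaration, as in the header

// Only columns that actually have a bloom filter get an entry; shared_ptr is
// used here because, as noted above, a map would hold shared ownership.
using RowGroupBloomFilters = std::map<int32_t, std::shared_ptr<BloomFilter>>;
using FileBloomFilters = std::vector<RowGroupBloomFilters>;
}  // namespace parquet
```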



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1793,36 +1793,63 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
 
   void SetPageIndexLocation(const PageIndexLocation& location) {
-    auto set_index_location =
+    auto set_index_location = [this](size_t row_group_ordinal,
+                                     const FileIndexLocation& file_index_location,
+                                     bool column_index) {
+      auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+      auto iter = file_index_location.find(row_group_ordinal);
+      if (iter != file_index_location.cend()) {
+        const auto& row_group_index_location = iter->second;
+        for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          if (i >= row_group_metadata.columns.size()) {
+            throw ParquetException("Cannot find metadata for column ordinal ", 
i);
+          }
+          auto& column_metadata = row_group_metadata.columns.at(i);
+          const auto& index_location = row_group_index_location.at(i);
+          if (index_location.has_value()) {
+            if (column_index) {
+              column_metadata.__set_column_index_offset(index_location->offset);
+              column_metadata.__set_column_index_length(index_location->length);
+            } else {
+              column_metadata.__set_offset_index_offset(index_location->offset);
+              column_metadata.__set_offset_index_length(index_location->length);
+            }
+          }
+        }
+      }
+    };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
+  // Update location to all bloom filters in the parquet file.
+  void SetBloomFilterLocation(const BloomFilterLocation& location) {
+    auto set_bloom_filter_location =
         [this](size_t row_group_ordinal,
-               const PageIndexLocation::FileIndexLocation& file_index_location,
-               bool column_index) {
+               const FileIndexLocation& file_bloom_filter_location) {
           auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
-          auto iter = file_index_location.find(row_group_ordinal);
-          if (iter != file_index_location.cend()) {
-            const auto& row_group_index_location = iter->second;
-            for (size_t i = 0; i < row_group_index_location.size(); ++i) {
-              if (i >= row_group_metadata.columns.size()) {
-                throw ParquetException("Cannot find metadata for column 
ordinal ", i);
-              }
-              auto& column_metadata = row_group_metadata.columns.at(i);
-              const auto& index_location = row_group_index_location.at(i);
-              if (index_location.has_value()) {
-                if (column_index) {
-                  column_metadata.__set_column_index_offset(index_location->offset);
-                  column_metadata.__set_column_index_length(index_location->length);
-                } else {
-                  column_metadata.__set_offset_index_offset(index_location->offset);
-                  column_metadata.__set_offset_index_length(index_location->length);
-                }
+          auto iter = file_bloom_filter_location.find(row_group_ordinal);
+          if (iter != file_bloom_filter_location.cend()) {
+            const auto& row_group_bloom_filter_location = iter->second;
+            for (size_t i = 0; i < row_group_bloom_filter_location.size(); ++i) {
+              DCHECK(i < row_group_metadata.columns.size());

Review Comment:
   Nice catch, will change `at` to `operator[]`
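   The point in miniature (with `assert` standing in for `DCHECK`): once the index has been checked, `at()`'s second bounds check adds nothing, so `operator[]` is enough:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

int GetColumn(const std::vector<int>& columns, size_t i) {
  assert(i < columns.size());  // bound already validated, like the DCHECK above
  return columns[i];           // no need for columns.at(i)
}
```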



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
