mapleFU commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1313729706
##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -335,7 +336,7 @@ install(FILES
"${CMAKE_CURRENT_BINARY_DIR}/parquet_version.h"
add_parquet_test(internals-test
SOURCES
bloom_filter_test.cc
- bloom_filter_reader_test.cc
+ bloom_filter_parquet_test.cc
Review Comment:
Oh, I will keep the name `bloom_filter_test.cc` and change
`bloom_filter_reader_test.cc` to `bloom_filter_reader_writer_test.cc`.
##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
Review Comment:
IMO, `BloomFilterBuilder` is the better name, because the `BloomFilter` class
itself is more like a `BloomFilterWriter`.
##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2335,28 +2466,28 @@ std::shared_ptr<ColumnWriter>
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
- metadata, std::move(pager), use_dictionary, encoding, properties);
+ metadata, std::move(pager), use_dictionary, encoding, properties,
bloom_filter);
Review Comment:
Nice catch!
##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
return Status::OK();
}
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+ int64_t num_values) {
+ if (bloom_filter_) {
+ // TODO(mwish): Would it allocate too much memory? Would an
std::array<uint64_t, 64>
+ // better here?
+ std::vector<uint64_t> hashes(num_values);
+ bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+ bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+ int64_t num_values) {
+ if (bloom_filter_) {
+ for (int64_t i = 0; i < num_values; ++i) {
+ bloom_filter_->InsertHash(bloom_filter_->Hash(values + i,
descr_->type_length()));
+ }
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*,
int64_t) {
+ DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+ int64_t num_values,
+ const uint8_t*
valid_bits,
+ int64_t
valid_bits_offset) {
+ if (bloom_filter_) {
+ ::arrow::internal::VisitSetBitRunsVoid(
+ valid_bits, valid_bits_offset, num_values, [&](int64_t position,
int64_t length) {
+ for (int64_t i = 0; i < length; i++) {
+ bloom_filter_->InsertHash(bloom_filter_->Hash(values + i +
position));
+ }
+ });
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*,
int64_t,
+ const
uint8_t*,
+ int64_t) {
+ DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA*
values,
+ int64_t
num_values,
+ const uint8_t*
valid_bits,
+ int64_t
valid_bits_offset) {
+ if (bloom_filter_) {
+ ::arrow::internal::VisitSetBitRunsVoid(
+ valid_bits, valid_bits_offset, num_values, [&](int64_t position,
int64_t length) {
+ for (int64_t i = 0; i < length; i++) {
+ bloom_filter_->InsertHash(
+ bloom_filter_->Hash(values + i + position,
descr_->type_length()));
+ }
+ });
+ }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType&
array) {
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](const std::string_view& view) {
Review Comment:
I don't fully understand how we can batch the hashes here, since hashes
should be skipped for null values.
##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -373,7 +375,6 @@ class TestPrimitiveWriter : public
PrimitiveTypedTest<TestType> {
const ColumnDescriptor* descr_;
- private:
Review Comment:
The code here is:
```
protected:
...
private:
...
```
After removing `private:`, the members below it are already `protected`.
##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,113 @@ Status
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
return Status::OK();
}
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+ int64_t num_values) {
+ if (bloom_filter_) {
+ // TODO(mwish): Would it allocate too much memory? Would an
std::array<uint64_t, 64>
+ // better here?
+ std::vector<uint64_t> hashes(num_values);
Review Comment:
done
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
/*null_counts=*/{0}}));
}
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
Review Comment:
Can I split that out into a separate PR?
##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+ /// \brief API convenience to create a BloomFilterBuilder.
+ static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor*
schema,
+ const WriterProperties&
properties);
+
+ /// Append a new row group to host all incoming bloom filters.
+ virtual void AppendRowGroup() = 0;
+
+ /// \brief Get the BloomFilter from column ordinal.
+ ///
+ /// \param column_ordinal Column ordinal in schema, which is only for leaf
columns.
+ /// \param bloom_filter_options The options(like num distinct values and
false positive
+ /// rate) to create a BloomFilter.
+ ///
+ /// \return BloomFilter for the column and its memory ownership belongs to
the
+ /// BloomFilterBuilder.
+ virtual BloomFilter* GetOrCreateBloomFilter(
+ int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options)
= 0;
+
+ /// \brief Write the bloom filter to sink.
+ ///
+ /// \param[out] sink The output stream to write the bloom filter.
+ /// \param[out] location The location of all bloom filter to the start of
sink.
+ virtual void WriteTo(::arrow::io::OutputStream* sink,
+ BloomFilterLocation* location) = 0;
+
+ /// \brief Complete the bloom filter builder and no more write is allowed.
Review Comment:
Is it ok now?
##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2163,6 +2183,9 @@ Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
// ----------------------------------------------------------------------
// Write Arrow to BYTE_ARRAY
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+ const ::arrow::Array& values);
Review Comment:
```
arrow/cpp/src/parquet/column_writer.cc:2423:44: error: explicit
specialization of 'UpdateBloomFilterArray' after instantiation
void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
```
Otherwise this would be raised.
##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
return Status::OK();
}
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+ int64_t num_values) {
+ if (bloom_filter_) {
+ // TODO(mwish): Would it allocate too much memory? Would an
std::array<uint64_t, 64>
+ // better here?
+ std::vector<uint64_t> hashes(num_values);
+ bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+ bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+ int64_t num_values) {
+ if (bloom_filter_) {
+ for (int64_t i = 0; i < num_values; ++i) {
+ bloom_filter_->InsertHash(bloom_filter_->Hash(values + i,
descr_->type_length()));
+ }
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*,
int64_t) {
+ DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+ int64_t num_values,
+ const uint8_t*
valid_bits,
+ int64_t
valid_bits_offset) {
+ if (bloom_filter_) {
+ ::arrow::internal::VisitSetBitRunsVoid(
+ valid_bits, valid_bits_offset, num_values, [&](int64_t position,
int64_t length) {
+ for (int64_t i = 0; i < length; i++) {
+ bloom_filter_->InsertHash(bloom_filter_->Hash(values + i +
position));
+ }
+ });
+ }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*,
int64_t,
+ const
uint8_t*,
+ int64_t) {
+ DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA*
values,
+ int64_t
num_values,
+ const uint8_t*
valid_bits,
+ int64_t
valid_bits_offset) {
+ if (bloom_filter_) {
+ ::arrow::internal::VisitSetBitRunsVoid(
+ valid_bits, valid_bits_offset, num_values, [&](int64_t position,
int64_t length) {
+ for (int64_t i = 0; i < length; i++) {
+ bloom_filter_->InsertHash(
+ bloom_filter_->Hash(values + i + position,
descr_->type_length()));
+ }
+ });
+ }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType&
array) {
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](const std::string_view& view) {
+ const ByteArray value{view};
+ bloom_filter->InsertHash(bloom_filter->Hash(&value));
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterArray(const
::arrow::Array& values) {
+ // Only ByteArray type would write ::arrow::Array directly.
+ ParquetException::NYI("Unreachable");
+}
+
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+ const ::arrow::Array& values) {
+ if (bloom_filter_) {
+ if (!::arrow::is_base_binary_like(values.type_id())) {
+ throw ParquetException("Only BaseBinaryArray and subclasses supported");
+ }
+ if (::arrow::is_dictionary(values.type_id())) {
+ // Dictionary is handled in WriteArrowDense, so this should never happen.
+ throw ParquetException("UpdateBloomFilterArray not support dictionary");
+ }
+
+ if (::arrow::is_binary_like(values.type_id())) {
+ UpdateBinaryBloomFilter(bloom_filter_,
+ checked_cast<const
::arrow::BinaryArray&>(values));
+ } else {
+ DCHECK(::arrow::is_large_binary_like(values.type_id()));
+ UpdateBinaryBloomFilter(bloom_filter_,
+ checked_cast<const
::arrow::LargeBinaryArray&>(values));
+ }
+ }
+}
+
Review Comment:
Hmm, might this introduce problems, e.g. make debugging harder? Maybe we
should open a separate issue for that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]