wgtmac commented on code in PR #37400: URL: https://github.com/apache/arrow/pull/37400#discussion_r2593028729
########## cpp/src/parquet/bloom_filter_writer.cc: ########## @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/bloom_filter_writer.h" + +#include <map> +#include <utility> + +#include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/unreachable.h" + +#include "parquet/exception.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" +#include "parquet/schema.h" +#include "parquet/types.h" + +namespace parquet { + +constexpr int64_t kHashBatchSize = 256; + +template <typename ParquetType> +BloomFilterWriter<ParquetType>::BloomFilterWriter(const ColumnDescriptor* descr, + BloomFilter* bloom_filter) + : descr_(descr), bloom_filter_(bloom_filter) {} + +template <typename ParquetType> +bool BloomFilterWriter<ParquetType>::bloom_filter_enabled() const { + return bloom_filter_ != nullptr; +} + +template <typename ParquetType> +void BloomFilterWriter<ParquetType>::Update(const T* values, int64_t num_values) { + if (!bloom_filter_enabled()) { + return; + } + + if constexpr (std::is_same_v<ParquetType, BooleanType>) { + throw 
ParquetException("Bloom filter is not supported for boolean type"); + } + + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - i)); + if constexpr (std::is_same_v<ParquetType, FLBAType>) { + bloom_filter_->Hashes(values, descr_->type_length(), batch_size, hashes.data()); + } else { + bloom_filter_->Hashes(values, batch_size, hashes.data()); + } + bloom_filter_->InsertHashes(hashes.data(), batch_size); + } +} + +template <> +void BloomFilterWriter<BooleanType>::Update(const bool*, int64_t) { + if (!bloom_filter_enabled()) { + return; + } + throw ParquetException("Bloom filter is not supported for boolean type"); +} + +template <typename ParquetType> +void BloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, int64_t num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (!bloom_filter_enabled()) { + return; + } + + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - i)); + if constexpr (std::is_same_v<ParquetType, FLBAType>) { + bloom_filter_->Hashes(values + i + position, descr_->type_length(), + batch_size, hashes.data()); + } else { + bloom_filter_->Hashes(values + i + position, batch_size, hashes.data()); + } + bloom_filter_->InsertHashes(hashes.data(), batch_size); + } + }); +} + +template <> +void BloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, const uint8_t*, + int64_t) { + if (!bloom_filter_enabled()) { + return; + } + throw ParquetException("Bloom filter is not supported for boolean type"); +} + +template <typename ParquetType> +void BloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) { + ::arrow::Unreachable("Update for 
non-ByteArray type should be unreachable"); +} + +namespace { + +template <typename ArrayType> +void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& array) { + // Using a small batch size because an extra `byte_arrays` is used. + constexpr int64_t kBinaryHashBatchSize = 64; + std::array<ByteArray, kBinaryHashBatchSize> byte_arrays; + std::array<uint64_t, kBinaryHashBatchSize> hashes; + + auto batch_insert_hashes = [&](int count) { + if (count > 0) { + bloom_filter.Hashes(byte_arrays.data(), count, hashes.data()); + bloom_filter.InsertHashes(hashes.data(), count); + } + }; + + int batch_idx = 0; + ::arrow::internal::VisitSetBitRunsVoid( + array.null_bitmap_data(), array.offset(), array.length(), + [&](int64_t position, int64_t run_length) { + for (int64_t i = 0; i < run_length; ++i) { + byte_arrays[batch_idx++] = array.GetView(position + i); + if (batch_idx == kBinaryHashBatchSize) { + batch_insert_hashes(batch_idx); + batch_idx = 0; + } + } + }); + batch_insert_hashes(batch_idx); +} + +} // namespace + +template <> +void BloomFilterWriter<ByteArrayType>::Update(const ::arrow::Array& values) { + if (!bloom_filter_enabled()) { + return; + } + + if (::arrow::is_binary_view_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryViewArray&>(values)); + } else if (::arrow::is_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryArray&>(values)); + } else if (::arrow::is_large_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::LargeBinaryArray&>(values)); + } else { + ParquetException::NYI("Bloom filter is not supported for this Arrow type: " + + values.type()->ToString()); + } +} + +template class BloomFilterWriter<BooleanType>; Review Comment: This is required. 
Otherwise it will crash like below: ``` (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x0) * frame #0: 0x0000000000000000 frame #1: 0x0000000101f580dc libparquet.2300.dylib`std::__1::__unique_if<parquet::BloomFilterWriter<parquet::PhysicalType<(parquet::Type::type)0>>>::__unique_single std::__1::make_unique[abi:ne190102]<parquet::BloomFilterWriter<parquet::PhysicalType<(parquet::Type::type)0>>, parquet::ColumnDescriptor const*, parquet::BloomFilter*&>(__args=0x000000016fdfd0e8, __args=0x000000016fdfd118) at unique_ptr.h:635:30 frame #2: 0x0000000101f576f0 libparquet.2300.dylib`parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>::TypedColumnWriterImpl(this=0x00000001200086c8, metadata=0x000000012000b9a0, pager=nullptr, use_dictionary=false, encoding=PLAIN, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) at column_writer.cc:1301:9 frame #3: 0x0000000101f57224 libparquet.2300.dylib`parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>::TypedColumnWriterImpl(this=0x00000001200086c8, metadata=0x000000012000b9a0, pager=nullptr, use_dictionary=false, encoding=PLAIN, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) at column_writer.cc:1289:38 frame #4: 0x0000000101f57168 libparquet.2300.dylib`void std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>::construct[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, parquet::ColumnChunkMetaDataBuilder*&, std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t>(this=0x000000016fdfd2a3, __p=0x00000001200086c8, __args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, __args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at allocator.h:165:24 frame #5: 
0x0000000101f56f54 libparquet.2300.dylib`void std::__1::allocator_traits<std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::construct[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, parquet::ColumnChunkMetaDataBuilder*&, std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 0>(__a=0x000000016fdfd2a3, __p=0x00000001200086c8, __args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, __args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at allocator_traits.h:319:9 frame #6: 0x0000000101f56e54 libparquet.2300.dylib`std::__1::__shared_ptr_emplace<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::__shared_ptr_emplace[abi:ne190102]<parquet::ColumnChunkMetaDataBuilder*&, std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>, 0>(this=0x00000001200086b0, __a=allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0> > > @ 0x000000016fdfd2ef, __args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, __args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:266:5 frame #7: 0x0000000101f56b08 libparquet.2300.dylib`std::__1::__shared_ptr_emplace<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>>::__shared_ptr_emplace[abi:ne190102]<parquet::ColumnChunkMetaDataBuilder*&, 
std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>, 0>(this=0x00000001200086b0, __a=allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0> > > @ 0x000000016fdfd34f, __args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, __args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:263:115 frame #8: 0x0000000101f569b4 libparquet.2300.dylib`std::__1::shared_ptr<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>> std::__1::allocate_shared[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, std::__1::allocator<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>>, parquet::ColumnChunkMetaDataBuilder*&, std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 0>(__a=0x000000016fdfd427, __args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, __args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:845:51 frame #9: 0x0000000101f1ebe8 libparquet.2300.dylib`std::__1::shared_ptr<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>> std::__1::make_shared[abi:ne190102]<parquet::TypedColumnWriterImpl<parquet::PhysicalType<(parquet::Type::type)0>>, parquet::ColumnChunkMetaDataBuilder*&, std::__1::unique_ptr<parquet::PageWriter, std::__1::default_delete<parquet::PageWriter>>, bool const&, parquet::Encoding::type&, parquet::WriterProperties const*&, std::nullptr_t, 0>(__args=0x000000016fdfd630, __args=nullptr, __args=0x000000016fdfd60f, __args=0x000000016fdfd5e8, 
__args=0x000000016fdfd620, __args=0x000000016fdfd5c0) at shared_ptr.h:853:10 frame #10: 0x0000000101f1e8ac libparquet.2300.dylib`parquet::ColumnWriter::Make(metadata=0x000000012000b9a0, pager=nullptr, properties=0x0000000120009b70, bloom_filter=0x0000000000000000) at column_writer.cc:2686:14 frame #11: 0x0000000102089860 libparquet.2300.dylib`parquet::RowGroupSerializer::CreateColumnWriterForColumn(this=0x000000012000b900, col_meta=0x000000012000b9a0, column_ordinal=0) const at file_writer.cc:304:12 frame #12: 0x0000000102088c44 libparquet.2300.dylib`parquet::RowGroupSerializer::NextColumn(this=0x000000012000b900) at file_writer.cc:146:26 frame #13: 0x000000010207fd14 libparquet.2300.dylib`parquet::RowGroupWriter::NextColumn(this=0x000000012000b990) at file_writer.cc:56:64 frame #14: 0x00000001001d6e90 parquet-internals-test`parquet::test::TestStatistics<parquet::PhysicalType<(parquet::Type::type)0>>::TestFullRoundtrip(this=0x00000001200091c0, num_values=100, null_count=31) at statistics_test.cc:436:69 frame #15: 0x00000001001d6948 parquet-internals-test`parquet::test::TestStatistics_FullRoundtrip_Test<parquet::PhysicalType<(parquet::Type::type)0>>::TestBody(this=0x00000001200091c0) at statistics_test.cc:607:3 frame #16: 0x0000000101071f50 libarrow_testing.2300.dylib`void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) + 104 frame #17: 0x0000000101071e98 libarrow_testing.2300.dylib`testing::Test::Run() + 208 frame #18: 0x0000000101072a78 libarrow_testing.2300.dylib`testing::TestInfo::Run() + 212 frame #19: 0x00000001010734fc libarrow_testing.2300.dylib`testing::TestSuite::Run() + 456 frame #20: 0x000000010107e924 libarrow_testing.2300.dylib`testing::internal::UnitTestImpl::RunAllTests() + 1328 frame #21: 0x000000010107e2d0 libarrow_testing.2300.dylib`bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, 
bool (testing::internal::UnitTestImpl::*)(), char const*) + 104 frame #22: 0x000000010107e234 libarrow_testing.2300.dylib`testing::UnitTest::Run() + 124 frame #23: 0x0000000100262f30 parquet-internals-test`main + 68 frame #24: 0x000000019594ab98 dyld`start + 6076 ``` EDIT: the explicit instantiation `template class BloomFilterWriter<BooleanType>;` is no longer required and can be removed now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
