kszucs commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r2086220901
########## cpp/src/parquet/chunker_internal.cc: ##########
@@ -0,0 +1,412 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cstdint>
+#include <iterator>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging_internal.h"
+#include "arrow/visit_type_inline.h"
+#include "parquet/chunker_internal_generated.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+using ::arrow::internal::checked_cast;
+
+static_assert(std::size(kGearhashTable) == kNumGearhashTables,
+              "should update CDC code to reflect number of generated hash tables");
+static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8,
+              "each table should have 256 entries of 64 bit values");
+
+namespace {
+
+/// Calculate the mask to use for the rolling hash. The mask is used to determine
+/// whether a new chunk should be created based on the rolling hash value, and is
+/// derived from the min_chunk_size, max_chunk_size and norm_level parameters.
+///
+/// Assuming that the gear hash produces random values with a uniform distribution,
+/// each bit of the rolling_hash_ value has an equal probability of being set, so a
+/// mask with the top N bits set has a probability of 1/2^N of matching the rolling
+/// hash. This is the cut-point criterion of the original gear-hash-based
+/// content-defined chunking. The main drawback of this approach is the non-uniform
+/// distribution of the resulting chunk sizes.
+///
+/// FastCDC later improved the process by introducing:
+/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes)
+/// - chunk size normalization (using two masks)
+///
+/// This implementation uses cut-point skipping because it improves the overall
+/// performance, and a more accurate alternative to the two-mask normalization to get
+/// a less skewed chunk size distribution: instead of using two different masks (one
+/// with a lower and one with a higher probability of matching, switched based on the
+/// actual chunk size), we use 8 different gear hash tables and require 8 consecutive
+/// matches while cycling through the hash tables. Following the central limit
+/// theorem, this approximates a normal distribution of the chunk sizes.
+//
+// @param min_chunk_size The minimum chunk size (default 256KiB)
+// @param max_chunk_size The maximum chunk size (default 1MiB)
+// @param norm_level Normalization level (default 0)
+// @return The mask used to compare against the rolling hash
+uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) {
+  if (min_chunk_size < 0) {
+    throw ParquetException("min_chunk_size must be non-negative");
+  }
+  if (max_chunk_size <= min_chunk_size) {
+    throw ParquetException("max_chunk_size must be greater than min_chunk_size");
+  }
+
+  // calculate the average size of the chunks
+  int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
+  // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to
+  // target a smaller chunk size to reach the average size after skipping the first
+  // `min_chunk_size` bytes; also divide by the number of gearhash tables to get a
+  // more gaussian-like distribution
+  int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables;
+
+  // assuming that the gear hash has a uniform distribution, we can calculate the mask
+  // by taking floor(log2(target_size))
+  int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1);
+
+  // a user-defined `norm_level` can be used to adjust the mask size, hence the
+  // matching probability: by increasing the norm_level we increase the probability of
+  // matching the mask, forcing the distribution closer to the average size;
+  // norm_level is 0 by default
+  int effective_bits = mask_bits - norm_level;
+
+  if (effective_bits < 1 || effective_bits > 63) {
+    throw ParquetException(
+        "The number of bits in the CDC mask must be between 1 and 63, got " +
+        std::to_string(effective_bits));
+  } else {
+    // create the mask by setting the top bits
+    return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+  }
+}
+
+}  // namespace
+
+class ContentDefinedChunker::Impl {
+ public:
+  Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size,
+       int norm_level)
+      : level_info_(level_info),
+        min_chunk_size_(min_chunk_size),
+        max_chunk_size_(max_chunk_size),
+        rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {}
+
+  uint64_t GetRollingHashMask() const { return rolling_hash_mask_; }
+
+  void Roll(bool value) {
+    if (++chunk_size_ < min_chunk_size_) {
+      // short-circuit if we haven't reached the minimum chunk size, this speeds up the

Review Comment:
   `WriterProperties` contains most of the controllable properties; ideally we should be able to serialize those settings into the KV metadata, though I am not sure about the serialization format since JSON is not available.

   Another example of this use case could be storing page indexes for existing Parquet files while keeping the exact same row-group/page layout.
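   For illustration only, a minimal sketch of what a flat, JSON-free encoding of such settings into the file key-value metadata could look like; the `parquet.cdc.*` key names and the `SerializeCdcSettings` helper below are made up for this example rather than an agreed-upon format:

   ```cpp
   // Hypothetical sketch: encode the CDC writer settings as flat string pairs so
   // that no JSON serializer is needed. Key names are illustrative only.
   #include <cstdint>
   #include <memory>
   #include <string>

   #include "arrow/util/key_value_metadata.h"

   std::shared_ptr<const ::arrow::KeyValueMetadata> SerializeCdcSettings(
       int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) {
     return ::arrow::key_value_metadata(
         {"parquet.cdc.min_chunk_size", "parquet.cdc.max_chunk_size",
          "parquet.cdc.norm_level"},
         {std::to_string(min_chunk_size), std::to_string(max_chunk_size),
          std::to_string(norm_level)});
   }
   ```

   The resulting metadata could then be attached to the file (e.g. via the existing `key_value_metadata` argument of `parquet::ParquetFileWriter::Open()`) or merged with whatever application-level metadata the caller already supplies, so reading the settings back would only require string-to-integer parsing.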
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org