kszucs commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r1983890061
########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_hashtable.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +// create a fake null array class with a GetView method returning 0 always +class FakeNullArray { + public: + uint8_t GetView(int64_t i) const { return 0; } + + std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); } + + int64_t null_count() const { return 0; } +}; + +static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) { + // we aim for gaussian-like distribution of chunk sizes between min_size and max_size + uint64_t avg_size = (min_size + max_size) / 2; + // we skip calculating gearhash for the first `min_size` bytes, so we are looking for + // a smaller chunk as the average size + uint64_t target_size = avg_size - min_size; + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 
hash tables to have more gaussian-like distribution + // `norm_factor` narrows the chunk size distribution aroun avg_size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + std::pair<uint64_t, uint64_t> size_range, + uint8_t norm_factor) + : level_info_(level_info), + min_size_(size_range.first), + max_size_(size_range.second), + hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {} + +template <typename T> +void ContentDefinedChunker::Roll(const T value) { + constexpr size_t BYTE_WIDTH = sizeof(T); + chunk_size_ += BYTE_WIDTH; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + auto bytes = reinterpret_cast<const uint8_t*>(&value); + for (size_t i = 0; i < BYTE_WIDTH; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +void ContentDefinedChunker::Roll(std::string_view value) { + chunk_size_ += value.size(); + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (char c : value) { + rolling_hash_ = + (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][static_cast<uint8_t>(c)]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only 
create a new chunk + if the adjusted mask matches the rolling hash 8 times in a row, each run uses a + different gearhash table (gearhash's chunk size has exponential distribution, and + we use central limit theorem to approximate normal distribution) + if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) { + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the rolling Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org