kszucs commented on code in PR #45360:
URL: https://github.com/apache/arrow/pull/45360#discussion_r2086711091


##########
cpp/src/parquet/chunker_internal.cc:
##########
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cstdint>
+#include <iterator>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging_internal.h"
+#include "arrow/visit_type_inline.h"
+#include "parquet/chunker_internal_generated.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+using ::arrow::internal::checked_cast;
+
+static_assert(std::size(kGearhashTable) == kNumGearhashTables,
+              "should update CDC code to reflect number of generated hash tables");
+static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8,
+              "each table should have 256 entries of 64 bit values");
+
+namespace {
+
+/// Calculate the mask to use for the rolling hash; the mask is used to determine if a
+/// new chunk should be created based on the rolling hash value. The mask is calculated
+/// based on the min_chunk_size, max_chunk_size and norm_level parameters.
+///
+/// Assuming that the gear hash produces random values with a uniform distribution,
+/// each bit in the actual value of rolling_hash_ has equal probability of being set,
+/// so a mask with the top N bits set has a probability of 1/2^N of matching the
+/// rolling hash. This is the judgment criterion for the original gear hash based
+/// content-defined chunking. The main drawback of this approach is the non-uniform
+/// distribution of the chunk sizes.
+///
+/// FastCDC later improved the process by introducing:
+/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes)
+/// - chunk size normalization (using two masks)
+///
+/// This implementation uses cut-point skipping because it improves the overall
+/// performance, and a more accurate alternative to chunk size normalization to get a
+/// less skewed chunk size distribution. Instead of using two different masks (one with
+/// a lower and one with a higher probability of matching, switched based on the actual
+/// chunk size), we rather use 8 different gear hash tables and require 8 consecutive
+/// matches while switching between the used hash tables. This approach is based on the
+/// central limit theorem and approximates a normal distribution of the chunk sizes.
+//
+// @param min_chunk_size The minimum chunk size (default 256KiB)
+// @param max_chunk_size The maximum chunk size (default 1MiB)
+// @param norm_level Normalization level (default 0)
+// @return The mask used to compare against the rolling hash
+uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) {
+  if (min_chunk_size < 0) {
+    throw ParquetException("min_chunk_size must be non-negative");
+  }
+  if (max_chunk_size <= min_chunk_size) {
+    throw ParquetException("max_chunk_size must be greater than 
min_chunk_size");
+  }
+
+  // calculate the average size of the chunks
+  int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2;
+  // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to
+  // target a smaller chunk size to reach the average size after skipping the first
+  // `min_chunk_size` bytes; also divide by the number of gearhash tables to have a
+  // more gaussian-like distribution
+  int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables;
+
+  // assuming that the gear hash has a uniform distribution, we can calculate the mask
+  // by taking the floor(log2(target_size))
+  int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1);
+
+  // a user-defined `norm_level` can be used to adjust the mask size and hence the
+  // matching probability; increasing the norm_level increases the probability of
+  // matching the mask, forcing the distribution closer to the average size; norm_level
+  // is 0 by default
+  int effective_bits = mask_bits - norm_level;
+
+  if (effective_bits < 1 || effective_bits > 63) {
+    throw ParquetException(
+        "The number of bits in the CDC mask must be between 1 and 63, got " +
+        std::to_string(effective_bits));
+  } else {
+    // create the mask by setting the top bits
+    return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+  }
+}
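+// As an illustration (with the documented defaults of min_chunk_size = 256 KiB,
+// max_chunk_size = 1 MiB and norm_level = 0), the calculation above yields:
+//   avg_chunk_size = (262144 + 1048576) / 2 = 655360
+//   target_size    = (655360 - 262144) / 8  = 49152
+//   mask_bits      = NumRequiredBits(49152) - 1 = 16 - 1 = 15
+//   mask           = top 15 bits set = 0xFFFE000000000000
+// so each hashed byte matches with probability 2^-15 and, with 8 matches required,
+// the expected chunk size is roughly 256 KiB (skipped) + 8 * 32 KiB ~ 512 KiB.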
+
+}  // namespace
+
+class ContentDefinedChunker::Impl {
+ public:
+  Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size,
+       int norm_level)
+      : level_info_(level_info),
+        min_chunk_size_(min_chunk_size),
+        max_chunk_size_(max_chunk_size),
+        rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {}
+
+  uint64_t GetRollingHashMask() const { return rolling_hash_mask_; }
+
+  void Roll(bool value) {
+    if (++chunk_size_ < min_chunk_size_) {
+      // short-circuit if we haven't reached the minimum chunk size; this speeds up the
+      // chunking process since the gearhash doesn't need to be updated
+      return;
+    }
+    rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value];
+    has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0);
+  }
+
+  template <int kByteWidth>
+  void Roll(const uint8_t* value) {
+    // Update the rolling hash with a compile-time known sized value; set has_matched_
+    // to true if the hash matches the mask.
+    chunk_size_ += kByteWidth;
+    if (chunk_size_ < min_chunk_size_) {
+      // short-circuit if we haven't reached the minimum chunk size; this speeds up the
+      // chunking process since the gearhash doesn't need to be updated
+      return;
+    }
+    for (size_t i = 0; i < kByteWidth; ++i) {
+      rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]];
+      has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0);
+    }
+  }
+
+  template <typename T>
+  void Roll(const T* value) {
+    return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value));
+  }
+
+  void Roll(const uint8_t* value, int64_t length) {
+    // Update the rolling hash with a binary-like value; set has_matched_ to true if
+    // the hash matches the mask.
+    chunk_size_ += length;
+    if (chunk_size_ < min_chunk_size_) {
+      // short-circuit if we haven't reached the minimum chunk size; this speeds up the
+      // chunking process since the gearhash doesn't need to be updated
+      return;
+    }
+    for (int64_t i = 0; i < length; ++i) {
+      rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]];
+      has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0);
+    }
+  }
+
+  bool NeedNewChunk() {
+    // decide whether to create a new chunk based on the rolling hash; has_matched_ is
+    // set to true if we encountered a match since the last NeedNewChunk() call
+    if (ARROW_PREDICT_FALSE(has_matched_)) {
+      has_matched_ = false;
+      // in order to have a normal distribution of chunk sizes, we only create a new
+      // chunk if the adjusted mask matches the rolling hash 8 times in a row; each run
+      // uses a different gearhash table (gearhash's chunk size has a geometric
+      // distribution, and we use the central limit theorem to approximate a normal
+      // distribution, see section 6.2.1 in the paper
+      // https://www.cidrdb.org/cidr2023/papers/p43-low.pdf)
+      if (ARROW_PREDICT_FALSE(++nth_run_ >= kNumGearhashTables)) {
+        // note that we choose not to reset the rolling hash state here, nor anywhere
+        // else in the code; in practice this doesn't seem to affect the chunking
+        // effectiveness
+        nth_run_ = 0;
+        chunk_size_ = 0;
+        return true;
+      }
+    }
+    if (ARROW_PREDICT_FALSE(chunk_size_ >= max_chunk_size_)) {
+      // we have a hard limit on the maximum chunk size; note that we don't reset the
+      // rolling hash state here, so the next NeedNewChunk() call will continue from
+      // the current state
+      chunk_size_ = 0;
+      return true;
+    }
+    return false;
+  }
+
+  void ValidateChunks(const std::vector<Chunk>& chunks, int64_t num_levels) const {
+    // chunks must be non-empty and monotonically increasing
+    ARROW_DCHECK(!chunks.empty());
+
+    // the first chunk must start at the first level
+    auto first_chunk = chunks.front();
+    ARROW_DCHECK_EQ(first_chunk.level_offset, 0);
+    ARROW_DCHECK_EQ(first_chunk.value_offset, 0);
+
+    // the following chunks must be contiguous, non-overlapping and monotonically
+    // increasing
+    auto sum_levels = first_chunk.levels_to_write;
+    for (size_t i = 1; i < chunks.size(); ++i) {
+      auto chunk = chunks[i];
+      auto prev_chunk = chunks[i - 1];
+      ARROW_DCHECK_GT(chunk.levels_to_write, 0);
+      ARROW_DCHECK_GE(chunk.value_offset, prev_chunk.value_offset);
+      ARROW_DCHECK_EQ(chunk.level_offset,
+                      prev_chunk.level_offset + prev_chunk.levels_to_write);
+      sum_levels += chunk.levels_to_write;
+    }
+    ARROW_DCHECK_EQ(sum_levels, num_levels);
+
+    // the last chunk must end at the last level
+    auto last_chunk = chunks.back();
+    ARROW_DCHECK_EQ(last_chunk.level_offset + last_chunk.levels_to_write, num_levels);
+  }
+
+  template <typename RollFunc>
+  std::vector<Chunk> Calculate(const int16_t* def_levels, const int16_t* rep_levels,
+                               int64_t num_levels, const RollFunc& RollValue) {
+    // Calculate the chunk boundaries for typed Arrow arrays.

Review Comment:
   The chunking process applies to the entire column across all pages and row
   groups and is only initialized when the column writer is created. This way
   chunking can continue between different WriteArrow calls.
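   For reference, a rough sketch of that setup (not the actual writer code; the
   constructor arguments and the `GetChunks` method name are placeholders standing
   in for whatever public interface ContentDefinedChunker exposes):

   ```cpp
   // Hypothetical sketch: the chunker lives in the column writer, so its state
   // is shared by every WriteArrow call made against that column.
   class ColumnWriterSketch {
    public:
     explicit ColumnWriterSketch(const LevelInfo& level_info)
         // The chunker is created once, together with the column writer, so its
         // rolling hash, chunk size and run counters survive page and row group
         // boundaries.
         : chunker_(level_info, /*min_chunk_size=*/256 * 1024,
                    /*max_chunk_size=*/1024 * 1024, /*norm_level=*/0) {}

     void WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                     int64_t num_levels, const ::arrow::Array& values) {
       // Each call continues from the state left by the previous call, so the
       // resulting chunk boundaries do not depend on how the caller batches data.
       for (const Chunk& chunk :
            chunker_.GetChunks(def_levels, rep_levels, num_levels, values)) {
         // ... write chunk.levels_to_write levels starting at chunk.level_offset
       }
     }

    private:
     ContentDefinedChunker chunker_;
   };
   ```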



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
