wgtmac commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r2083708018
########## cpp/src/parquet/chunker_internal.h: ########## @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <vector> + +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + // The start offset of this chunk inside the given levels + int64_t level_offset; + // The start offset of this chunk inside the given values array + int64_t value_offset; + // The length of the chunk in levels + int64_t levels_to_write; +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. +/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which is contiguously +/// written out to the file. Since each page is compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... Review Comment: I just don't quite understand how the rolling hash can perfectly produce page1 and page4 as above. I need to read the paper and blogs more carefully, but I cannot promise that my math background allows me to totally understand it. :) ########## cpp/src/parquet/properties.h: ########## @@ -275,10 +307,38 @@ class PARQUET_EXPORT WriterProperties { page_checksum_enabled_(properties.page_checksum_enabled()), size_statistics_level_(properties.size_statistics_level()), sorting_columns_(properties.sorting_columns()), - default_column_properties_(properties.default_column_properties()) {} + default_column_properties_(properties.default_column_properties()), + content_defined_chunking_enabled_( + properties.content_defined_chunking_enabled()), + content_defined_chunking_options_( + properties.content_defined_chunking_options()) {} virtual ~Builder() {} + /// \brief EXPERIMENTAL: Use content-defined page chunking for all columns. + /// + /// Optimize parquet files for content addressable storage (CAS) systems by writing Review Comment: It is worth noting that only the WriteArrow interface is supported.
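To make the WriteArrow-only note concrete, here is a rough sketch of how a caller would opt into CDC, assembled from the builder calls and test helpers that appear elsewhere in this diff; the exact shape of the options argument (a `CdcOptions`-style struct vs. separate parameters) still varies between revisions of this PR, so treat it as illustrative rather than the final API:

```cpp
// Illustrative sketch only: the CDC builder methods and option fields are taken from
// this PR's diff and may change before merge.
#include "arrow/io/api.h"
#include "arrow/table.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"

arrow::Status WriteWithCdc(const std::shared_ptr<arrow::Table>& table,
                           const std::shared_ptr<arrow::io::OutputStream>& sink) {
  parquet::WriterProperties::Builder builder;
  builder.enable_content_defined_chunking()
      ->content_defined_chunking_options(
          {/*min_chunk_size=*/256 * 1024, /*max_chunk_size=*/1024 * 1024,
           /*norm_level=*/0});
  auto write_props = builder.build();
  auto arrow_props = parquet::ArrowWriterProperties::Builder().store_schema()->build();
  // Per the review note above, CDC currently only applies to the Arrow write path
  // (WriteTable / WriteArrow).
  return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink,
                                    /*chunk_size=*/1024 * 1024, write_props, arrow_props);
}
```

The 256 KiB / 1 MiB bounds mirror the defaults documented for `CalculateMask` below.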
########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +namespace { + +/// Calculate the mask to use for the rolling hash; the mask is used to determine whether a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_level parameters. +/// +/// Assuming that the gear hash produces random values with a uniform distribution, each +/// bit in the actual value of rolling_hash_ has an equal probability of being set, so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criterion for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on, FastCDC improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance, together with a more accurate alternative to the two-mask normalization +/// that yields a less skewed chunk size distribution. Instead of using two different masks +/// (one with a lower and one with a higher probability of matching, switched based on the +/// actual chunk size), we use 8 different gear hash tables and require 8 consecutive +/// matches while switching between the hash tables. This approach is based on the central +/// limit theorem and approximates a normal distribution of the chunk sizes.
+// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_level Normalization level (default 0) +// @return The mask used to compare against the rolling hash +uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) { + if (min_chunk_size < 0) { + throw ParquetException("min_chunk_size must be non-negative"); + } + if (max_chunk_size <= min_chunk_size) { + throw ParquetException("max_chunk_size must be greater than min_chunk_size"); + } + + // calculate the average size of the chunks + int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2; + // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_chunk_size` bytes; also divide by the number of gearhash tables to have a + // a more gaussian-like distribution + int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables; + + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the floor(log2(target_size)) + int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1); + + // a user defined `norm_level` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_level we increase the probability of matching + // the mask, forcing the distribution closer to the average size; norm_level is 0 by + // default + int effective_bits = mask_bits - norm_level; + + if (effective_bits < 1 || effective_bits > 63) { + throw ParquetException( + "The number of bits in the CDC mask must be between 1 and 63, got " + + std::to_string(effective_bits)); + } else { + // create the mask by setting the top bits + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); + } +} + +} // namespace + +class ContentDefinedChunker::Impl { + public: + Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size, + int norm_level) + : level_info_(level_info), + min_chunk_size_(min_chunk_size), + max_chunk_size_(max_chunk_size), + rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {} + + uint64_t GetRollingHashMask() const { return rolling_hash_mask_; } + + void Roll(bool value) { + if (++chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + + template <int kByteWidth> + void Roll(const uint8_t* value) { + // Update the rolling hash with a compile-time known sized value, set has_matched_ to + // true if the hash matches the mask. 
+ chunk_size_ += kByteWidth; + if (chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < kByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + } + + template <typename T> + void Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); + } + + void Roll(const uint8_t* value, int64_t length) { + // Update the rolling hash with a binary-like value, set has_matched_ to true if the + // hash matches the mask. + chunk_size_ += length; + if (chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + } + + bool NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has geometric distribution, and + // we use central limit theorem to approximate normal distribution, see + // section 6.2.1 in paper https://www.cidrdb.org/cidr2023/papers/p43-low.pdf) + if (ARROW_PREDICT_FALSE(++nth_run_ >= kNumGearhashTables)) { + // note that we choose not to reset the rolling hash state here, nor anywhere else + // in the code, in practice this doesn't seem to affect the chunking effectiveness + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_chunk_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; + } + + void ValidateChunks(const std::vector<Chunk>& chunks, int64_t num_levels) const { + // chunks must be non-empty and monotonic increasing + ARROW_DCHECK(!chunks.empty()); + + // the first chunk must start at the first level + auto first_chunk = chunks.front(); + ARROW_DCHECK_EQ(first_chunk.level_offset, 0); + ARROW_DCHECK_EQ(first_chunk.value_offset, 0); + + // the following chunks must be contiguous, non-overlapping and monotonically + // increasing + auto sum_levels = first_chunk.levels_to_write; + for (size_t i = 1; i < chunks.size(); ++i) { + auto chunk = chunks[i]; + auto prev_chunk = chunks[i - 1]; + ARROW_DCHECK_GT(chunk.levels_to_write, 0); + ARROW_DCHECK_GE(chunk.value_offset, prev_chunk.value_offset); + ARROW_DCHECK_EQ(chunk.level_offset, + prev_chunk.level_offset + prev_chunk.levels_to_write); + sum_levels += chunk.levels_to_write; + } + ARROW_DCHECK_EQ(sum_levels, num_levels); + + // the last chunk must end at the last level + auto last_chunk = chunks.back(); + ARROW_DCHECK_EQ(last_chunk.level_offset + last_chunk.levels_to_write, num_levels); + } + + template <typename RollFunc> + 
std::vector<Chunk> Calculate(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, const RollFunc& RollValue) { + // Calculate the chunk boundaries for typed Arrow arrays. + std::vector<Chunk> chunks; + int64_t offset; + int64_t prev_offset = 0; + int64_t prev_value_offset = 0; + bool has_def_levels = level_info_.def_level > 0; + bool has_rep_levels = level_info_.rep_level > 0; + + if (!has_rep_levels && !has_def_levels) { + // fastest path for non-nested non-null data + for (offset = 0; offset < num_levels; ++offset) { + RollValue(offset); + if (NeedNewChunk()) { + chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else if (!has_rep_levels) { + // non-nested data with nulls + int16_t def_level; + for (int64_t offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + + Roll(&def_level); + if (def_level == level_info_.def_level) { + RollValue(offset); + } + if (NeedNewChunk()) { + chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else { + // nested data with nulls + int16_t def_level; + int16_t rep_level; + int64_t value_offset = 0; + + for (offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + rep_level = rep_levels[offset]; + + Roll(&def_level); + Roll(&rep_level); + if (def_level == level_info_.def_level) { + RollValue(value_offset); + } + + if ((rep_level == 0) && NeedNewChunk()) { Review Comment: ```suggestion if (rep_level == 0 && NeedNewChunk()) { ``` ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +namespace { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. 
The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_level parameters. +/// +/// Assuming that the gear hash produces random values with a uniform distribution, each +/// bit in the actual value of rolling_hash_ has an equal probability of being set, so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criterion for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on, FastCDC improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance, together with a more accurate alternative to the two-mask normalization +/// that yields a less skewed chunk size distribution. Instead of using two different masks +/// (one with a lower and one with a higher probability of matching, switched based on the +/// actual chunk size), we use 8 different gear hash tables and require 8 consecutive +/// matches while switching between the hash tables. This approach is based on the central +/// limit theorem and approximates a normal distribution of the chunk sizes. +// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_level Normalization level (default 0) +// @return The mask used to compare against the rolling hash +uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) { + if (min_chunk_size < 0) { + throw ParquetException("min_chunk_size must be non-negative"); + } + if (max_chunk_size <= min_chunk_size) { + throw ParquetException("max_chunk_size must be greater than min_chunk_size"); + } + + // calculate the average size of the chunks + int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2; + // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_chunk_size` bytes; also divide by the number of gearhash tables to have + // a more gaussian-like distribution + int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables; + + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the floor(log2(target_size)) + int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1); + + // a user defined `norm_level` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_level we increase the probability of matching + // the mask, forcing the distribution closer to the average size; norm_level is 0 by + // default + int effective_bits = mask_bits - norm_level; + + if (effective_bits < 1 || effective_bits > 63) { + throw ParquetException( + "The number of bits in the CDC mask must be between 1 and 63, got " + + std::to_string(effective_bits)); + } else { + // create the mask by setting the top bits + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); + } +} + +} // namespace + +class ContentDefinedChunker::Impl { + public: + Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size, + int norm_level) + : level_info_(level_info), + min_chunk_size_(min_chunk_size), + max_chunk_size_(max_chunk_size), +
rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {} + + uint64_t GetRollingHashMask() const { return rolling_hash_mask_; } + + void Roll(bool value) { + if (++chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + + template <int kByteWidth> + void Roll(const uint8_t* value) { + // Update the rolling hash with a compile-time known sized value, set has_matched_ to + // true if the hash matches the mask. + chunk_size_ += kByteWidth; + if (chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < kByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + } + + template <typename T> + void Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); + } + + void Roll(const uint8_t* value, int64_t length) { + // Update the rolling hash with a binary-like value, set has_matched_ to true if the + // hash matches the mask. + chunk_size_ += length; + if (chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); + } + } + + bool NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has geometric distribution, and + // we use central limit theorem to approximate normal distribution, see + // section 6.2.1 in paper https://www.cidrdb.org/cidr2023/papers/p43-low.pdf) + if (ARROW_PREDICT_FALSE(++nth_run_ >= kNumGearhashTables)) { + // note that we choose not to reset the rolling hash state here, nor anywhere else + // in the code, in practice this doesn't seem to affect the chunking effectiveness + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_chunk_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; + } + + void ValidateChunks(const std::vector<Chunk>& chunks, int64_t num_levels) const { + // chunks must be non-empty and monotonic increasing + ARROW_DCHECK(!chunks.empty()); + + // the first chunk must start at the first level + auto first_chunk = chunks.front(); + ARROW_DCHECK_EQ(first_chunk.level_offset, 0); + ARROW_DCHECK_EQ(first_chunk.value_offset, 0); + + // the following chunks 
must be contiguous, non-overlapping and monotonically + // increasing + auto sum_levels = first_chunk.levels_to_write; + for (size_t i = 1; i < chunks.size(); ++i) { + auto chunk = chunks[i]; + auto prev_chunk = chunks[i - 1]; + ARROW_DCHECK_GT(chunk.levels_to_write, 0); + ARROW_DCHECK_GE(chunk.value_offset, prev_chunk.value_offset); + ARROW_DCHECK_EQ(chunk.level_offset, + prev_chunk.level_offset + prev_chunk.levels_to_write); + sum_levels += chunk.levels_to_write; + } + ARROW_DCHECK_EQ(sum_levels, num_levels); + + // the last chunk must end at the last level + auto last_chunk = chunks.back(); + ARROW_DCHECK_EQ(last_chunk.level_offset + last_chunk.levels_to_write, num_levels); + } + + template <typename RollFunc> + std::vector<Chunk> Calculate(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, const RollFunc& RollValue) { + // Calculate the chunk boundaries for typed Arrow arrays. + std::vector<Chunk> chunks; + int64_t offset; + int64_t prev_offset = 0; + int64_t prev_value_offset = 0; + bool has_def_levels = level_info_.def_level > 0; + bool has_rep_levels = level_info_.rep_level > 0; + + if (!has_rep_levels && !has_def_levels) { + // fastest path for non-nested non-null data + for (offset = 0; offset < num_levels; ++offset) { + RollValue(offset); + if (NeedNewChunk()) { + chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else if (!has_rep_levels) { + // non-nested data with nulls + int16_t def_level; + for (int64_t offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + + Roll(&def_level); + if (def_level == level_info_.def_level) { + RollValue(offset); + } + if (NeedNewChunk()) { + chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else { + // nested data with nulls + int16_t def_level; + int16_t rep_level; + int64_t value_offset = 0; + + for (offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + rep_level = rep_levels[offset]; + + Roll(&def_level); + Roll(&rep_level); + if (def_level == level_info_.def_level) { + RollValue(value_offset); + } + + if ((rep_level == 0) && NeedNewChunk()) { + // if we are at a record boundary and need a new chunk, we create a new chunk + auto levels_to_write = offset - prev_offset; + if (levels_to_write > 0) { + chunks.push_back({prev_offset, prev_value_offset, levels_to_write}); + prev_offset = offset; + prev_value_offset = value_offset; + } + } + if (def_level >= level_info_.repeated_ancestor_def_level) { + // we only increment the value offset if we have a leaf value + ++value_offset; + } + } + } + + // add the last chunk if we have any levels left + if (prev_offset < num_levels) { + chunks.push_back({prev_offset, prev_value_offset, num_levels - prev_offset}); + } +#ifndef NDEBUG + ValidateChunks(chunks, num_levels); +#endif + + return chunks; + } + + template <int kByteWidth> + std::vector<Chunk> CalculateFixedWidth(const int16_t* def_levels, + const int16_t* rep_levels, int64_t num_levels, + const ::arrow::Array& values) { + const uint8_t* raw_values = + values.data()->GetValues<uint8_t>(1, 0) + values.offset() * kByteWidth; Review Comment: ```suggestion values.data()->GetValues<uint8_t>(/*i=*/1, /*absolute_offset=*/0) + values.offset() * kByteWidth; ``` Just for better readability. 
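Regarding the question at the top of this thread (how File1 and File2 can share identical Page2/Page3 while only Page1/Page4 differ): the cut decision is a pure function of a short trailing window of the content. With the `rolling_hash_ = (rolling_hash_ << 1) + table[byte]` update, a byte's contribution is shifted out of the 64-bit state after 64 updates, so once the scan has moved past the inserted value the hash sees exactly the same windows as before and the same cut points reappear; only the chunk containing the edit changes. A minimal single-table sketch of that property (deliberately omitting the PR's min/max chunk size limits and the 8-table run requirement):

```cpp
// Minimal gear-hash CDC sketch, not the PR's implementation: one hash table, no
// min/max chunk size, no normalization. `table` stands in for one of the generated
// kGearhashTable entries.
#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<size_t> ChunkBoundaries(const std::vector<uint8_t>& data,
                                    const uint64_t (&table)[256], uint64_t mask) {
  std::vector<size_t> boundaries;
  uint64_t rolling_hash = 0;
  for (size_t i = 0; i < data.size(); ++i) {
    // Older bytes are shifted toward the high bits and fall out of the 64-bit state
    // after 64 updates, which is what makes the hash "rolling".
    rolling_hash = (rolling_hash << 1) + table[data[i]];
    if ((rolling_hash & mask) == 0) {
      boundaries.push_back(i + 1);  // content-defined cut point after byte i
    }
  }
  return boundaries;
}
```

The production code layers cut-point skipping (the first `min_chunk_size` bytes are not hashed), a `max_chunk_size` cap, and the requirement of 8 consecutive matches across the 8 gear tables on top of this, and for nested columns it only accepts a boundary at a record boundary (`rep_level == 0`), but the locality argument stays the same.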
########## python/pyarrow/_parquet.pxd: ########## @@ -495,6 +500,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: Builder* disable_write_page_index() Builder* enable_page_checksum() Builder* disable_page_checksum() + Builder* enable_content_defined_chunking() + Builder* disable_content_defined_chunking() + Builder* content_defined_chunking_options(const CdcOptions options) Review Comment: ```suggestion Builder* content_defined_chunking_options(CdcOptions options) ``` It seems that we can remove `const`? ########## cpp/src/parquet/properties.h: ########## @@ -260,7 +290,9 @@ class PARQUET_EXPORT WriterProperties { created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), page_checksum_enabled_(false), - size_statistics_level_(DEFAULT_SIZE_STATISTICS_LEVEL) {} + size_statistics_level_(DEFAULT_SIZE_STATISTICS_LEVEL), + content_defined_chunking_enabled_(false), + content_defined_chunking_options_(kDefaultCdcOptions) {} Review Comment: Do we still need `kDefaultCdcOptions`? `content_defined_chunking_options_({})` seems to be enough. ########## cpp/src/parquet/chunker_internal_test.cc: ########## @@ -0,0 +1,1687 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <algorithm> +#include <iostream> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include <gtest/gtest.h> + +#include "arrow/table.h" +#include "arrow/testing/extension_type.h" +#include "arrow/testing/generator.h" +#include "arrow/type_fwd.h" +#include "arrow/util/float16.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/chunker_internal.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" + +namespace parquet::internal { + +using ::arrow::Array; +using ::arrow::ChunkedArray; +using ::arrow::ConcatenateTables; +using ::arrow::DataType; +using ::arrow::default_memory_pool; +using ::arrow::Field; +using ::arrow::Result; +using ::arrow::Schema; +using ::arrow::Table; +using ::arrow::internal::checked_cast; +using ::arrow::io::BufferReader; +using ::parquet::arrow::FileReader; +using ::parquet::arrow::FileReaderBuilder; +using ::parquet::arrow::MakeSimpleTable; +using ::parquet::arrow::NonNullArray; +using ::parquet::arrow::WriteTable; + +using ::testing::Bool; +using ::testing::Combine; +using ::testing::Values; + +// generate determinisic and platform-independent data +inline uint64_t hash(uint64_t seed, uint64_t index) { + uint64_t h = (index + seed) * 0xc4ceb9fe1a85ec53ull; + h ^= h >> 33; + h *= 0xff51afd7ed558ccdull; + h ^= h >> 33; + h *= 0xc4ceb9fe1a85ec53ull; + h ^= h >> 33; + return h; +} + +template <typename BuilderType, typename ValueFunc> +Result<std::shared_ptr<Array>> GenerateArray(const std::shared_ptr<DataType>& type, + bool nullable, int64_t length, uint64_t seed, + ValueFunc value_func) { + BuilderType builder(type, default_memory_pool()); + + if (nullable) { + for (int64_t i = 0; i < length; ++i) { + uint64_t val = hash(seed, i); + if (val % 10 == 0) { + RETURN_NOT_OK(builder.AppendNull()); + } else { + RETURN_NOT_OK(builder.Append(value_func(val))); + } + } + } else { + for (int64_t i = 0; i < length; ++i) { + uint64_t val = hash(seed, i); + RETURN_NOT_OK(builder.Append(value_func(val))); + } + } + + std::shared_ptr<Array> array; + RETURN_NOT_OK(builder.Finish(&array)); + RETURN_NOT_OK(array->ValidateFull()); + return array; +} + +#define GENERATE_CASE(TYPE_ID, BUILDER_TYPE, VALUE_EXPR) \ + case ::arrow::Type::TYPE_ID: { \ + auto value_func = [](uint64_t val) { return VALUE_EXPR; }; \ + return GenerateArray<BUILDER_TYPE>(type, nullable, length, seed, value_func); \ + } + +Result<std::shared_ptr<Array>> GenerateArray(const std::shared_ptr<Field>& field, + int64_t length, int64_t seed) { + const std::shared_ptr<DataType>& type = field->type(); + bool nullable = field->nullable(); + + switch (type->id()) { + GENERATE_CASE(BOOL, ::arrow::BooleanBuilder, (val % 2 == 0)) + + // Numeric types. 
+ GENERATE_CASE(INT8, ::arrow::Int8Builder, static_cast<int8_t>(val)) + GENERATE_CASE(INT16, ::arrow::Int16Builder, static_cast<int16_t>(val)) + GENERATE_CASE(INT32, ::arrow::Int32Builder, static_cast<int32_t>(val)) + GENERATE_CASE(INT64, ::arrow::Int64Builder, static_cast<int64_t>(val)) + GENERATE_CASE(UINT8, ::arrow::UInt8Builder, static_cast<uint8_t>(val)) + GENERATE_CASE(UINT16, ::arrow::UInt16Builder, static_cast<uint16_t>(val)) + GENERATE_CASE(UINT32, ::arrow::UInt32Builder, static_cast<uint32_t>(val)) + GENERATE_CASE(UINT64, ::arrow::UInt64Builder, static_cast<uint64_t>(val)) + GENERATE_CASE(HALF_FLOAT, ::arrow::HalfFloatBuilder, + static_cast<uint16_t>(val % 1000)) + GENERATE_CASE(FLOAT, ::arrow::FloatBuilder, static_cast<float>(val % 1000) / 1000.0f) + GENERATE_CASE(DOUBLE, ::arrow::DoubleBuilder, + static_cast<double>(val % 100000) / 1000.0) + case ::arrow::Type::DECIMAL128: { + const auto& decimal_type = checked_cast<const ::arrow::Decimal128Type&>(*type); + // Limit the value to fit within the specified precision + int32_t max_exponent = decimal_type.precision() - decimal_type.scale(); + int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1); + auto value_func = [&](uint64_t val) { + return ::arrow::Decimal128(val % max_value); + }; + return GenerateArray<::arrow::Decimal128Builder>(type, nullable, length, seed, + value_func); + } + case ::arrow::Type::DECIMAL256: { + const auto& decimal_type = checked_cast<const ::arrow::Decimal256Type&>(*type); + // Limit the value to fit within the specified precision, capped at 9 to avoid + // int64_t overflow + int32_t max_exponent = std::min(9, decimal_type.precision() - decimal_type.scale()); + int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1); + auto value_func = [&](uint64_t val) { + return ::arrow::Decimal256(val % max_value); + }; + return GenerateArray<::arrow::Decimal256Builder>(type, nullable, length, seed, + value_func); + } + + // Temporal types + GENERATE_CASE(DATE32, ::arrow::Date32Builder, static_cast<int32_t>(val)) + GENERATE_CASE(TIME32, ::arrow::Time32Builder, + std::abs(static_cast<int32_t>(val) % 86400000)) + GENERATE_CASE(TIME64, ::arrow::Time64Builder, + std::abs(static_cast<int64_t>(val) % 86400000000)) + GENERATE_CASE(TIMESTAMP, ::arrow::TimestampBuilder, static_cast<int64_t>(val)) + GENERATE_CASE(DURATION, ::arrow::DurationBuilder, static_cast<int64_t>(val)) + + // Binary and string types. 
+ GENERATE_CASE(STRING, ::arrow::StringBuilder, + std::string("str_") + std::to_string(val)) + GENERATE_CASE(LARGE_STRING, ::arrow::LargeStringBuilder, + std::string("str_") + std::to_string(val)) + GENERATE_CASE(BINARY, ::arrow::BinaryBuilder, + std::string("bin_") + std::to_string(val)) + GENERATE_CASE(LARGE_BINARY, ::arrow::LargeBinaryBuilder, + std::string("bin_") + std::to_string(val)) + case ::arrow::Type::FIXED_SIZE_BINARY: { + auto size = + checked_cast<const ::arrow::FixedSizeBinaryType*>(type.get())->byte_width(); + auto value_func = [size](uint64_t val) { + return std::string("bin_") + std::to_string(val).substr(0, size - 4); + }; + return GenerateArray<::arrow::FixedSizeBinaryBuilder>(type, nullable, length, seed, + value_func); + } + + case ::arrow::Type::STRUCT: { + auto struct_type = checked_cast<const ::arrow::StructType*>(type.get()); + std::vector<std::shared_ptr<Array>> child_arrays; + for (auto i = 0; i < struct_type->num_fields(); i++) { + ARROW_ASSIGN_OR_RAISE(auto child_array, GenerateArray(struct_type->field(i), + length, seed + i * 10)); + child_arrays.push_back(child_array); + } + auto struct_array = + std::make_shared<::arrow::StructArray>(type, length, child_arrays); + return struct_array; + } + + case ::arrow::Type::LIST: { + // Repeat the same pattern in the list array: + // null, empty list, list of 1 element, list of 3 elements + if (length % 4 != 0) { + return Status::Invalid( + "Length must be divisible by 4 when generating list arrays, but got: ", + length); + } + auto values_array_length = length * 4; + auto list_type = checked_cast<const ::arrow::ListType*>(type.get()); + auto value_field = ::arrow::field("item", list_type->value_type()); + ARROW_ASSIGN_OR_RAISE(auto values_array, + GenerateArray(value_field, values_array_length, seed)); + auto offset_builder = ::arrow::Int32Builder(); + auto bitmap_builder = ::arrow::TypedBufferBuilder<bool>(); + + RETURN_NOT_OK(offset_builder.Reserve(length + 1)); + RETURN_NOT_OK(bitmap_builder.Reserve(length)); + + int32_t num_nulls = 0; + RETURN_NOT_OK(offset_builder.Append(0)); + for (auto offset = 0; offset < length; offset += 4) { + if (nullable) { + // add a null + RETURN_NOT_OK(bitmap_builder.Append(false)); + RETURN_NOT_OK(offset_builder.Append(offset)); + num_nulls += 1; + } else { + // add an empty list + RETURN_NOT_OK(bitmap_builder.Append(true)); + RETURN_NOT_OK(offset_builder.Append(offset)); + } + // add an empty list + RETURN_NOT_OK(bitmap_builder.Append(true)); + RETURN_NOT_OK(offset_builder.Append(offset)); + // add a list of 1 element + RETURN_NOT_OK(bitmap_builder.Append(true)); + RETURN_NOT_OK(offset_builder.Append(offset + 1)); + // add a list of 3 elements + RETURN_NOT_OK(bitmap_builder.Append(true)); + RETURN_NOT_OK(offset_builder.Append(offset + 4)); + } + + std::shared_ptr<Array> offsets_array; + RETURN_NOT_OK(offset_builder.Finish(&offsets_array)); + std::shared_ptr<Buffer> bitmap_buffer; + RETURN_NOT_OK(bitmap_builder.Finish(&bitmap_buffer)); + ARROW_ASSIGN_OR_RAISE( + auto list_array, ::arrow::ListArray::FromArrays( + type, *offsets_array, *values_array, default_memory_pool(), + bitmap_buffer, num_nulls)); + RETURN_NOT_OK(list_array->ValidateFull()); + return list_array; + } + + case ::arrow::Type::EXTENSION: { + auto extension_type = checked_cast<const ::arrow::ExtensionType*>(type.get()); + auto storage_type = extension_type->storage_type(); + auto storage_field = ::arrow::field("storage", storage_type, true); + ARROW_ASSIGN_OR_RAISE(auto storage_array, + GenerateArray(storage_field, 
length, seed)); + return ::arrow::ExtensionType::WrapArray(type, storage_array); + } + + default: + return ::arrow::Status::NotImplemented("Unsupported data type " + type->ToString()); + } +} + +TEST(TestGenerateArray, Integer) { + auto field = ::arrow::field("a", ::arrow::int32()); + ASSERT_OK_AND_ASSIGN(auto array, GenerateArray(field, /*length=*/10, /*seed=*/0)); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), 10); + ASSERT_TRUE(array->type()->Equals(::arrow::int32())); + ASSERT_EQ(array->null_count(), 1); +} + +TEST(TestGenerateArray, ListOfInteger) { + auto field = ::arrow::field("a", ::arrow::list(::arrow::int32())); + auto length = 12; + ASSERT_OK_AND_ASSIGN(auto array, GenerateArray(field, length, /*seed=*/0)); + ASSERT_OK(array->ValidateFull()); + ASSERT_EQ(array->length(), length); + + for (size_t i = 0; i < 12; i += 4) { + // Assert the first element is null + ASSERT_TRUE(array->IsNull(i)); + + // Assert the second element is an empty list + ASSERT_TRUE(array->IsValid(i + 1)); + auto list_array = std::static_pointer_cast<::arrow::ListArray>(array); + ASSERT_EQ(list_array->value_length(i + 1), 0); + + // Assert the third element has length 1 + ASSERT_TRUE(array->IsValid(i + 2)); + ASSERT_EQ(list_array->value_length(i + 2), 1); + + // Assert the fourth element has length 3 + ASSERT_TRUE(array->IsValid(i + 3)); + ASSERT_EQ(list_array->value_length(i + 3), 3); + } + + ASSERT_NOT_OK(GenerateArray(field, 3, /*seed=*/0)); + ASSERT_OK(GenerateArray(field, 8, /*seed=*/0)); +} + +Result<std::shared_ptr<Table>> GenerateTable( + const std::shared_ptr<::arrow::Schema>& schema, int64_t size, int64_t seed = 0) { + std::vector<std::shared_ptr<Array>> arrays; + for (const auto& field : schema->fields()) { + ARROW_ASSIGN_OR_RAISE(auto array, GenerateArray(field, size, ++seed)); + arrays.push_back(array); + } + return Table::Make(schema, arrays, size); +} + +Result<std::shared_ptr<Table>> ConcatAndCombine( + const std::vector<std::shared_ptr<Table>>& parts) { + // Concatenate and combine chunks so the table doesn't carry information about + // the modification points + ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(parts)); + return table->CombineChunks(); +} + +Result<std::shared_ptr<Table>> ReadTableFromBuffer(const std::shared_ptr<Buffer>& data) { + std::shared_ptr<Table> result; + FileReaderBuilder builder; + std::unique_ptr<FileReader> reader; + auto props = default_arrow_reader_properties(); + + RETURN_NOT_OK(builder.Open(std::make_shared<BufferReader>(data))); + RETURN_NOT_OK(builder.memory_pool(::arrow::default_memory_pool()) + ->properties(props) + ->Build(&reader)); + RETURN_NOT_OK(reader->ReadTable(&result)); + return result; +} + +Result<std::shared_ptr<Buffer>> WriteTableToBuffer( + const std::shared_ptr<Table>& table, int64_t min_chunk_size, int64_t max_chunk_size, + int64_t row_group_length = 1024 * 1024, bool enable_dictionary = false, + ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1) { + auto sink = CreateOutputStream(); + + auto builder = WriterProperties::Builder(); + builder.enable_content_defined_chunking()->content_defined_chunking_options( + {min_chunk_size, max_chunk_size, /*norm_level=*/0}); + builder.data_page_version(data_page_version); + if (enable_dictionary) { + builder.enable_dictionary(); + } else { + builder.disable_dictionary(); + } + auto write_props = builder.build(); + auto arrow_props = ArrowWriterProperties::Builder().store_schema()->build(); + RETURN_NOT_OK(WriteTable(*table, default_memory_pool(), sink, 
row_group_length, + write_props, arrow_props)); + ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish()); + + // check whether the schema has extension types, if not we can easily ensure that + // the parquet seralization is roundtripable with CDC enabled + bool validate_roundtrip = true; + for (const auto& field : table->schema()->fields()) { + if (field->type()->id() == ::arrow::Type::EXTENSION) { + validate_roundtrip = false; Review Comment: Why extension type does not support roundtrip validation? What if the arrow schema has been preserved in the parquet metadata? ########## cpp/src/parquet/chunker_internal_test.cc: ########## @@ -0,0 +1,1397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include <algorithm> +#include <iostream> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/table.h" +#include "arrow/testing/extension_type.h" +#include "arrow/testing/generator.h" +#include "arrow/type_fwd.h" +#include "arrow/util/float16.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/chunker_internal.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" + +namespace parquet::internal { + +using ::arrow::Array; +using ::arrow::ChunkedArray; +using ::arrow::ConcatenateTables; +using ::arrow::DataType; +using ::arrow::default_memory_pool; +using ::arrow::Field; +using ::arrow::Result; +using ::arrow::Schema; +using ::arrow::Table; +using ::arrow::io::BufferReader; +using ::parquet::arrow::FileReader; +using ::parquet::arrow::FileReaderBuilder; +using ::parquet::arrow::MakeSimpleTable; +using ::parquet::arrow::NonNullArray; +using ::parquet::arrow::WriteTable; + +using ::testing::Bool; +using ::testing::Combine; +using ::testing::Values; + +// generate determinisic and platform-independent data +inline uint64_t hash(uint64_t seed, uint64_t index) { + uint64_t h = (index + seed) * 0xc4ceb9fe1a85ec53ull; + h ^= h >> 33; + h *= 0xff51afd7ed558ccdull; + h ^= h >> 33; + h *= 0xc4ceb9fe1a85ec53ull; + h ^= h >> 33; + return h; +} + +template <typename BuilderType, typename ValueFunc> +Result<std::shared_ptr<Array>> GenerateArray(const std::shared_ptr<DataType>& type, + bool nullable, int64_t length, uint64_t seed, + ValueFunc value_func) { + BuilderType builder(type, default_memory_pool()); + + if (nullable) { + for (int64_t i = 0; i < length; ++i) { + uint64_t val = hash(seed, i); + if (val % 10 == 0) { + RETURN_NOT_OK(builder.AppendNull()); + } else { + RETURN_NOT_OK(builder.Append(value_func(val))); + } + } + } else { + for (int64_t i = 0; i < length; ++i) { + uint64_t val = hash(seed, i); + RETURN_NOT_OK(builder.Append(value_func(val))); + 
} + } + + std::shared_ptr<Array> array; + RETURN_NOT_OK(builder.Finish(&array)); + RETURN_NOT_OK(array->ValidateFull()); + return array; +} + +#define GENERATE_CASE(TYPE_ID, BUILDER_TYPE, VALUE_EXPR) \ + case ::arrow::Type::TYPE_ID: { \ + auto value_func = [](uint64_t val) { return VALUE_EXPR; }; \ + return GenerateArray<BUILDER_TYPE>(type, nullable, length, seed, value_func); \ + } + +Result<std::shared_ptr<Array>> GenerateArray(const std::shared_ptr<Field>& field, + int64_t length, int64_t seed) { + const std::shared_ptr<DataType>& type = field->type(); + bool nullable = field->nullable(); + + switch (type->id()) { + GENERATE_CASE(BOOL, ::arrow::BooleanBuilder, (val % 2 == 0)) + + // Numeric types. + GENERATE_CASE(INT8, ::arrow::Int8Builder, static_cast<int8_t>(val)) + GENERATE_CASE(INT16, ::arrow::Int16Builder, static_cast<int16_t>(val)) + GENERATE_CASE(INT32, ::arrow::Int32Builder, static_cast<int32_t>(val)) + GENERATE_CASE(INT64, ::arrow::Int64Builder, static_cast<int64_t>(val)) + GENERATE_CASE(UINT8, ::arrow::UInt8Builder, static_cast<uint8_t>(val)) + GENERATE_CASE(UINT16, ::arrow::UInt16Builder, static_cast<uint16_t>(val)) + GENERATE_CASE(UINT32, ::arrow::UInt32Builder, static_cast<uint32_t>(val)) + GENERATE_CASE(UINT64, ::arrow::UInt64Builder, static_cast<uint64_t>(val)) + GENERATE_CASE(HALF_FLOAT, ::arrow::HalfFloatBuilder, + static_cast<uint16_t>(val % 1000)) + GENERATE_CASE(FLOAT, ::arrow::FloatBuilder, static_cast<float>(val % 1000) / 1000.0f) + GENERATE_CASE(DOUBLE, ::arrow::DoubleBuilder, + static_cast<double>(val % 100000) / 1000.0) + case ::arrow::Type::DECIMAL128: { + const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type); + // Limit the value to fit within the specified precision + int32_t max_exponent = decimal_type.precision() - decimal_type.scale(); + int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1); + auto value_func = [&](uint64_t val) { + return ::arrow::Decimal128(val % max_value); + }; + return GenerateArray<::arrow::Decimal128Builder>(type, nullable, length, seed, + value_func); + } + case ::arrow::Type::DECIMAL256: { + const auto& decimal_type = static_cast<const ::arrow::Decimal256Type&>(*type); + // Limit the value to fit within the specified precision, capped at 9 to avoid + // int64_t overflow + int32_t max_exponent = std::min(9, decimal_type.precision() - decimal_type.scale()); + int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1); + auto value_func = [&](uint64_t val) { + return ::arrow::Decimal256(val % max_value); + }; + return GenerateArray<::arrow::Decimal256Builder>(type, nullable, length, seed, + value_func); + } + + // Temporal types + GENERATE_CASE(DATE32, ::arrow::Date32Builder, static_cast<int32_t>(val)) + GENERATE_CASE(TIME32, ::arrow::Time32Builder, + std::abs(static_cast<int32_t>(val) % 86400000)) + GENERATE_CASE(TIME64, ::arrow::Time64Builder, + std::abs(static_cast<int64_t>(val) % 86400000000)) + GENERATE_CASE(TIMESTAMP, ::arrow::TimestampBuilder, static_cast<int64_t>(val)) + GENERATE_CASE(DURATION, ::arrow::DurationBuilder, static_cast<int64_t>(val)) + + // Binary and string types. 
+ GENERATE_CASE(STRING, ::arrow::StringBuilder, + std::string("str_") + std::to_string(val)) + GENERATE_CASE(LARGE_STRING, ::arrow::LargeStringBuilder, + std::string("str_") + std::to_string(val)) + GENERATE_CASE(BINARY, ::arrow::BinaryBuilder, + std::string("bin_") + std::to_string(val)) + GENERATE_CASE(LARGE_BINARY, ::arrow::LargeBinaryBuilder, + std::string("bin_") + std::to_string(val)) + case ::arrow::Type::FIXED_SIZE_BINARY: { + auto size = static_cast<::arrow::FixedSizeBinaryType*>(type.get())->byte_width(); + auto value_func = [size](uint64_t val) { + return std::string("bin_") + std::to_string(val).substr(0, size - 4); + }; + return GenerateArray<::arrow::FixedSizeBinaryBuilder>(type, nullable, length, seed, + value_func); + } + + case ::arrow::Type::STRUCT: { + auto struct_type = static_cast<::arrow::StructType*>(type.get()); + std::vector<std::shared_ptr<Array>> child_arrays; + for (auto i = 0; i < struct_type->num_fields(); i++) { + ARROW_ASSIGN_OR_RAISE(auto child_array, + GenerateArray(struct_type->field(i), length, + seed + static_cast<uint64_t>(i + 300))); + child_arrays.push_back(child_array); + } + auto struct_array = + std::make_shared<::arrow::StructArray>(type, length, child_arrays); + return struct_array; + } + + case ::arrow::Type::LIST: { + auto list_type = static_cast<::arrow::ListType*>(type.get()); + auto value_field = ::arrow::field("item", list_type->value_type()); + ARROW_ASSIGN_OR_RAISE(auto values_array, GenerateArray(value_field, length, seed)); + auto offset_builder = ::arrow::Int32Builder(); + auto bitmap_builder = ::arrow::TypedBufferBuilder<bool>(); + + int32_t num_nulls = 0; + int32_t num_elements = 0; + uint8_t element_size = 0; + int32_t current_offset = 0; + RETURN_NOT_OK(offset_builder.Append(current_offset)); + while (current_offset < length) { + num_elements++; + auto is_valid = !(nullable && (num_elements % 10 == 0)); + if (is_valid) { + RETURN_NOT_OK(bitmap_builder.Append(true)); + current_offset += element_size; + if (current_offset > length) { + RETURN_NOT_OK(offset_builder.Append(static_cast<int32_t>(length))); + break; + } else { + RETURN_NOT_OK(offset_builder.Append(current_offset)); + } + } else { + RETURN_NOT_OK(offset_builder.Append(static_cast<int32_t>(current_offset))); + RETURN_NOT_OK(bitmap_builder.Append(false)); + num_nulls++; + } + + if (element_size > 4) { + element_size = 0; + } else { + element_size++; + } + } + + std::shared_ptr<Array> offsets_array; + RETURN_NOT_OK(offset_builder.Finish(&offsets_array)); + std::shared_ptr<Buffer> bitmap_buffer; + RETURN_NOT_OK(bitmap_builder.Finish(&bitmap_buffer)); + ARROW_ASSIGN_OR_RAISE( + auto list_array, ::arrow::ListArray::FromArrays( + type, *offsets_array, *values_array, default_memory_pool(), + bitmap_buffer, num_nulls)); + RETURN_NOT_OK(list_array->ValidateFull()); + return list_array; + } + + case ::arrow::Type::EXTENSION: { + auto extension_type = dynamic_cast<::arrow::ExtensionType*>(type.get()); + auto storage_type = extension_type->storage_type(); + auto storage_field = ::arrow::field("storage", storage_type, true); + ARROW_ASSIGN_OR_RAISE(auto storage_array, + GenerateArray(storage_field, length, seed)); + return ::arrow::ExtensionType::WrapArray(type, storage_array); + } + + default: + return ::arrow::Status::NotImplemented("Unsupported data type " + type->ToString()); + } +} + +Result<std::shared_ptr<Table>> GenerateTable( + const std::shared_ptr<::arrow::Schema>& schema, int64_t size, int64_t seed = 0) { + std::vector<std::shared_ptr<Array>> arrays; + for (const auto& 
field : schema->fields()) { + ARROW_ASSIGN_OR_RAISE(auto array, GenerateArray(field, size, seed)); + arrays.push_back(array); + } + return Table::Make(schema, arrays, size); +} + +Result<std::shared_ptr<Table>> ConcatAndCombine( + const std::vector<std::shared_ptr<Table>>& parts) { + // Concatenate and combine chunks so the table doesn't carry information about + // the modification points + ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(parts)); + return table->CombineChunks(); +} + +Result<std::shared_ptr<Buffer>> WriteTableToBuffer( + const std::shared_ptr<Table>& table, int64_t min_chunk_size, int64_t max_chunk_size, + bool enable_dictionary = false, int64_t row_group_size = 1024 * 1024, + ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1) { + auto sink = CreateOutputStream(); + + auto builder = WriterProperties::Builder(); + builder.enable_content_defined_chunking()->content_defined_chunking_options( + min_chunk_size, max_chunk_size, /*norm_factor=*/0); + builder.data_page_version(data_page_version); + if (enable_dictionary) { + builder.enable_dictionary(); + } else { + builder.disable_dictionary(); + } + auto write_props = builder.build(); + auto arrow_props = ArrowWriterProperties::Builder().store_schema()->build(); + RETURN_NOT_OK(WriteTable(*table, default_memory_pool(), sink, row_group_size, + write_props, arrow_props)); + return sink->Finish(); +} + +Result<std::shared_ptr<Table>> ReadTableFromBuffer(const std::shared_ptr<Buffer>& data) { + std::shared_ptr<Table> result; + FileReaderBuilder builder; + std::unique_ptr<FileReader> reader; + RETURN_NOT_OK(builder.Open(std::make_shared<BufferReader>(data))); + RETURN_NOT_OK(builder.memory_pool(::arrow::default_memory_pool()) + ->properties(default_arrow_reader_properties()) + ->Build(&reader)); + RETURN_NOT_OK(reader->ReadTable(&result)); + return result; +} + +// Type to represent a list of chunks where each element is the size of the chunk. +using ChunkList = std::vector<int64_t>; + +// Type to represent the sizes and lengths of the data pages in a column. + +struct RowGroupInfo { + ChunkList page_lengths; + ChunkList page_sizes; + bool has_dictionary_page = false; +}; + +using ParquetInfo = std::vector<RowGroupInfo>; + +ParquetInfo GetColumnParquetInfo(const std::shared_ptr<Buffer>& data, + int column_index = 0) { + // Read the parquet data out of the buffer and get the sizes and lengths of the + // data pages in given column. We assert on the sizes and lengths of the pages + // to ensure that the chunking is done correctly. 
+ ParquetInfo result; + + auto buffer_reader = std::make_shared<BufferReader>(data); + auto parquet_reader = ParquetFileReader::Open(std::move(buffer_reader)); + + auto metadata = parquet_reader->metadata(); + for (int rg = 0; rg < metadata->num_row_groups(); rg++) { + auto page_reader = parquet_reader->RowGroup(rg)->GetColumnPageReader(column_index); + RowGroupInfo rg_info; + while (auto page = page_reader->NextPage()) { + if (page->type() == PageType::DATA_PAGE || page->type() == PageType::DATA_PAGE_V2) { + auto data_page = static_cast<DataPage*>(page.get()); + rg_info.page_sizes.push_back(data_page->size()); + rg_info.page_lengths.push_back(data_page->num_values()); + } else if (page->type() == PageType::DICTIONARY_PAGE) { + rg_info.has_dictionary_page = true; + } + } + result.push_back(rg_info); + } + + return result; +} + +Result<ParquetInfo> WriteAndGetParquetInfo( + const std::shared_ptr<Table>& table, uint64_t min_chunk_size, uint64_t max_chunk_size, + bool enable_dictionary = false, + ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1, + int64_t row_group_size = 1024 * 1024, + + int column_index = 0) { + // Write the table to a buffer and read it back to get the page sizes + ARROW_ASSIGN_OR_RAISE( + auto buffer, + WriteTableToBuffer(table, min_chunk_size, max_chunk_size, enable_dictionary, + row_group_size, data_page_version)); + ARROW_ASSIGN_OR_RAISE(auto readback, ReadTableFromBuffer(buffer)); + + RETURN_NOT_OK(readback->ValidateFull()); + if (readback->schema()->Equals(*table->schema())) { + ARROW_RETURN_IF(!readback->Equals(*table), + Status::Invalid("Readback table not equal to original")); + } + return GetColumnParquetInfo(buffer, column_index); +} + +// A git-hunk like side-by-side data structure to represent the differences between two +// vectors of uint64_t values. +using ChunkDiff = std::pair<ChunkList, ChunkList>; + +/** + * Finds the differences between two sequences of chunk lengths or sizes. + * Uses a longest common subsequence algorithm to identify matching elements + * and extract the differences between the sequences. 
+ * + * @param first The first sequence of chunk values + * @param second The second sequence of chunk values + * @return A vector of differences, where each difference is a pair of + * subsequences (one from each input) that differ + */ +std::vector<ChunkDiff> FindDifferences(const ChunkList& first, const ChunkList& second) { + // Compute the longest common subsequence using dynamic programming + size_t n = first.size(), m = second.size(); + std::vector<std::vector<size_t>> dp(n + 1, std::vector<size_t>(m + 1, 0)); + + // Fill the dynamic programming table + for (size_t i = 0; i < n; i++) { + for (size_t j = 0; j < m; j++) { + if (first[i] == second[j]) { + // If current elements match, extend the LCS + dp[i + 1][j + 1] = dp[i][j] + 1; + } else { + // If current elements don't match, take the best option + dp[i + 1][j + 1] = std::max(dp[i + 1][j], dp[i][j + 1]); + } + } + } + + // Backtrack through the dynamic programming table to reconstruct the common + // parts and their positions in the original sequences + std::vector<std::pair<size_t, size_t>> common; + for (size_t i = n, j = m; i > 0 && j > 0;) { + if (first[i - 1] == second[j - 1]) { + // Found a common element, add to common list + common.emplace_back(i - 1, j - 1); + i--, j--; + } else if (dp[i - 1][j] >= dp[i][j - 1]) { + // Move in the direction of the larger LCS value + i--; + } else { + j--; + } + } + // Reverse to get indices in ascending order + std::reverse(common.begin(), common.end()); + + // Build the differences by finding sequences between common elements + std::vector<ChunkDiff> result; + size_t last_i = 0, last_j = 0; + for (auto& c : common) { + auto ci = c.first; + auto cj = c.second; + // If there's a gap between the last common element and this one, + // record the difference + if (ci > last_i || cj > last_j) { + result.push_back({{first.begin() + last_i, first.begin() + ci}, + {second.begin() + last_j, second.begin() + cj}}); + } + // Move past this common element + last_i = ci + 1; + last_j = cj + 1; + } + + // Handle any remaining elements after the last common element + if (last_i < n || last_j < m) { + result.push_back( + {{first.begin() + last_i, first.end()}, {second.begin() + last_j, second.end()}}); + } + + // Post-process: merge adjacent diffs to avoid splitting single changes into multiple + // parts + std::vector<ChunkDiff> merged; + for (auto& diff : result) { + if (!merged.empty()) { + auto& prev = merged.back(); + // Check if we can merge with the previous diff + bool can_merge_a = prev.first.empty() && !prev.second.empty() && + !diff.first.empty() && diff.second.empty(); + bool can_merge_b = prev.second.empty() && !prev.first.empty() && + !diff.second.empty() && diff.first.empty(); + + if (can_merge_a) { + // Combine into one diff: keep prev's second, use diff's first + prev.first = std::move(diff.first); + continue; + } else if (can_merge_b) { + // Combine into one diff: keep prev's first, use diff's second + prev.second = std::move(diff.second); + continue; + } + } + // If we can't merge, add this diff to the result + merged.push_back(std::move(diff)); + } + return merged; +} + +void PrintDifferences(const ChunkList& original, const ChunkList& modified, + std::vector<ChunkDiff>& diffs) { + // Utility function to print the original and modified sequences, and the diffs + // between them. Used in case of failing assertions to display the differences. 
+ std::cout << "Original: "; + for (const auto& val : original) { + std::cout << val << " "; + } + std::cout << std::endl; + + std::cout << "Modified: "; + for (const auto& val : modified) { + std::cout << val << " "; + } + std::cout << std::endl; + + for (const auto& diff : diffs) { + std::cout << "First: "; + for (const auto& val : diff.first) { + std::cout << val << " "; + } + std::cout << std::endl; + + std::cout << "Second: "; + for (const auto& val : diff.second) { + std::cout << val << " "; + } + std::cout << std::endl; + } +} + +TEST(TestFindDifferences, Basic) { + ChunkList first = {1, 2, 3, 4, 5}; + ChunkList second = {1, 7, 8, 4, 5}; + + auto diffs = FindDifferences(first, second); + + ASSERT_EQ(diffs.size(), 1); + ASSERT_EQ(diffs[0].first, ChunkList({2, 3})); + ASSERT_EQ(diffs[0].second, ChunkList({7, 8})); +} + +TEST(TestFindDifferences, MultipleDifferences) { + ChunkList first = {1, 2, 3, 4, 5, 6, 7}; + ChunkList second = {1, 8, 9, 4, 10, 6, 11}; + auto diffs = FindDifferences(first, second); + + ASSERT_EQ(diffs.size(), 3); + + ASSERT_EQ(diffs[0].first, ChunkList({2, 3})); + ASSERT_EQ(diffs[0].second, ChunkList({8, 9})); + + ASSERT_EQ(diffs[1].first, ChunkList({5})); + ASSERT_EQ(diffs[1].second, ChunkList({10})); + + ASSERT_EQ(diffs[2].first, ChunkList({7})); + ASSERT_EQ(diffs[2].second, ChunkList({11})); +} + +TEST(TestFindDifferences, DifferentLengths) { + ChunkList first = {1, 2, 3}; + ChunkList second = {1, 2, 3, 4, 5}; + auto diffs = FindDifferences(first, second); + + ASSERT_EQ(diffs.size(), 1); + ASSERT_TRUE(diffs[0].first.empty()); + ASSERT_EQ(diffs[0].second, ChunkList({4, 5})); +} + +TEST(TestFindDifferences, EmptyArrays) { + ChunkList first = {}; + ChunkList second = {}; + auto diffs = FindDifferences(first, second); + ASSERT_TRUE(diffs.empty()); +} + +TEST(TestFindDifferences, LongSequenceWithSingleDifference) { + ChunkList first = { + 1994, 2193, 2700, 1913, 2052, + }; + ChunkList second = {2048, 43, 2080, 2700, 1913, 2052}; + auto diffs = FindDifferences(first, second); + + ASSERT_EQ(diffs.size(), 1); + ASSERT_EQ(diffs[0].first, ChunkList({1994, 2193})); + ASSERT_EQ(diffs[0].second, ChunkList({2048, 43, 2080})); + + // Verify that elements after the difference are identical + for (size_t i = 3; i < second.size(); i++) { + ASSERT_EQ(first[i - 1], second[i]); + } +} + +TEST(TestFindDifferences, LongSequenceWithMiddleChanges) { + ChunkList first = {2169, 1976, 2180, 2147, 1934, 1772, + 1914, 2075, 2154, 1940, 1934, 1970}; + ChunkList second = {2169, 1976, 2180, 2147, 2265, 1804, + 1717, 1925, 2122, 1940, 1934, 1970}; + auto diffs = FindDifferences(first, second); + + ASSERT_EQ(diffs.size(), 1); + ASSERT_EQ(diffs[0].first, ChunkList({1934, 1772, 1914, 2075, 2154})); + ASSERT_EQ(diffs[0].second, ChunkList({2265, 1804, 1717, 1925, 2122})); + + // Verify elements before and after the difference are identical + for (size_t i = 0; i < 4; i++) { + ASSERT_EQ(first[i], second[i]); + } + for (size_t i = 9; i < first.size(); i++) { + ASSERT_EQ(first[i], second[i]); + } +} + +TEST(TestFindDifferences, AdditionalCase) { + ChunkList original = {445, 312, 393, 401, 410, 138, 558, 457}; + ChunkList modified = {445, 312, 393, 393, 410, 138, 558, 457}; + + auto diffs = FindDifferences(original, modified); + ASSERT_EQ(diffs.size(), 1); + + ASSERT_EQ(diffs[0].first, ChunkList({401})); + ASSERT_EQ(diffs[0].second, ChunkList({393})); + + // Verify elements before and after the difference are identical + for (size_t i = 0; i < 3; i++) { + ASSERT_EQ(original[i], modified[i]); + } + for 
(size_t i = 4; i < original.size(); i++) { + ASSERT_EQ(original[i], modified[i]); + } +} + +void AssertPageLengthDifferences(const RowGroupInfo& original, + const RowGroupInfo& modified, + int8_t exact_number_of_equal_diffs, + int8_t exact_number_of_larger_diffs, + int8_t exact_number_of_smaller_diffs, + int64_t edit_length = 0) { + // Asserts that the differences between the original and modified page lengths + // are as expected. A longest common subsequence diff is calculated on the original + // and modified sequences of page lengths. The exact_number_of_equal_diffs, + // exact_number_of_larger_diffs, and exact_number_of_smaller_diffs parameters specify + // the expected number of differences with equal, larger, and smaller sums of the page + // lengths respectively. The edit_length parameter is used to verify that the page + // lenght differences are exactly equal to the edit_length. + auto diffs = FindDifferences(original.page_lengths, modified.page_lengths); + size_t expected_number_of_diffs = exact_number_of_equal_diffs + + exact_number_of_larger_diffs + + exact_number_of_smaller_diffs; + if (diffs.size() != expected_number_of_diffs) { + PrintDifferences(original.page_lengths, modified.page_lengths, diffs); + } + if (diffs.size() == 0) { + // no differences found, the arrays are equal + ASSERT_TRUE(original.page_lengths == modified.page_lengths); + } + ASSERT_EQ(diffs.size(), expected_number_of_diffs); + + uint8_t equal_diffs = 0; + int8_t larger_diffs = 0; + int8_t smaller_diffs = 0; + for (const auto& diff : diffs) { + uint64_t original_sum = 0, modified_sum = 0; + for (const auto& val : diff.first) original_sum += val; + for (const auto& val : diff.second) modified_sum += val; + + if (original_sum == modified_sum) { + equal_diffs++; + } else if (original_sum < modified_sum) { + larger_diffs++; + ASSERT_EQ(original_sum + edit_length, modified_sum); + } else if (original_sum > modified_sum) { + smaller_diffs++; + ASSERT_EQ(original_sum, modified_sum + edit_length); + } + ASSERT_LE(diff.first.size(), 2); + ASSERT_LE(diff.second.size(), 2); + } + + ASSERT_EQ(equal_diffs, exact_number_of_equal_diffs); + ASSERT_EQ(larger_diffs, exact_number_of_larger_diffs); + ASSERT_EQ(smaller_diffs, exact_number_of_smaller_diffs); +} + +void AssertPageLengthDifferences(const RowGroupInfo& original, + const RowGroupInfo& modified, + uint8_t max_number_of_equal_diffs) { + // A less restrictive version of the above assertion function mainly used to + // assert the update case. 
+  auto diffs = FindDifferences(original.page_lengths, modified.page_lengths);
+  if (diffs.size() > max_number_of_equal_diffs) {
+    PrintDifferences(original.page_lengths, modified.page_lengths, diffs);
+  }
+  ASSERT_LE(diffs.size(), max_number_of_equal_diffs);
+
+  for (const auto& diff : diffs) {
+    uint64_t left_sum = 0, right_sum = 0;
+    for (const auto& val : diff.first) left_sum += val;
+    for (const auto& val : diff.second) right_sum += val;
+    ASSERT_EQ(left_sum, right_sum);
+    ASSERT_LE(diff.first.size(), 2);
+    ASSERT_LE(diff.second.size(), 2);
+  }
+
+  if (diffs.size() == 0) {
+    // no differences found, the arrays are equal
+    ASSERT_TRUE(original.page_lengths == modified.page_lengths);
+  }
+}
+
+uint64_t ElementCount(int64_t size, int32_t byte_width, bool nullable) {
+  if (nullable) {
+    // in case of nullable types the def_levels are also fed through the chunker
+    // to identify changes in the null bitmap, this will increase the byte width
+    // and decrease the number of elements per chunk
+    byte_width += 2;
+  }
+  return size / byte_width;
+}
+
+void AssertAllBetween(const ChunkList& chunks, int64_t min, int64_t max) {
+  // except the last chunk since it is not guaranteed to be within the range
+  for (size_t i = 0; i < chunks.size() - 1; i++) {
+    ASSERT_GE(chunks[i], min);
+    ASSERT_LE(chunks[i], max);
+  }
+  ASSERT_LE(chunks.back(), max);
+}
+
+void AssertChunkSizes(const std::shared_ptr<::arrow::DataType>& dtype,
+                      const RowGroupInfo& base_info, const RowGroupInfo& modified_info,
+                      bool nullable, bool enable_dictionary, int64_t min_chunk_size,
+                      int64_t max_chunk_size) {
+  if (dtype->id() != ::arrow::Type::BOOL) {
+    ASSERT_EQ(base_info.has_dictionary_page, enable_dictionary);
+    ASSERT_EQ(modified_info.has_dictionary_page, enable_dictionary);
+  }
+  if (::arrow::is_fixed_width(dtype->id())) {
+    // for nullable types we cannot calculate the exact number of elements because
+    // not all elements are fed through the chunker (null elements are skipped)
+    auto byte_width = (dtype->id() == ::arrow::Type::BOOL) ? 1 : dtype->byte_width();
+    auto min_length = ElementCount(min_chunk_size, byte_width, nullable);
+    auto max_length = ElementCount(max_chunk_size, byte_width, nullable);
+    AssertAllBetween(base_info.page_lengths, min_length, max_length);
+    AssertAllBetween(modified_info.page_lengths, min_length, max_length);
+  } else if (::arrow::is_base_binary_like(dtype->id()) && !enable_dictionary) {
+    AssertAllBetween(base_info.page_sizes, min_chunk_size, max_chunk_size);
+    AssertAllBetween(modified_info.page_sizes, min_chunk_size, max_chunk_size);
+  }
+}
+
+constexpr int64_t kMinChunkSize = 8 * 1024;
+constexpr int64_t kMaxChunkSize = 32 * 1024;
+constexpr int64_t kPartSize = 128 * 1024;
+constexpr int64_t kEditSize = 128;
+
+struct CaseConfig {
+  // Arrow data type to generate the testing data for
+  std::shared_ptr<::arrow::DataType> dtype;
+  // Whether the data type is nullable
+  bool is_nullable;
+  // Approximate number of bytes per record to calculate the number of elements to
+  // generate
+  size_t bytes_per_record;
+  // Data page version to use
+  ParquetDataPageVersion data_page_version = ParquetDataPageVersion::V1;
+};
+
+// Define PrintTo for CaseConfig
+void PrintTo(const CaseConfig& param, std::ostream* os) {
+  *os << "{ " << param.dtype->ToString();
+  if (param.is_nullable) {
+    *os << " nullable";
+  }
+  *os << " }";
+}
+
+class TestCDC : public ::testing::Test {
+ public:
+  uint64_t GetMask(const ContentDefinedChunker& cdc) const { return cdc.GetMask(); }
+};
+
+TEST_F(TestCDC, RollingHashMaskCalculation) {
+  auto le = LevelInfo();
+  auto min_size = 256 * 1024;
+  auto max_size = 1024 * 1024;
+
+  auto cdc0 = ContentDefinedChunker(le, min_size, max_size, 0);
+  ASSERT_EQ(GetMask(cdc0), 0xFFFE000000000000);
+
+  auto cdc1 = ContentDefinedChunker(le, min_size, max_size, 1);
+  ASSERT_EQ(GetMask(cdc1), 0xFFFC000000000000);
+
+  auto cdc2 = ContentDefinedChunker(le, min_size, max_size, 2);
+  ASSERT_EQ(GetMask(cdc2), 0xFFF8000000000000);
+
+  auto cdc3 = ContentDefinedChunker(le, min_size, max_size, 3);
+  ASSERT_EQ(GetMask(cdc3), 0xFFF0000000000000);
+
+  auto cdc4 = ContentDefinedChunker(le, min_size, max_size, -1);
+  ASSERT_EQ(GetMask(cdc4), 0xFFFF000000000000);
+
+  // this is the smallest possible mask always matching, by using 8 hashtables
+  // we are going to have a match every 8 bytes; this is an unrealistic case
+  // but checking for the correctness of the mask calculation
+  auto cdc5 = ContentDefinedChunker(le, 0, 16, 0);
+  ASSERT_EQ(GetMask(cdc5), 0x0000000000000000);
+
+  auto cdc6 = ContentDefinedChunker(le, 0, 32, 1);
+  ASSERT_EQ(GetMask(cdc6), 0x0000000000000000);
+
+  auto cdc7 = ContentDefinedChunker(le, 0, 16, -1);
+  ASSERT_EQ(GetMask(cdc7), 0x8000000000000000);
+
+  // another unrealistic case, checking for the validation
+  auto cdc8 = ContentDefinedChunker(le, 128, 384, -60);
+  ASSERT_EQ(GetMask(cdc8), 0xFFFFFFFFFFFFFFFF);
+}
+
+TEST_F(TestCDC, WriteSingleColumnParquetFile) {
+  // Define the schema with a single column "number"
+  auto schema = std::dynamic_pointer_cast<schema::GroupNode>(schema::GroupNode::Make(
+      "root", Repetition::REQUIRED,
+      {schema::PrimitiveNode::Make("number", Repetition::REQUIRED, Type::INT32)}));
+
+  auto sink = CreateOutputStream();
+  auto builder = WriterProperties::Builder();
+  auto props = builder.enable_content_defined_chunking()->build();
+
+  auto writer = ParquetFileWriter::Open(sink, schema, props);
+  auto row_group_writer = writer->AppendRowGroup();
+
+  // Create a column writer for the "number" column
+  auto column_writer = row_group_writer->NextColumn();
+ auto& int_column_writer = dynamic_cast<Int32Writer&>(*column_writer); + + std::vector<int32_t> numbers = {1, 2, 3, 4, 5}; + std::vector<uint8_t> valid_bits = {1, 0, 1, 0, 1}; + EXPECT_THROW( + int_column_writer.WriteBatch(numbers.size(), nullptr, nullptr, numbers.data()), + ParquetException); Review Comment: Yes we do. You can find `EXPECT_THROW_THAT` from the Parquet subdirectory. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +namespace { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_level parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// higher probability of matching and switching them based on the actual chunk size), we +/// rather use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. 
+// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_level Normalization level (default 0) +// @return The mask used to compare against the rolling hash +uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) { + if (min_chunk_size < 0) { + throw ParquetException("min_chunk_size must be non-negative"); + } + if (max_chunk_size <= min_chunk_size) { + throw ParquetException("max_chunk_size must be greater than min_chunk_size"); + } + + // calculate the average size of the chunks + int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2; + // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_chunk_size` bytes; also divide by the number of gearhash tables to have a + // a more gaussian-like distribution + int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables; + + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the floor(log2(target_size)) + int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1); + + // a user defined `norm_level` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_level we increase the probability of matching + // the mask, forcing the distribution closer to the average size; norm_level is 0 by + // default + int effective_bits = mask_bits - norm_level; + + if (effective_bits < 1 || effective_bits > 63) { + throw ParquetException( + "The number of bits in the CDC mask must be between 1 and 63, got " + + std::to_string(effective_bits)); + } else { + // create the mask by setting the top bits + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); + } +} + +} // namespace + +class ContentDefinedChunker::Impl { + public: + Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size, + int norm_level) + : level_info_(level_info), + min_chunk_size_(min_chunk_size), + max_chunk_size_(max_chunk_size), + rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {} + + uint64_t GetRollingHashMask() const { return rolling_hash_mask_; } + + void Roll(bool value) { + if (++chunk_size_ < min_chunk_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the Review Comment: The trade-off here is that all writers to the same CAS should apply same `min_chunk_size` config so we can safely ignore the heading bytes before `min_chunk_size`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org