wgtmac commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r2081229168
########## cpp/src/parquet/properties.h: ########## @@ -245,6 +245,34 @@ class PARQUET_EXPORT ColumnProperties { bool page_index_enabled_; }; +// EXPERIMENTAL: Options for content-defined chunking. +struct PARQUET_EXPORT CdcOptions { + /// Minimum chunk size in bytes, default 256 KiB + /// The rolling hash will not be updated until this size is reached for each chunk. + /// Note that all data sent through the hash function is counted towards the chunk + /// size, including definition and repetition levels if present. + int64_t min_chunk_size; Review Comment: Should we set the default value to avoid unintentionally uninitialized value? ########## cpp/src/parquet/chunker_internal_codegen.py: ########## @@ -0,0 +1,124 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Produce the given number gearhash tables for rolling hash calculations. + +Each table consists of 256 64-bit integer values and by default 8 tables are +produced. The tables are written to a header file that can be included in the +C++ code. + +The generated numbers are deterministic "random" numbers created by MD5 hashing +a fixed seed and the table index. This ensures that the tables are the same +across different runs and platforms. The function of generating the numbers is +less important as long as they have sufficiently uniform distribution. + +Reference implementations: +- https://github.com/Borelset/destor/blob/master/src/chunking/fascdc_chunking.c +- https://github.com/nlfiedler/fastcdc-rs/blob/master/examples/table64.rs + +Usage: + python chunker_internal_codegen.py [ntables] + + ntables: Number of gearhash tables to generate (default 8), the + the C++ implementation expects 8 tables so this should not be + changed unless the C++ code is also updated. + + The generated header file is written to ./chunker_internal_generated.h +""" + +import hashlib +import pathlib +import sys +from io import StringIO + + +template = """\ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include <cstdint> Review Comment: ```suggestion #pragma once #include <cstdint> ``` Leave a blank line in between? ########## cpp/src/parquet/CMakeLists.txt: ########## @@ -395,6 +396,8 @@ add_parquet_test(writer-test file_serialize_test.cc stream_writer_test.cc) +add_parquet_test(chunker-test SOURCES chunker_internal_test.cc) Review Comment: Why not reusing `writer-test` above? ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// higher probability of matching and switching them based on the actual chunk size), we +/// rather use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. 
+// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, Review Comment: `level` sounds better here, since `factor` made me assume that it would multiply some value. ########## cpp/src/parquet/properties.h: ########## @@ -245,6 +245,34 @@ class PARQUET_EXPORT ColumnProperties { bool page_index_enabled_; }; +// EXPERIMENTAL: Options for content-defined chunking. +struct PARQUET_EXPORT CdcOptions { + /// Minimum chunk size in bytes, default 256 KiB + /// The rolling hash will not be updated until this size is reached for each chunk. + /// Note that all data sent through the hash function is counted towards the chunk + /// size, including definition and repetition levels if present. + int64_t min_chunk_size; + /// Maximum chunk size in bytes, default is 1024 KiB + /// The chunker will create a new chunk whenever the chunk size exceeds this value. + /// Note that the parquet writer has a related `pagesize` property that controls + /// the maximum size of a parquet data page after encoding. While setting + /// `pagesize` to a smaller value than `max_chunk_size` doesn't affect the + /// chunking effectiveness, it results in more small parquet data pages. + int64_t max_chunk_size; + /// Number of bit adjustments to the gearhash mask in order to + /// center the chunk size around the average size more aggressively, default 0 + /// Increasing the normalization factor increases the probability of finding a chunk, + /// improving the deduplication ratio, but also increasing the number of small chunks + /// resulting in many small parquet data pages. The default value provides a good + /// balance between deduplication ratio and fragmentation. Use norm_factor=1 or + /// norm_factor=2 to reach a higher deduplication ratio at the expense of + /// fragmentation. Negative values can also be used to reduce the probability of + /// finding a chunk, resulting in larger chunks and fewer data pages. + int norm_factor = 0; Review Comment: Is there any bound on norm_factor? Should we document it? ########## cpp/src/parquet/properties.h: ########## @@ -275,10 +305,38 @@ class PARQUET_EXPORT WriterProperties { page_checksum_enabled_(properties.page_checksum_enabled()), size_statistics_level_(properties.size_statistics_level()), sorting_columns_(properties.sorting_columns()), - default_column_properties_(properties.default_column_properties()) {} + default_column_properties_(properties.default_column_properties()), + content_defined_chunking_enabled_( + properties.content_defined_chunking_enabled()), + content_defined_chunking_options_( + properties.content_defined_chunking_options()) {} virtual ~Builder() {} + /// \brief EXPERIMENTAL: Use content-defined page chunking for all columns. + /// + /// Optimize parquet files for content addressable storage (CAS) systems by writing + /// data pages according to content-defined chunk boundaries. This allows for more + /// efficient deduplication of data across files, hence more efficient network + /// transfers and storage. The chunking is based on a rolling hash algorithm that + /// identifies chunk boundaries based on the actual content of the data.
+ Builder* enable_content_defined_chunking() { + content_defined_chunking_enabled_ = true; + return this; + } + + /// \brief EXPERIMENTAL: Disable content-defined page chunking for all columns. + Builder* disable_content_defined_chunking() { + content_defined_chunking_enabled_ = false; + return this; + } + + /// \brief EXPERIMENTAL: Specify content-defined chunking options, see CdcOptions. + Builder* content_defined_chunking_options(const CdcOptions options) { Review Comment: ```suggestion Builder* content_defined_chunking_options(CdcOptions options) { ``` Or `const CdcOptions&` ? ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. 
Instead of using two different masks (one with a lower and one with a +/// higher probability of matching and switching them based on the actual chunk size), we +/// rather use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, + int norm_factor) { + if (min_chunk_size < 0) { + throw ParquetException("min_chunk_size must be positive"); + } + if (max_chunk_size <= min_chunk_size) { + throw ParquetException("max_chunk_size must be greater than min_chunk_size"); + } + + // calculate the average size of the chunks + int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2; + // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_chunk_size` bytes; also divide by the number of gearhash tables to have a + // a more gaussian-like distribution + int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables; + + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the floor(log2(target_size)) + int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1); + + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size; norm_factor is 0 by + // default + int effective_bits = mask_bits - norm_factor; + + if (effective_bits < 1 || effective_bits > 63) { + throw ParquetException( + "The number of bits in the CDC mask must be between 1 and 63, got " + + std::to_string(effective_bits)); + } else { + // create the mask by setting the top bits + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); + } +} + +class ContentDefinedChunker::Impl { + public: + Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size, + int norm_factor) + : level_info_(level_info), + min_chunk_size_(min_chunk_size), + max_chunk_size_(max_chunk_size), + rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_factor)) {} + + uint64_t GetRollingHashMask() const { return rolling_hash_mask_; } + + void Roll(const bool value) { Review Comment: ```suggestion void Roll(bool value) { ``` ########## cpp/src/parquet/column_writer.cc: ########## @@ -1337,13 +1368,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, bits_buffer_->ZeroPadding(); } - if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) { - return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx, - maybe_parent_nulls); + if (properties_->content_defined_chunking_enabled()) { + DCHECK(content_defined_chunker_.has_value()); + auto chunks = content_defined_chunker_->GetChunks(def_levels, rep_levels, + num_levels, leaf_array); + for (size_t i = 0; i < chunks.size(); i++) { + auto chunk = chunks[i]; + auto chunk_array = leaf_array.Slice(chunk.value_offset); + auto chunk_def_levels = 
AddIfNotNull(def_levels, chunk.level_offset); + auto chunk_rep_levels = AddIfNotNull(rep_levels, chunk.level_offset); + if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) { + ARROW_CHECK_OK(WriteArrowDictionary(chunk_def_levels, chunk_rep_levels, + chunk.levels_to_write, *chunk_array, ctx, + maybe_parent_nulls)); + } else { + ARROW_CHECK_OK(WriteArrowDense(chunk_def_levels, chunk_rep_levels, + chunk.levels_to_write, *chunk_array, ctx, + maybe_parent_nulls)); + } + bool is_last_chunk = i == (chunks.size() - 1); + if (num_buffered_values_ > 0 && !is_last_chunk) { + // Explicitly add a new data page according to the content-defined chunk + // boundaries. This way the same chunks will have the same byte-sequence + // in the resulting file, which can be identified by content addressable + // storage. + // Note that the last chunk doesn't trigger a new data page in order to + // allow subsequent WriteArrow() calls to continue writing to the same + // data page, the chunker's state is not being reset after the last chunk. + AddDataPage(); Review Comment: I'm not sure if I have raised the same concern in the previous review. Values of the same record cannot span two pages if we are using DataPageV2 or enabling page index. There is special logic inside `WriteArrowDictionary` and `WriteArrowDense` (see `pages_change_on_record_boundaries()`). Please make sure that it won't break. BTW, is it OK if a CDC chunk has been further split (by the column writer) into several data pages? I assume it should be fine as long as the same writer configuration results in the same pages. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_factor parameters.
+/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// higher probability of matching and switching them based on the actual chunk size), we +/// rather use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, + int norm_factor) { + if (min_chunk_size < 0) { Review Comment: Can it be 0? If yes, the error message should be `non-negative` ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cstdint> +#include <iterator> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/logging_internal.h" +#include "arrow/visit_type_inline.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +using ::arrow::internal::checked_cast; + +static_assert(std::size(kGearhashTable) == kNumGearhashTables, + "should update CDC code to reflect number of generated hash tables"); +static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, + "each table should have 256 entries of 64 bit values"); + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. 
The mask is calculated +/// based on the min_chunk_size, max_chunk_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// higher probability of matching and switching them based on the actual chunk size), we +/// rather use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_chunk_size The minimum chunk size (default 256KiB) +// @param max_chunk_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, Review Comment: Wrap it in an anonymous namespace and remove `static`?
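For illustration, a minimal sketch of the refactor this last comment suggests: the helper moves into an unnamed namespace inside the `.cc` file instead of being declared `static`. The signature mirrors `CalculateMask` from the quoted diff, but the body here is a placeholder, not the PR's actual implementation:

```cpp
// Sketch only: unnamed-namespace style for a file-local helper in chunker_internal.cc.
#include <cstdint>
#include <limits>

namespace parquet::internal {
namespace {

// Internal linkage now comes from the unnamed namespace, so `static` is no longer
// needed on the definition. Placeholder body: the real CalculateMask derives the mask
// from the target chunk size and the number of gearhash tables (see the quoted diff).
uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_factor) {
  (void)min_chunk_size;
  (void)max_chunk_size;
  (void)norm_factor;
  // Example value only: a mask with the top 16 bits set.
  return std::numeric_limits<uint64_t>::max() << 48;
}

}  // namespace

// The rest of the translation unit keeps calling CalculateMask() unchanged.

}  // namespace parquet::internal
```

Either spelling gives the function internal linkage; the unnamed-namespace form is simply the one the reviewer asks for here.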