kszucs commented on code in PR #45360:
URL: https://github.com/apache/arrow/pull/45360#discussion_r1985082199


##########
cpp/src/parquet/chunker_internal.h:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <string>
+#include <vector>
+#include "arrow/array.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+// Represents a chunk of data with level offsets and value offsets due to the
+// record shredding for nested data.
+struct Chunk {
+  int64_t level_offset;
+  int64_t value_offset;
+  int64_t levels_to_write;
+
+  Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write)
+      : level_offset(level_offset),
+        value_offset(value_offset),
+        levels_to_write(levels_to_write) {}
+};
+
+/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized
+/// chunks based on the content of the data itself, rather than using fixed-size
+/// boundaries.
+///
+/// For example, given this sequence of values in a column:
+///
+/// File1:    [1,2,3,   4,5,6,   7,8,9]
+///            chunk1   chunk2   chunk3
+///
+/// Assume there is an inserted value between 3 and 4:
+///
+/// File2:     [1,2,3,0,  4,5,6,   7,8,9]
+///            new-chunk  chunk2   chunk3
+///
+/// The chunking process will adjust to maintain stable boundaries across data
+/// modifications. Each chunk defines a new parquet data page, and the pages are
+/// contiguously written out to the file. Since each page is compressed independently,
+/// the files' contents would look like the following with unique page identifiers:
+///
+/// File1:     [Page1][Page2][Page3]...
+/// File2:     [Page4][Page2][Page3]...
+///
+/// The parquet file is then uploaded to a content-addressable storage system (CAS),
+/// which splits the byte stream into content-defined blobs. The CAS system calculates
+/// a unique identifier for each blob, then stores the blob in a key-value store. If the
+/// same blob is encountered again, the system can refer to its hash instead of
+/// physically storing the blob again. In the example above, the CAS system would store
+/// Page1, Page2, Page3, and Page4 only once, along with the metadata required to
+/// reassemble the files.
+/// While the deduplication is performed by the CAS system, the parquet chunker makes it
+/// possible to efficiently deduplicate the data by consistently dividing the data into
+/// chunks.
+///
+/// Implementation details:
+///
+/// Only the parquet writer needs to be aware of the content-defined chunking; the
+/// reader doesn't need to know about it. Each parquet column writer holds a
+/// ContentDefinedChunker instance depending on the writer's properties. The chunker's
+/// state is maintained across the entire column without being reset between pages and
+/// row groups.
+///
+/// The chunker receives the record-shredded column data (def_levels, rep_levels, values)
+/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting
+/// the column-global rolling hash based on the triplet. Whenever the rolling hash
+/// matches a predefined mask, the chunker creates a new chunk. The chunker returns a
+/// vector of Chunk objects that represent the boundaries of the chunks.
+/// Note that the boundaries are calculated deterministically based exclusively on the
+/// data itself, so the same data will always produce the same chunks - given the same
+/// chunker configuration.
+///
+/// References:
+/// - FastCDC: a Fast and Efficient Content-Defined Chunking Approach for Data
+///   Deduplication
+///   https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf
+/// - Git is for Data (chunk size normalization used here is described in section 6.2.1):
+///   https://www.cidrdb.org/cidr2023/papers/p43-low.pdf
+class ContentDefinedChunker {
+ public:
+  /// Create a new ContentDefinedChunker instance
+  ///
+  /// @param level_info Information about definition and repetition levels
+  /// @param size_range Min/max chunk size as pair<min_size, max_size>; the chunker will
+  ///                   attempt to uniformly distribute the chunks between these
+  ///                   extremes.
+  /// @param norm_factor Normalization factor to center the chunk size around the
+  ///                    average size more aggressively. Increasing the normalization
+  ///                    factor increases the probability of finding a chunk boundary.
+  ContentDefinedChunker(const LevelInfo& level_info,
+                        std::pair<uint64_t, uint64_t> size_range,
+                        uint8_t norm_factor = 0);
+
+  /// Get the chunk boundaries for the given column data
+  ///
+  /// @param def_levels Definition levels
+  /// @param rep_levels Repetition levels
+  /// @param num_levels Number of levels
+  /// @param values Column values as an Arrow array
+  /// @return Vector of Chunk objects representing the chunk boundaries
+  const std::vector<Chunk> GetBoundaries(const int16_t* def_levels,
+                                         const int16_t* rep_levels, int64_t num_levels,
+                                         const ::arrow::Array& values);
+
+ private:
+  // Update the rolling hash with a compile-time known sized value, set has_matched_ to
+  // true if the hash matches the mask.
+  template <typename T>
+  void Roll(const T value);
+

Review Comment:
   Well, everything is `internal` at the moment (both the header and the namespace), do we need to further hide the implementation?
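
   For reference, if we ever want to hide more than the `_internal` header and namespace already do, a pImpl wrapper would be one option. A minimal sketch of what that could look like (hypothetical, not something this PR proposes):

   ```cpp
   // chunker_internal.h -- hypothetical pImpl variant, requires <memory> for unique_ptr
   class ContentDefinedChunker {
    public:
     ContentDefinedChunker(const LevelInfo& level_info,
                           std::pair<uint64_t, uint64_t> size_range,
                           uint8_t norm_factor = 0);
     ~ContentDefinedChunker();

     const std::vector<Chunk> GetBoundaries(const int16_t* def_levels,
                                            const int16_t* rep_levels, int64_t num_levels,
                                            const ::arrow::Array& values);

    private:
     // the rolling hash state and the templated Roll()/Calculate() helpers would move
     // into the .cc-only Impl class, so the header would expose no implementation details
     class Impl;
     std::unique_ptr<Impl> impl_;
   };
   ```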



##########
cpp/src/parquet/chunker_internal.cc:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cmath>
+#include <string>
+#include <vector>
+#include "arrow/array.h"
+#include "arrow/util/logging.h"
+#include "parquet/chunker_internal_hashtable.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+// create a fake null array class with a GetView method returning 0 always
+class FakeNullArray {
+ public:
+  uint8_t GetView(int64_t i) const { return 0; }
+
+  std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); }
+
+  int64_t null_count() const { return 0; }
+};
+
+static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) {
+  // we aim for a gaussian-like distribution of chunk sizes between min_size and max_size
+  uint64_t avg_size = (min_size + max_size) / 2;
+  // we skip calculating the gearhash for the first `min_size` bytes, so we are looking
+  // for a smaller chunk than the average size
+  uint64_t target_size = avg_size - min_size;
+  size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size)));
+  // -3 because we are using 8 hash tables to get a more gaussian-like distribution;
+  // `norm_factor` narrows the chunk size distribution around avg_size
+  size_t effective_bits = mask_bits - 3 - norm_factor;
+  return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+}
+
+ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info,
+                                             std::pair<uint64_t, uint64_t> size_range,
+                                             uint8_t norm_factor)
+    : level_info_(level_info),
+      min_size_(size_range.first),
+      max_size_(size_range.second),
+      hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {}
+
+template <typename T>
+void ContentDefinedChunker::Roll(const T value) {
+  constexpr size_t BYTE_WIDTH = sizeof(T);
+  chunk_size_ += BYTE_WIDTH;
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  auto bytes = reinterpret_cast<const uint8_t*>(&value);
+  for (size_t i = 0; i < BYTE_WIDTH; ++i) {
+    rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+void ContentDefinedChunker::Roll(std::string_view value) {
+  chunk_size_ += value.size();
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  for (char c : value) {
+    rolling_hash_ =
+        (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][static_cast<uint8_t>(c)];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+bool ContentDefinedChunker::NeedNewChunk() {
+  // decide whether to create a new chunk based on the rolling hash; has_matched_ is
+  // set to true if we encountered a match since the last NeedNewChunk() call
+  if (ARROW_PREDICT_FALSE(has_matched_)) {
+    has_matched_ = false;
+    // in order to have a normal distribution of chunk sizes, we only create a new chunk
+    // if the adjusted mask matches the rolling hash 8 times in a row; each run uses a
+    // different gearhash table (gearhash's chunk size has an exponential distribution,
+    // and we use the central limit theorem to approximate a normal distribution)
+    if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) {
+      nth_run_ = 0;
+      chunk_size_ = 0;
+      return true;
+    }
+  }
+  if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
+    // we have a hard limit on the maximum chunk size; note that we don't reset the
+    // rolling hash state here, so the next NeedNewChunk() call will continue from the
+    // current state
+    chunk_size_ = 0;
+    return true;
+  }
+  return false;
+}
+
+template <typename T>
+const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels,
+                                                          const int16_t* rep_levels,
+                                                          int64_t num_levels,
+                                                          const T& leaf_array) {
+  std::vector<Chunk> chunks;
+  bool has_def_levels = level_info_.def_level > 0;
+  bool has_rep_levels = level_info_.rep_level > 0;
+
+  if (!has_rep_levels && !has_def_levels) {
+    // fastest path for non-nested non-null data
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else if (!has_rep_levels) {
+    // non-nested data with nulls
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(def_levels[offset]);
+      Roll(leaf_array.GetView(offset));

Review Comment:
   Done.



##########
cpp/src/parquet/chunker_internal.cc:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cmath>
+#include <string>
+#include <vector>
+#include "arrow/array.h"
+#include "arrow/util/logging.h"
+#include "parquet/chunker_internal_hashtable.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+// create a fake null array class with a GetView method returning 0 always
+class FakeNullArray {
+ public:
+  uint8_t GetView(int64_t i) const { return 0; }
+
+  std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); }
+
+  int64_t null_count() const { return 0; }
+};
+
+static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) {
+  // we aim for a gaussian-like distribution of chunk sizes between min_size and max_size
+  uint64_t avg_size = (min_size + max_size) / 2;
+  // we skip calculating the gearhash for the first `min_size` bytes, so we are looking
+  // for a smaller chunk than the average size
+  uint64_t target_size = avg_size - min_size;
+  size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size)));
+  // -3 because we are using 8 hash tables to get a more gaussian-like distribution;
+  // `norm_factor` narrows the chunk size distribution around avg_size
+  size_t effective_bits = mask_bits - 3 - norm_factor;
+  return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+}
+
+ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info,
+                                             std::pair<uint64_t, uint64_t> size_range,
+                                             uint8_t norm_factor)
+    : level_info_(level_info),
+      min_size_(size_range.first),
+      max_size_(size_range.second),
+      hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {}
+
+template <typename T>
+void ContentDefinedChunker::Roll(const T value) {
+  constexpr size_t BYTE_WIDTH = sizeof(T);
+  chunk_size_ += BYTE_WIDTH;
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  auto bytes = reinterpret_cast<const uint8_t*>(&value);
+  for (size_t i = 0; i < BYTE_WIDTH; ++i) {
+    rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+void ContentDefinedChunker::Roll(std::string_view value) {
+  chunk_size_ += value.size();
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  for (char c : value) {
+    rolling_hash_ =
+        (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][static_cast<uint8_t>(c)];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+bool ContentDefinedChunker::NeedNewChunk() {
+  // decide whether to create a new chunk based on the rolling hash; has_matched_ is
+  // set to true if we encountered a match since the last NeedNewChunk() call
+  if (ARROW_PREDICT_FALSE(has_matched_)) {
+    has_matched_ = false;
+    // in order to have a normal distribution of chunk sizes, we only create a new chunk
+    // if the adjusted mask matches the rolling hash 8 times in a row; each run uses a
+    // different gearhash table (gearhash's chunk size has an exponential distribution,
+    // and we use the central limit theorem to approximate a normal distribution)
+    if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) {
+      nth_run_ = 0;
+      chunk_size_ = 0;
+      return true;
+    }
+  }
+  if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
+    // we have a hard limit on the maximum chunk size; note that we don't reset the
+    // rolling hash state here, so the next NeedNewChunk() call will continue from the
+    // current state
+    chunk_size_ = 0;
+    return true;
+  }
+  return false;
+}
+
+template <typename T>
+const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels,
+                                                          const int16_t* rep_levels,
+                                                          int64_t num_levels,
+                                                          const T& leaf_array) {
+  std::vector<Chunk> chunks;
+  bool has_def_levels = level_info_.def_level > 0;
+  bool has_rep_levels = level_info_.rep_level > 0;
+
+  if (!has_rep_levels && !has_def_levels) {
+    // fastest path for non-nested non-null data
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else if (!has_rep_levels) {
+    // non-nested data with nulls
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(def_levels[offset]);
+      Roll(leaf_array.GetView(offset));

Review Comment:
   Fixed it; this possibly improves the performance as well.



##########
cpp/src/parquet/chunker_internal.cc:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cmath>
+#include <string>
+#include <vector>
+#include "arrow/array.h"
+#include "arrow/util/logging.h"
+#include "parquet/chunker_internal_hashtable.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+// create a fake null array class with a GetView method returning 0 always
+class FakeNullArray {
+ public:
+  uint8_t GetView(int64_t i) const { return 0; }
+
+  std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); }
+
+  int64_t null_count() const { return 0; }
+};
+
+static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) {
+  // we aim for a gaussian-like distribution of chunk sizes between min_size and max_size
+  uint64_t avg_size = (min_size + max_size) / 2;
+  // we skip calculating the gearhash for the first `min_size` bytes, so we are looking
+  // for a smaller chunk than the average size
+  uint64_t target_size = avg_size - min_size;
+  size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size)));
+  // -3 because we are using 8 hash tables to get a more gaussian-like distribution;
+  // `norm_factor` narrows the chunk size distribution around avg_size
+  size_t effective_bits = mask_bits - 3 - norm_factor;
+  return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+}
+
+ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info,
+                                             std::pair<uint64_t, uint64_t> size_range,
+                                             uint8_t norm_factor)
+    : level_info_(level_info),
+      min_size_(size_range.first),
+      max_size_(size_range.second),
+      hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {}
+
+template <typename T>
+void ContentDefinedChunker::Roll(const T value) {
+  constexpr size_t BYTE_WIDTH = sizeof(T);
+  chunk_size_ += BYTE_WIDTH;
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  auto bytes = reinterpret_cast<const uint8_t*>(&value);
+  for (size_t i = 0; i < BYTE_WIDTH; ++i) {
+    rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+void ContentDefinedChunker::Roll(std::string_view value) {
+  chunk_size_ += value.size();
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  for (char c : value) {
+    rolling_hash_ =
+        (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][static_cast<uint8_t>(c)];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+bool ContentDefinedChunker::NeedNewChunk() {
+  // decide whether to create a new chunk based on the rolling hash; has_matched_ is
+  // set to true if we encountered a match since the last NeedNewChunk() call
+  if (ARROW_PREDICT_FALSE(has_matched_)) {
+    has_matched_ = false;
+    // in order to have a normal distribution of chunk sizes, we only create a new chunk
+    // if the adjusted mask matches the rolling hash 8 times in a row; each run uses a
+    // different gearhash table (gearhash's chunk size has an exponential distribution,
+    // and we use the central limit theorem to approximate a normal distribution)
+    if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) {
+      nth_run_ = 0;
+      chunk_size_ = 0;
+      return true;
+    }
+  }
+  if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
+    // we have a hard limit on the maximum chunk size; note that we don't reset the
+    // rolling hash state here, so the next NeedNewChunk() call will continue from the
+    // current state
+    chunk_size_ = 0;
+    return true;
+  }
+  return false;
+}
+
+template <typename T>
+const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels,
+                                                          const int16_t* rep_levels,
+                                                          int64_t num_levels,
+                                                          const T& leaf_array) {
+  std::vector<Chunk> chunks;
+  bool has_def_levels = level_info_.def_level > 0;
+  bool has_rep_levels = level_info_.rep_level > 0;
+
+  if (!has_rep_levels && !has_def_levels) {
+    // fastest path for non-nested non-null data
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else if (!has_rep_levels) {
+    // non-nested data with nulls
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(def_levels[offset]);
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else {
+    // nested data with nulls
+    bool has_leaf_value;
+    bool is_record_boundary;
+    int16_t def_level;
+    int16_t rep_level;
+    int64_t value_offset = 0;
+    int64_t record_level_offset = 0;
+    int64_t record_value_offset = 0;
+
+    for (int64_t level_offset = 0; level_offset < num_levels; ++level_offset) {
+      def_level = def_levels[level_offset];
+      rep_level = rep_levels[level_offset];
+
+      has_leaf_value = def_level >= level_info_.repeated_ancestor_def_level;
+      is_record_boundary = rep_level == 0;
+
+      Roll(def_level);
+      Roll(rep_level);
+      if (has_leaf_value) {
+        Roll(leaf_array.GetView(value_offset));

Review Comment:
   Updated this as well to only hash non-null leaf values.
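
   For context, a minimal sketch of what "only hash non-null leaf values" could look like at this spot; the exact condition and naming in the updated code may differ:

   ```cpp
   // def_level == level_info_.def_level means the leaf value is present (non-null);
   // smaller def_levels denote either a null leaf or a null/empty ancestor
   const bool leaf_is_non_null = def_level == level_info_.def_level;
   if (has_leaf_value) {
     if (leaf_is_non_null) {
       // hash only defined leaf values; null slots still advance value_offset below
       Roll(leaf_array.GetView(value_offset));
     }
   }
   ```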



##########
cpp/src/parquet/chunker_internal_test.cc:
##########
@@ -0,0 +1,908 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/table.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/float16.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
+#include "parquet/arrow/test_util.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
+#include "parquet/file_writer.h"
+
+namespace parquet {
+
+using ::arrow::Array;
+using ::arrow::ChunkedArray;
+using ::arrow::ConcatenateTables;
+using ::arrow::DataType;
+using ::arrow::default_memory_pool;
+using ::arrow::Field;
+using ::arrow::Result;
+using ::arrow::Table;
+using ::arrow::io::BufferReader;
+using ::parquet::arrow::FileReader;
+using ::parquet::arrow::FileReaderBuilder;
+using ::parquet::arrow::MakeSimpleTable;
+using ::parquet::arrow::NonNullArray;
+using ::parquet::arrow::WriteTable;
+
+using ::testing::Bool;
+using ::testing::Combine;
+using ::testing::Values;
+
+// generate deterministic and platform-independent data
+inline uint64_t hash(uint64_t seed, uint64_t index) {
+  uint64_t h = (index + seed) * 0xc4ceb9fe1a85ec53ull;
+  h ^= h >> 33;
+  h *= 0xff51afd7ed558ccdull;
+  h ^= h >> 33;
+  h *= 0xc4ceb9fe1a85ec53ull;
+  h ^= h >> 33;
+  return h;
+}
+
+#define GENERATE_CASE_BODY(BUILDER_TYPE, VALUE_EXPR)   \
+  {                                                    \
+    BUILDER_TYPE builder(type, default_memory_pool()); \
+    if (nullable) {                                    \
+      for (int64_t i = 0; i < length; ++i) {           \
+        uint64_t val = hash(seed, i);                  \
+        if (val % 10 == 0) {                           \
+          RETURN_NOT_OK(builder.AppendNull());         \
+        } else {                                       \
+          RETURN_NOT_OK(builder.Append(VALUE_EXPR));   \
+        }                                              \
+      }                                                \
+    } else {                                           \
+      for (int64_t i = 0; i < length; ++i) {           \
+        uint64_t val = hash(seed, i);                  \
+        RETURN_NOT_OK(builder.Append(VALUE_EXPR));     \
+      }                                                \
+    }                                                  \
+    std::shared_ptr<Array> array;                      \
+    RETURN_NOT_OK(builder.Finish(&array));             \
+    RETURN_NOT_OK(array->ValidateFull());              \
+    return array;                                      \
+  }
+
+// Macro to generate a case for a given scalar type.
+#define GENERATE_CASE(TYPE_ID, BUILDER_TYPE, VALUE_EXPR) \
+  case ::arrow::Type::TYPE_ID: {                         \
+    GENERATE_CASE_BODY(BUILDER_TYPE, VALUE_EXPR)         \
+  }
+
+Result<std::shared_ptr<Array>> GenerateArray(const std::shared_ptr<Field>& field,
+                                             int64_t length, uint64_t seed) {
+  const std::shared_ptr<DataType>& type = field->type();
+  bool nullable = field->nullable();
+
+  switch (type->id()) {
+    GENERATE_CASE(BOOL, ::arrow::BooleanBuilder, (val % 2 == 0))
+
+    // Numeric types.
+    GENERATE_CASE(INT8, ::arrow::Int8Builder, static_cast<int8_t>(val))
+    GENERATE_CASE(INT16, ::arrow::Int16Builder, static_cast<int16_t>(val))
+    GENERATE_CASE(INT32, ::arrow::Int32Builder, static_cast<int32_t>(val))
+    GENERATE_CASE(INT64, ::arrow::Int64Builder, static_cast<int64_t>(val))
+    GENERATE_CASE(UINT8, ::arrow::UInt8Builder, static_cast<uint8_t>(val))
+    GENERATE_CASE(UINT16, ::arrow::UInt16Builder, static_cast<uint16_t>(val))
+    GENERATE_CASE(UINT32, ::arrow::UInt32Builder, static_cast<uint32_t>(val))
+    GENERATE_CASE(UINT64, ::arrow::UInt64Builder, static_cast<uint64_t>(val))
+    GENERATE_CASE(HALF_FLOAT, ::arrow::HalfFloatBuilder,
+                  static_cast<uint16_t>(val % 1000))
+    GENERATE_CASE(FLOAT, ::arrow::FloatBuilder, static_cast<float>(val % 1000) / 1000.0f)
+    GENERATE_CASE(DOUBLE, ::arrow::DoubleBuilder,
+                  static_cast<double>(val % 100000) / 1000.0)
+    case ::arrow::Type::DECIMAL128: {
+      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+      // Limit the value to fit within the specified precision
+      int32_t max_exponent = decimal_type.precision() - decimal_type.scale();
+      int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1);
+      GENERATE_CASE_BODY(::arrow::Decimal128Builder, ::arrow::Decimal128(val % max_value))
+    }
+    case ::arrow::Type::DECIMAL256: {
+      const auto& decimal_type = static_cast<const ::arrow::Decimal256Type&>(*type);
+      // Limit the value to fit within the specified precision, capped at 9 to avoid
+      // int64_t overflow
+      int32_t max_exponent = std::min(9, decimal_type.precision() - decimal_type.scale());
+      int64_t max_value = static_cast<int64_t>(std::pow(10, max_exponent) - 1);
+      GENERATE_CASE_BODY(::arrow::Decimal256Builder, ::arrow::Decimal256(val % max_value))
+    }
+
+      // Temporal types
+      GENERATE_CASE(DATE32, ::arrow::Date32Builder, static_cast<int32_t>(val))
+      GENERATE_CASE(TIME32, ::arrow::Time32Builder,
+                    std::abs(static_cast<int32_t>(val) % 86400000))
+      GENERATE_CASE(TIME64, ::arrow::Time64Builder,
+                    std::abs(static_cast<int64_t>(val) % 86400000000))
+      GENERATE_CASE(TIMESTAMP, ::arrow::TimestampBuilder, static_cast<int64_t>(val))
+      GENERATE_CASE(DURATION, ::arrow::DurationBuilder, static_cast<int64_t>(val))
+
+      // Binary and string types.
+      GENERATE_CASE(STRING, ::arrow::StringBuilder,
+                    std::string("str_") + std::to_string(val))
+      GENERATE_CASE(LARGE_STRING, ::arrow::LargeStringBuilder,
+                    std::string("str_") + std::to_string(val))
+      GENERATE_CASE(BINARY, ::arrow::BinaryBuilder,
+                    std::string("bin_") + std::to_string(val))
+    case ::arrow::Type::FIXED_SIZE_BINARY: {
+      auto size = static_cast<::arrow::FixedSizeBinaryType*>(type.get())->byte_width();
+      GENERATE_CASE_BODY(::arrow::FixedSizeBinaryBuilder,
+                         std::string("bin_") + std::to_string(val).substr(0, size - 4))
+    }
+
+    case ::arrow::Type::STRUCT: {
+      auto struct_type = static_cast<::arrow::StructType*>(type.get());
+      std::vector<std::shared_ptr<Array>> child_arrays;
+      for (auto i = 0; i < struct_type->num_fields(); i++) {
+        ARROW_ASSIGN_OR_RAISE(auto child_array,
+                              GenerateArray(struct_type->field(i), length,
+                                            seed + static_cast<uint64_t>(i + 300)));
+        child_arrays.push_back(child_array);
+      }
+      auto struct_array =
+          std::make_shared<::arrow::StructArray>(type, length, child_arrays);
+      return struct_array;
+    }
+
+    case ::arrow::Type::LIST: {
+      auto list_type = static_cast<::arrow::ListType*>(type.get());
+      auto value_field = ::arrow::field("item", list_type->value_type());
+      ARROW_ASSIGN_OR_RAISE(auto values_array, GenerateArray(value_field, length, seed));
+      auto offset_builder = ::arrow::Int32Builder();
+      auto bitmap_builder = ::arrow::TypedBufferBuilder<bool>();
+
+      int32_t num_nulls = 0;
+      int32_t num_elements = 0;
+      uint8_t element_size = 0;
+      int32_t current_offset = 0;
+      RETURN_NOT_OK(offset_builder.Append(current_offset));
+      while (current_offset < length) {
+        num_elements++;
+        auto is_valid = !(nullable && (num_elements % 10 == 0));
+        if (is_valid) {
+          RETURN_NOT_OK(bitmap_builder.Append(true));
+          current_offset += element_size;
+          if (current_offset > length) {
+            RETURN_NOT_OK(offset_builder.Append(static_cast<int32_t>(length)));
+            break;
+          } else {
+            RETURN_NOT_OK(offset_builder.Append(current_offset));
+          }
+        } else {
+          RETURN_NOT_OK(offset_builder.Append(static_cast<int32_t>(current_offset)));
+          RETURN_NOT_OK(bitmap_builder.Append(false));
+          num_nulls++;
+        }
+
+        if (element_size > 4) {
+          element_size = 0;
+        } else {
+          element_size++;
+        }
+      }
+
+      std::shared_ptr<Array> offsets_array;
+      RETURN_NOT_OK(offset_builder.Finish(&offsets_array));
+      std::shared_ptr<Buffer> bitmap_buffer;
+      RETURN_NOT_OK(bitmap_builder.Finish(&bitmap_buffer));
+      ARROW_ASSIGN_OR_RAISE(
+          auto list_array, ::arrow::ListArray::FromArrays(
+                               type, *offsets_array, *values_array, default_memory_pool(),
+                               bitmap_buffer, num_nulls));
+      RETURN_NOT_OK(list_array->ValidateFull());
+      return list_array;
+    }
+
+    default:
+      return ::arrow::Status::NotImplemented("Unsupported data type " + type->ToString());
+  }
+}
+
+Result<std::shared_ptr<Table>> GenerateTable(
+    const std::shared_ptr<::arrow::Schema>& schema, int64_t size, uint64_t seed = 0) {
+  std::vector<std::shared_ptr<Array>> arrays;
+  for (const auto& field : schema->fields()) {
+    ARROW_ASSIGN_OR_RAISE(auto array, GenerateArray(field, size, seed));
+    arrays.push_back(array);
+  }
+  return Table::Make(schema, arrays, size);
+}
+
+Result<std::shared_ptr<Table>> ConcatAndCombine(
+    const std::vector<std::shared_ptr<Table>>& parts) {
+  // Concatenate and combine chunks so the table doesn't carry information about
+  // the modification points
+  ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(parts));
+  return table->CombineChunks();
+}
+
+Result<std::shared_ptr<Buffer>> WriteTableToBuffer(const std::shared_ptr<Table>& table,
+                                                   uint64_t min_chunk_size,
+                                                   uint64_t max_chunk_size,
+                                                   bool enable_dictionary = false,
+                                                   int64_t row_group_size = 1024 * 1024) {
+  auto sink = CreateOutputStream();
+
+  auto builder = WriterProperties::Builder();
+  builder.enable_cdc()
+      ->cdc_size_range(min_chunk_size, max_chunk_size)
+      ->cdc_norm_factor(0);
+  if (enable_dictionary) {
+    builder.enable_dictionary();
+  } else {
+    builder.disable_dictionary();
+  }

Review Comment:
   Yes, I am planning to add more tests to cover other cases as well. 
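
   For example, one case I'd like to cover is CDC combined with page compression, since the compressed bytes change while the chunk boundaries should not. A hypothetical tweak to the `WriteTableToBuffer` helper (the `codec` parameter below is an assumed addition, not existing code):

   ```cpp
   auto builder = WriterProperties::Builder();
   builder.enable_cdc()
       ->cdc_size_range(min_chunk_size, max_chunk_size)
       ->cdc_norm_factor(0)
       ->compression(codec);  // e.g. Compression::SNAPPY, passed in as a new parameter
   if (enable_dictionary) {
     builder.enable_dictionary();
   } else {
     builder.disable_dictionary();
   }
   ```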



##########
cpp/src/parquet/chunker_internal.cc:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/chunker_internal.h"
+
+#include <cmath>
+#include <string>
+#include <vector>
+#include "arrow/array.h"
+#include "arrow/util/logging.h"
+#include "parquet/chunker_internal_hashtable.h"
+#include "parquet/exception.h"
+#include "parquet/level_conversion.h"
+
+namespace parquet::internal {
+
+// create a fake null array class with a GetView method returning 0 always
+class FakeNullArray {
+ public:
+  uint8_t GetView(int64_t i) const { return 0; }
+
+  std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); }
+
+  int64_t null_count() const { return 0; }
+};
+
+static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) {
+  // we aim for a gaussian-like distribution of chunk sizes between min_size and max_size
+  uint64_t avg_size = (min_size + max_size) / 2;
+  // we skip calculating the gearhash for the first `min_size` bytes, so we are looking
+  // for a smaller chunk than the average size
+  uint64_t target_size = avg_size - min_size;
+  size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size)));
+  // -3 because we are using 8 hash tables to get a more gaussian-like distribution;
+  // `norm_factor` narrows the chunk size distribution around avg_size
+  size_t effective_bits = mask_bits - 3 - norm_factor;
+  return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
+}
+
+ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info,
+                                             std::pair<uint64_t, uint64_t> size_range,
+                                             uint8_t norm_factor)
+    : level_info_(level_info),
+      min_size_(size_range.first),
+      max_size_(size_range.second),
+      hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {}
+
+template <typename T>
+void ContentDefinedChunker::Roll(const T value) {
+  constexpr size_t BYTE_WIDTH = sizeof(T);
+  chunk_size_ += BYTE_WIDTH;
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  auto bytes = reinterpret_cast<const uint8_t*>(&value);
+  for (size_t i = 0; i < BYTE_WIDTH; ++i) {
+    rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+void ContentDefinedChunker::Roll(std::string_view value) {
+  chunk_size_ += value.size();
+  if (chunk_size_ < min_size_) {
+    // short-circuit if we haven't reached the minimum chunk size, this speeds up the
+    // chunking process since the gearhash doesn't need to be updated
+    return;
+  }
+  for (char c : value) {
+    rolling_hash_ =
+        (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][static_cast<uint8_t>(c)];
+    has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0);
+  }
+}
+
+bool ContentDefinedChunker::NeedNewChunk() {
+  // decide whether to create a new chunk based on the rolling hash; has_matched_ is
+  // set to true if we encountered a match since the last NeedNewChunk() call
+  if (ARROW_PREDICT_FALSE(has_matched_)) {
+    has_matched_ = false;
+    // in order to have a normal distribution of chunk sizes, we only create a new chunk
+    // if the adjusted mask matches the rolling hash 8 times in a row; each run uses a
+    // different gearhash table (gearhash's chunk size has an exponential distribution,
+    // and we use the central limit theorem to approximate a normal distribution)
+    if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) {
+      nth_run_ = 0;
+      chunk_size_ = 0;
+      return true;
+    }
+  }
+  if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
+    // we have a hard limit on the maximum chunk size; note that we don't reset the
+    // rolling hash state here, so the next NeedNewChunk() call will continue from the
+    // current state
+    chunk_size_ = 0;
+    return true;
+  }
+  return false;
+}
+
+template <typename T>
+const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels,
+                                                          const int16_t* rep_levels,
+                                                          int64_t num_levels,
+                                                          const T& leaf_array) {
+  std::vector<Chunk> chunks;
+  bool has_def_levels = level_info_.def_level > 0;
+  bool has_rep_levels = level_info_.rep_level > 0;
+
+  if (!has_rep_levels && !has_def_levels) {
+    // fastest path for non-nested non-null data
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else if (!has_rep_levels) {
+    // non-nested data with nulls
+    int64_t offset = 0;
+    int64_t prev_offset = 0;
+    while (offset < num_levels) {
+      Roll(def_levels[offset]);
+      Roll(leaf_array.GetView(offset));
+      ++offset;
+      if (NeedNewChunk()) {
+        chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset);
+        prev_offset = offset;
+      }
+    }
+    if (prev_offset < num_levels) {
+      chunks.emplace_back(prev_offset, prev_offset, num_levels - prev_offset);
+    }
+  } else {
+    // nested data with nulls
+    bool has_leaf_value;
+    bool is_record_boundary;
+    int16_t def_level;
+    int16_t rep_level;
+    int64_t value_offset = 0;
+    int64_t record_level_offset = 0;
+    int64_t record_value_offset = 0;
+
+    for (int64_t level_offset = 0; level_offset < num_levels; ++level_offset) {
+      def_level = def_levels[level_offset];
+      rep_level = rep_levels[level_offset];
+
+      has_leaf_value = def_level >= level_info_.repeated_ancestor_def_level;
+      is_record_boundary = rep_level == 0;
+
+      Roll(def_level);
+      Roll(rep_level);
+      if (has_leaf_value) {
+        Roll(leaf_array.GetView(value_offset));
+      }
+
+      if (is_record_boundary && NeedNewChunk()) {
+        auto levels_to_write = level_offset - record_level_offset;
+        if (levels_to_write > 0) {
+          chunks.emplace_back(record_level_offset, record_value_offset, levels_to_write);
+          record_level_offset = level_offset;
+          record_value_offset = value_offset;
+        }
+      }
+
+      if (has_leaf_value) {
+        ++value_offset;
+      }
+    }
+
+    auto levels_to_write = num_levels - record_level_offset;
+    if (levels_to_write > 0) {
+      chunks.emplace_back(record_level_offset, record_value_offset, levels_to_write);
+    }
+  }
+
+  return chunks;
+}
+
+#define PRIMITIVE_CASE(TYPE_ID, ArrowType)               \
+  case ::arrow::Type::TYPE_ID:                           \
+    return Calculate(def_levels, rep_levels, num_levels, \
+                     static_cast<const ::arrow::ArrowType##Array&>(values));
+

Review Comment:
   I assume we should prefer working with the underlying buffer(s) directly.
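
   As a rough illustration of what working with the underlying buffer(s) could mean for the flat, non-null, fixed-width case; `RollBytes` is a made-up helper here, and a real change would also have to handle validity bitmaps, booleans and variable-length binary:

   ```cpp
   const auto& fw_type = static_cast<const ::arrow::FixedWidthType&>(*values.type());
   const int64_t byte_width = fw_type.bit_width() / 8;
   // buffers[1] holds the packed fixed-width values of a primitive array
   const uint8_t* raw = values.data()->buffers[1]->data() + values.offset() * byte_width;
   for (int64_t i = 0; i < values.length(); ++i) {
     // feed the value's bytes into the gearhash without going through GetView()
     RollBytes(raw + i * byte_width, byte_width);
     if (NeedNewChunk()) {
       // record a chunk boundary ending at value index i + 1, as in the typed path
     }
   }
   ```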



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

