This is an automated email from the ASF dual-hosted git repository.
felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2fa095c839 GH-39815: [C++] Document and micro-optimize
ChunkResolver::Resolve() (#39817)
2fa095c839 is described below
commit 2fa095c8393a9ff54fcf84c52f47f75152243b1e
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Mon Jan 29 11:09:12 2024 -0300
GH-39815: [C++] Document and micro-optimize ChunkResolver::Resolve()
(#39817)
### Rationale for this change
There has been interest in improving operations on chunked-arrays and even
though `ChunkResolver::Resolve()` is not a big contributor in most kernels, the
fact that it can be used from tight loops warrants careful attention to branch
prediction and memory effects of its implementation.
### What changes are included in this PR?
- Documentation of invariants and behavior of functions
- Multiple optimizations justified by microbenchmarks
- Addition of a variation of `Resolve` that takes a hint as parameter
- Fix of an out-of-bounds memory access that doesn't affect correctness
(it can only reduce effectiveness of cache in very rare situations, but is
nevertheless an issue)
### Are these changes tested?
Yes, by existing tests.
### Are there any user-facing changes?
- The `arrow::internal::ChunkResolver::Bisect()` function was `protected`
and is now `private` with a different signature
* Closes: #39815
Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
cpp/src/arrow/chunk_resolver.h | 156 +++++++++++++++++++--------
cpp/src/arrow/compute/kernels/vector_sort.cc | 25 +++--
2 files changed, 129 insertions(+), 52 deletions(-)
diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h
index 818070ffe3..d3ae315568 100644
--- a/cpp/src/arrow/chunk_resolver.h
+++ b/cpp/src/arrow/chunk_resolver.h
@@ -18,87 +18,151 @@
#pragma once
#include <atomic>
+#include <cassert>
#include <cstdint>
#include <vector>
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
-namespace arrow {
-namespace internal {
+namespace arrow::internal {
struct ChunkLocation {
- int64_t chunk_index, index_in_chunk;
+ /// \brief Index of the chunk in the array of chunks
+ ///
+ /// The value is always in the range `[0, chunks.size()]`. `chunks.size()`
is used
+ /// to represent out-of-bounds locations.
+ int64_t chunk_index;
+
+ /// \brief Index of the value in the chunk
+ ///
+ /// The value is undefined if chunk_index >= chunks.size()
+ int64_t index_in_chunk;
};
-// An object that resolves an array chunk depending on a logical index
+/// \brief A utility that incrementally resolves logical indices into
+/// physical indices in a chunked array.
struct ARROW_EXPORT ChunkResolver {
- explicit ChunkResolver(const ArrayVector& chunks);
+ private:
+ /// \brief Array containing `chunks.size() + 1` offsets.
+ ///
+ /// `offsets_[i]` is the starting logical index of chunk `i`. `offsets_[0]`
is always 0
+ /// and `offsets_[chunks.size()]` is the logical length of the chunked array.
+ std::vector<int64_t> offsets_;
- explicit ChunkResolver(const std::vector<const Array*>& chunks);
+ /// \brief Cache of the index of the last resolved chunk.
+ ///
+ /// \invariant `cached_chunk_ in [0, chunks.size()]`
+ mutable std::atomic<int64_t> cached_chunk_;
+ public:
+ explicit ChunkResolver(const ArrayVector& chunks);
+ explicit ChunkResolver(const std::vector<const Array*>& chunks);
explicit ChunkResolver(const RecordBatchVector& batches);
ChunkResolver(ChunkResolver&& other) noexcept
- : offsets_(std::move(other.offsets_)),
cached_chunk_(other.cached_chunk_.load()) {}
+ : offsets_(std::move(other.offsets_)),
+ cached_chunk_(other.cached_chunk_.load(std::memory_order_relaxed)) {}
ChunkResolver& operator=(ChunkResolver&& other) {
offsets_ = std::move(other.offsets_);
- cached_chunk_.store(other.cached_chunk_.load());
+ cached_chunk_.store(other.cached_chunk_.load(std::memory_order_relaxed));
return *this;
}
- /// \brief Return a ChunkLocation containing the chunk index and in-chunk
value index of
- /// the chunked array at logical index
- inline ChunkLocation Resolve(const int64_t index) const {
- // It is common for the algorithms below to make consecutive accesses at
- // a relatively small distance from each other, hence often falling in
- // the same chunk.
- // This is trivial when merging (assuming each side of the merge uses
- // its own resolver), but also in the inner recursive invocations of
+ /// \brief Resolve a logical index to a ChunkLocation.
+ ///
+ /// The returned ChunkLocation contains the chunk index and the within-chunk
index
+ /// equivalent to the logical index.
+ ///
+ /// \pre index >= 0
+ /// \post location.chunk_index in [0, chunks.size()]
+ /// \param index The logical index to resolve
+ /// \return ChunkLocation with a valid chunk_index if index is within
+ /// bounds, or with chunk_index == chunks.size() if logical index is
+ /// `>= chunked_array.length()`.
+ inline ChunkLocation Resolve(int64_t index) const {
+ const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
+ const auto chunk_index =
+ ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
+ return {chunk_index, index - offsets_[chunk_index]};
+ }
+
+ /// \brief Resolve a logical index to a ChunkLocation.
+ ///
+ /// The returned ChunkLocation contains the chunk index and the within-chunk
index
+ /// equivalent to the logical index.
+ ///
+ /// \pre index >= 0
+ /// \post location.chunk_index in [0, chunks.size()]
+ /// \param index The logical index to resolve
+ /// \param cached_chunk_index 0 or the chunk_index of the last ChunkLocation
+ /// returned by this ChunkResolver.
+ /// \return ChunkLocation with a valid chunk_index if index is within
+ /// bounds, or with chunk_index == chunks.size() if logical index is
+ /// `>= chunked_array.length()`.
+ inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
+ int64_t cached_chunk_index)
const {
+ assert(cached_chunk_index < static_cast<int64_t>(offsets_.size()));
+ const auto chunk_index =
+ ResolveChunkIndex</*StoreCachedChunk=*/false>(index,
cached_chunk_index);
+ return {chunk_index, index - offsets_[chunk_index]};
+ }
+
+ private:
+ template <bool StoreCachedChunk>
+ inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
+ // It is common for algorithms sequentially processing arrays to make
consecutive
+ // accesses at a relatively small distance from each other, hence often
falling in the
+ // same chunk.
+ //
+ // This is guaranteed when merging (assuming each side of the merge uses
its
+ // own resolver), and is the most common case in recursive invocations of
// partitioning.
- if (offsets_.size() <= 1) {
- return {0, index};
+ const auto num_offsets = static_cast<int64_t>(offsets_.size());
+ const int64_t* offsets = offsets_.data();
+ if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
+ (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk +
1])) {
+ return cached_chunk;
}
- const auto cached_chunk = cached_chunk_.load();
- const bool cache_hit =
- (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk +
1]);
- if (ARROW_PREDICT_TRUE(cache_hit)) {
- return {cached_chunk, index - offsets_[cached_chunk]};
+ // lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
+ const auto chunk_index = Bisect(index, offsets, /*lo=*/0,
/*hi=*/num_offsets);
+ if constexpr (StoreCachedChunk) {
+ assert(chunk_index < static_cast<int64_t>(offsets_.size()));
+ cached_chunk_.store(chunk_index, std::memory_order_relaxed);
}
- auto chunk_index = Bisect(index);
- cached_chunk_.store(chunk_index);
- return {chunk_index, index - offsets_[chunk_index]};
+ return chunk_index;
}
- protected:
- // Find the chunk index corresponding to a value index using binary search
- inline int64_t Bisect(const int64_t index) const {
- // Like std::upper_bound(), but hand-written as it can help the compiler.
- // Search [lo, lo + n)
- int64_t lo = 0;
- auto n = static_cast<int64_t>(offsets_.size());
- while (n > 1) {
+ /// \brief Find the index of the chunk that contains the logical index.
+ ///
+ /// Any non-negative index is accepted. When `hi=num_offsets`, the largest
+ /// possible return value is `num_offsets-1` which is equal to
+/// `chunks.size()`. This is returned when the logical index is out-of-bounds.
+ ///
+ /// \pre index >= 0
+ /// \pre lo < hi
+ /// \pre lo >= 0 && hi <= offsets_.size()
+ static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t
lo,
+ int64_t hi) {
+ // Similar to std::upper_bound(), but slightly different as our offsets
+ // array always starts with 0.
+ auto n = hi - lo;
+ // First iteration does not need to check for n > 1
+ // (lo < hi is guaranteed by the precondition).
+ assert(n > 1 && "lo < hi is a precondition of Bisect");
+ do {
const int64_t m = n >> 1;
const int64_t mid = lo + m;
- if (static_cast<int64_t>(index) >= offsets_[mid]) {
+ if (index >= offsets[mid]) {
lo = mid;
n -= m;
} else {
n = m;
}
- }
+ } while (n > 1);
return lo;
}
-
- private:
- // Collection of starting offsets used for binary search
- std::vector<int64_t> offsets_;
-
- // Tracks the most recently used chunk index to allow fast
- // access for consecutive indices corresponding to the same chunk
- mutable std::atomic<int64_t> cached_chunk_;
};
-} // namespace internal
-} // namespace arrow
+} // namespace arrow::internal
diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc
b/cpp/src/arrow/compute/kernels/vector_sort.cc
index e08a2bc103..d3914173b6 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -24,6 +24,7 @@
namespace arrow {
using internal::checked_cast;
+using internal::ChunkLocation;
namespace compute {
namespace internal {
@@ -748,11 +749,15 @@ class TableSorter {
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];
+ ChunkLocation left_loc{0, 0};
+ ChunkLocation right_loc{0, 0};
std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end,
temp_indices,
[&](uint64_t left, uint64_t right) {
// First column is either null or nan
- const auto left_loc = left_resolver_.Resolve(left);
- const auto right_loc = right_resolver_.Resolve(right);
+ left_loc =
+ left_resolver_.ResolveWithChunkIndexHint(left,
left_loc.chunk_index);
+ right_loc = right_resolver_.ResolveWithChunkIndexHint(
+ right, right_loc.chunk_index);
auto chunk_left =
first_sort_key.GetChunk<ArrayType>(left_loc);
auto chunk_right =
first_sort_key.GetChunk<ArrayType>(right_loc);
const auto left_is_null = chunk_left.IsNull();
@@ -783,11 +788,15 @@ class TableSorter {
// Untyped implementation
auto& comparator = comparator_;
+ ChunkLocation left_loc{0, 0};
+ ChunkLocation right_loc{0, 0};
std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end,
temp_indices,
[&](uint64_t left, uint64_t right) {
// First column is always null
- const auto left_loc = left_resolver_.Resolve(left);
- const auto right_loc = right_resolver_.Resolve(right);
+ left_loc =
+ left_resolver_.ResolveWithChunkIndexHint(left,
left_loc.chunk_index);
+ right_loc = right_resolver_.ResolveWithChunkIndexHint(
+ right, right_loc.chunk_index);
return comparator.Compare(left_loc, right_loc, 1);
});
// Copy back temp area into main buffer
@@ -807,11 +816,15 @@ class TableSorter {
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];
+ ChunkLocation left_loc{0, 0};
+ ChunkLocation right_loc{0, 0};
std::merge(range_begin, range_middle, range_middle, range_end,
temp_indices,
[&](uint64_t left, uint64_t right) {
// Both values are never null nor NaN.
- const auto left_loc = left_resolver_.Resolve(left);
- const auto right_loc = right_resolver_.Resolve(right);
+ left_loc =
+ left_resolver_.ResolveWithChunkIndexHint(left,
left_loc.chunk_index);
+ right_loc = right_resolver_.ResolveWithChunkIndexHint(
+ right, right_loc.chunk_index);
auto chunk_left =
first_sort_key.GetChunk<ArrayType>(left_loc);
auto chunk_right =
first_sort_key.GetChunk<ArrayType>(right_loc);
DCHECK(!chunk_left.IsNull());