This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new d5cda4afd2 GH-44084: [C++] Improve merge step in chunked sorting
(#44217)
d5cda4afd2 is described below
commit d5cda4afd25503c360efe08989c3c58e62369e2f
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Nov 26 13:53:06 2024 +0100
GH-44084: [C++] Improve merge step in chunked sorting (#44217)
### Rationale for this change
When merge-sorting the chunks of a chunked array or table, we would
currently repeatedly resolve the chunk indices for each individual value
lookup. This requires `O(n*log k)` chunk resolutions with `n` being the chunked
array or table length, and `k` the number of chunks.
Instead, this PR translates the logical indices to physical indices all at once,
without even requiring expensive chunk resolution, since the logical indices are
initially chunk-partitioned.
This change yields significant speedups on chunked array and table sorting:
```
benchmark baseline
contender change %
counters
ChunkedArraySortIndicesInt64Narrow/1048576/100 345.419 MiB/sec
628.334 MiB/sec 81.905 {'family_index': 0,
'per_family_instance_index': 6, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/1048576/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 242, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/0/1/32 25.997M items/sec
44.550M items/sec 71.366 {'family_index': 3, 'per_family_instance_index':
11, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/1/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Wide/32768/10000 91.182 MiB/sec
153.756 MiB/sec 68.625 {'family_index': 1,
'per_family_instance_index': 0, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/10000', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2067, 'null_percent': 0.01}
ChunkedArraySortIndicesInt64Wide/32768/10 96.536 MiB/sec
161.648 MiB/sec 67.449 {'family_index': 1,
'per_family_instance_index': 2, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/10', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2238, 'null_percent': 10.0}
TableSortIndicesInt64Narrow/1048576/100/1/32 24.290M items/sec
40.513M items/sec 66.791 {'family_index': 3, 'per_family_instance_index':
9, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/32', 'repetitions':
1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/100 90.030 MiB/sec
149.633 MiB/sec 66.203 {'family_index': 1,
'per_family_instance_index': 1, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2017, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/0 91.982 MiB/sec
152.840 MiB/sec 66.163 {'family_index':
1, 'per_family_instance_index': 5, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/0', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2115, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/8388608/100 240.335 MiB/sec
387.423 MiB/sec 61.201 {'family_index': 0,
'per_family_instance_index': 7, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/8388608/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 21, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/32768/2 172.376 MiB/sec
274.133 MiB/sec 59.032 {'family_index': 1,
'per_family_instance_index': 3, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/2', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3770, 'null_percent': 50.0}
TableSortIndicesInt64Wide/1048576/4/1/32 7.407M items/sec
11.621M items/sec 56.904 {'family_index': 4,
'per_family_instance_index': 10, 'run_name':
'TableSortIndicesInt64Wide/1048576/4/1/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 5, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/100/1/32 5.788M items/sec
9.062M items/sec 56.565 {'family_index': 4, 'per_family_instance_index':
9, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/1/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/0/1/32 5.785M items/sec
9.049M items/sec 56.409 {'family_index': 4,
'per_family_instance_index': 11, 'run_name':
'TableSortIndicesInt64Wide/1048576/0/1/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/32768/2 194.743 MiB/sec
291.432 MiB/sec 49.649 {'family_index': 0,
'per_family_instance_index': 3, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/2', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4340, 'null_percent': 50.0}
TableSortIndicesInt64Narrow/1048576/4/1/32 25.686M items/sec
38.087M items/sec 48.279 {'family_index': 3, 'per_family_instance_index':
10, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 17, 'chunks': 32.0,
'columns': 1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/0/8/32 5.766M items/sec
8.374M items/sec 45.240 {'family_index': 4,
'per_family_instance_index': 5, 'run_name':
'TableSortIndicesInt64Wide/1048576/0/8/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/0/16/32 5.752M items/sec
8.352M items/sec 45.202 {'family_index': 4, 'per_family_instance_index':
2, 'run_name': 'TableSortIndicesInt64Wide/1048576/0/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 0.0}
ChunkedArraySortIndicesInt64Narrow/32768/10000 121.253 MiB/sec
175.286 MiB/sec 44.562 {'family_index': 0,
'per_family_instance_index': 0, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/10000', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2673, 'null_percent': 0.01}
TableSortIndicesInt64Wide/1048576/100/2/32 5.549M items/sec
7.984M items/sec 43.876 {'family_index': 4, 'per_family_instance_index':
6, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/2/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 2.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Wide/1048576/100 69.599 MiB/sec
99.666 MiB/sec 43.200 {'family_index': 1,
'per_family_instance_index': 6, 'run_name':
'ChunkedArraySortIndicesInt64Wide/1048576/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 49, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/0/1/4 55.940M items/sec
79.984M items/sec 42.982 {'family_index': 3,
'per_family_instance_index': 23, 'run_name':
'TableSortIndicesInt64Narrow/1048576/0/1/4', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 37, 'chunks': 4.0,
'columns': 1.0, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/100/16/32 5.554M items/sec
7.909M items/sec 42.417 {'family_index': 4, 'per_family_instance_index':
0, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/10 127.758 MiB/sec
181.407 MiB/sec 41.992 {'family_index': 0,
'per_family_instance_index': 2, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/10', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2856, 'null_percent': 10.0}
TableSortIndicesInt64Wide/1048576/100/8/32 5.572M items/sec
7.775M items/sec 39.548 {'family_index': 4, 'per_family_instance_index':
3, 'run_name': 'TableSortIndicesInt64Wide/1048576/100/8/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/100 119.600 MiB/sec
166.454 MiB/sec 39.176 {'family_index': 0,
'per_family_instance_index': 1, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2667, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/0/2/32 5.781M items/sec
8.016M items/sec 38.669 {'family_index': 4,
'per_family_instance_index': 8, 'run_name':
'TableSortIndicesInt64Wide/1048576/0/2/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 4, 'chunks': 32.0,
'columns': 2.0, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/100/1/4 52.252M items/sec
72.193M items/sec 38.162 {'family_index': 3, 'per_family_instance_index':
21, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/1/4', 'repetitions':
1, 'repetition_index': 0, 'threads': 1, 'iterations': 35, 'chunks': 4.0,
'columns': 1.0, 'null_percent': 1.0}
ChunkedArraySortIndicesInt64Narrow/32768/0 121.868 MiB/sec
168.364 MiB/sec 38.152 {'family_index': 0,
'per_family_instance_index': 5, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/0', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2691, 'null_percent': 0.0}
TableSortIndicesInt64Wide/1048576/4/2/32 5.017M items/sec
6.720M items/sec 33.934 {'family_index': 4,
'per_family_instance_index': 7, 'run_name':
'TableSortIndicesInt64Wide/1048576/4/2/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0,
'columns': 2.0, 'null_percent': 25.0}
ChunkedArraySortIndicesInt64Wide/8388608/100 54.785 MiB/sec
72.642 MiB/sec 32.593 {'family_index': 1,
'per_family_instance_index': 7, 'run_name':
'ChunkedArraySortIndicesInt64Wide/8388608/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 5, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/4/8/32 4.222M items/sec
5.483M items/sec 29.861 {'family_index': 4,
'per_family_instance_index': 4, 'run_name':
'TableSortIndicesInt64Wide/1048576/4/8/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/32768/10 146.866 MiB/sec
190.314 MiB/sec 29.583 {'family_index':
2, 'per_family_instance_index': 2, 'run_name':
'ChunkedArraySortIndicesString/32768/10', 'repetitions': 1, 'repetition_index':
0, 'threads': 1, 'iterations': 3494, 'null_percent': 10.0}
TableSortIndicesInt64Wide/1048576/4/16/32 4.225M items/sec
5.433M items/sec 28.599 {'family_index': 4, 'per_family_instance_index':
1, 'run_name': 'TableSortIndicesInt64Wide/1048576/4/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 25.0}
TableSortIndicesInt64Narrow/1048576/100/16/32 2.193M items/sec
2.711M items/sec 23.652 {'family_index': 3, 'per_family_instance_index': 0,
'run_name': 'TableSortIndicesInt64Narrow/1048576/100/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 1.0}
ChunkedArraySortIndicesString/32768/100 156.401 MiB/sec
191.910 MiB/sec 22.704 {'family_index':
2, 'per_family_instance_index': 1, 'run_name':
'ChunkedArraySortIndicesString/32768/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3488, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/4/1/4 47.342M items/sec
58.062M items/sec 22.644 {'family_index': 3, 'per_family_instance_index':
22, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/1/4', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 32, 'chunks': 4.0,
'columns': 1.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/32768/0 161.457 MiB/sec
195.782 MiB/sec 21.259
{'family_index': 2, 'per_family_instance_index': 5, 'run_name':
'ChunkedArraySortIndicesString/32768/0', 'repetitions': 1, 'repetition_index':
0, 'threads': 1, 'iterations': 3644, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/4/16/32 1.915M items/sec
2.309M items/sec 20.561 {'family_index': 3, 'per_family_instance_index': 1,
'run_name': 'TableSortIndicesInt64Narrow/1048576/4/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 25.0}
TableSortIndicesInt64Narrow/1048576/0/16/32 2.561M items/sec
3.079M items/sec 20.208 {'family_index': 3, 'per_family_instance_index':
2, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/16/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0,
'columns': 16.0, 'null_percent': 0.0}
ChunkedArraySortIndicesString/32768/10000 157.786 MiB/sec
189.412 MiB/sec 20.043 {'family_index': 2,
'per_family_instance_index': 0, 'run_name':
'ChunkedArraySortIndicesString/32768/10000', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 3539, 'null_percent': 0.01}
ChunkedArraySortIndicesString/32768/2 139.241 MiB/sec
164.172 MiB/sec 17.904 {'family_index':
2, 'per_family_instance_index': 3, 'run_name':
'ChunkedArraySortIndicesString/32768/2', 'repetitions': 1, 'repetition_index':
0, 'threads': 1, 'iterations': 3155, 'null_percent': 50.0}
TableSortIndicesInt64Narrow/1048576/0/8/32 2.595M items/sec
3.038M items/sec 17.081 {'family_index': 3, 'per_family_instance_index':
5, 'run_name': 'TableSortIndicesInt64Narrow/1048576/0/8/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 0.0}
TableSortIndicesInt64Narrow/1048576/4/8/32 1.999M items/sec
2.298M items/sec 14.936 {'family_index': 3, 'per_family_instance_index':
4, 'run_name': 'TableSortIndicesInt64Narrow/1048576/4/8/32', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 1, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 25.0}
ChunkedArraySortIndicesString/8388608/100 81.026 MiB/sec
93.120 MiB/sec 14.926 {'family_index':
2, 'per_family_instance_index': 7, 'run_name':
'ChunkedArraySortIndicesString/8388608/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 7, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/100/8/32 2.382M items/sec
2.719M items/sec 14.168 {'family_index': 3, 'per_family_instance_index':
3, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/8/32', 'repetitions':
1, 'repetition_index': 0, 'threads': 1, 'iterations': 2, 'chunks': 32.0,
'columns': 8.0, 'null_percent': 1.0}
ChunkedArraySortIndicesString/1048576/100 107.722 MiB/sec
122.229 MiB/sec 13.467 {'family_index':
2, 'per_family_instance_index': 6, 'run_name':
'ChunkedArraySortIndicesString/1048576/100', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 77, 'null_percent': 1.0}
TableSortIndicesInt64Narrow/1048576/100/2/32 4.019M items/sec
4.477M items/sec 11.383 {'family_index': 3, 'per_family_instance_index':
6, 'run_name': 'TableSortIndicesInt64Narrow/1048576/100/2/32', 'repetitions':
1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'chunks': 32.0,
'columns': 2.0, 'null_percent': 1.0}
TableSortIndicesInt64Wide/1048576/4/1/4 11.595M items/sec
12.791M items/sec 10.314 {'family_index': 4,
'per_family_instance_index': 22, 'run_name':
'TableSortIndicesInt64Wide/1048576/4/1/4', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 8, 'chunks': 4.0, 'columns':
1.0, 'null_percent': 25.0}
TableSortIndicesInt64Wide/1048576/0/1/4 9.231M items/sec
10.181M items/sec 10.294 {'family_index': 4,
'per_family_instance_index': 23, 'run_name':
'TableSortIndicesInt64Wide/1048576/0/1/4', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 6, 'chunks': 4.0, 'columns':
1.0, 'null_percent': 0.0}
```
However, performance also regresses when the input is all nulls (which is
probably rare):
```
benchmark baseline
contender change %
counters
ChunkedArraySortIndicesString/32768/1 5.636 GiB/sec
4.336 GiB/sec -23.068 {'family_index': 2,
'per_family_instance_index': 4, 'run_name':
'ChunkedArraySortIndicesString/32768/1', 'repetitions': 1, 'repetition_index':
0, 'threads': 1, 'iterations': 127778, 'null_percent': 100.0}
ChunkedArraySortIndicesInt64Narrow/32768/1 3.963 GiB/sec
2.852 GiB/sec -28.025 {'family_index': 0,
'per_family_instance_index': 4, 'run_name':
'ChunkedArraySortIndicesInt64Narrow/32768/1', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 91209, 'null_percent': 100.0}
ChunkedArraySortIndicesInt64Wide/32768/1 4.038 GiB/sec
2.869 GiB/sec -28.954 {'family_index': 1,
'per_family_instance_index': 4, 'run_name':
'ChunkedArraySortIndicesInt64Wide/32768/1', 'repetitions': 1,
'repetition_index': 0, 'threads': 1, 'iterations': 94090, 'null_percent': 100.0}
```
### Are these changes tested?
Yes, by existing tests.
### Are there any user-facing changes?
No.
* GitHub Issue: #44084
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/chunk_resolver.cc | 10 +-
cpp/src/arrow/chunk_resolver.h | 10 +-
cpp/src/arrow/compute/kernels/chunked_internal.cc | 122 +++++++++
cpp/src/arrow/compute/kernels/chunked_internal.h | 121 +++++++--
cpp/src/arrow/compute/kernels/vector_rank.cc | 4 +-
cpp/src/arrow/compute/kernels/vector_sort.cc | 283 ++++++++++++---------
.../arrow/compute/kernels/vector_sort_internal.h | 141 +++++++---
8 files changed, 496 insertions(+), 196 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5f6b568460..4e40056839 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
compute/light_array_internal.cc
compute/ordering.cc
compute/registry.cc
+ compute/kernels/chunked_internal.cc
compute/kernels/codegen_internal.cc
compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
diff --git a/cpp/src/arrow/chunk_resolver.cc b/cpp/src/arrow/chunk_resolver.cc
index ca74ffa06c..7fc259f38c 100644
--- a/cpp/src/arrow/chunk_resolver.cc
+++ b/cpp/src/arrow/chunk_resolver.cc
@@ -28,6 +28,8 @@
namespace arrow {
+using util::span;
+
namespace {
template <typename T>
int64_t GetLength(const T& array) {
@@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
}
template <typename T>
-inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
+inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
@@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const
int64_t* signed_offsets,
} // namespace
ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
- : offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
+ : offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}
-ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
+ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
- : offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
+ : offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}
ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)),
diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h
index ab0e753d00..3d6458167f 100644
--- a/cpp/src/arrow/chunk_resolver.h
+++ b/cpp/src/arrow/chunk_resolver.h
@@ -26,6 +26,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
+#include "arrow/util/span.h"
namespace arrow {
@@ -76,11 +77,14 @@ class ARROW_EXPORT ChunkResolver {
public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;
-
- explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
-
+ explicit ChunkResolver(util::span<const Array* const> chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;
+ /// \brief Construct a ChunkResolver from a vector of chunks.size() + 1
offsets.
+ ///
+ /// The first offset must be 0 and the last offset must be the logical
length of the
+ /// chunked array. Each offset before the last represents the starting
logical index of
+ /// the corresponding chunk.
explicit ChunkResolver(std::vector<int64_t> offsets) noexcept
: offsets_(std::move(offsets)), cached_chunk_(0) {
#ifndef NDEBUG
diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.cc
b/cpp/src/arrow/compute/kernels/chunked_internal.cc
new file mode 100644
index 0000000000..e72b8e1f5b
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/chunked_internal.h"
+
+#include <algorithm>
+
+#include "arrow/record_batch.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute::internal {
+
+std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
+ std::vector<const Array*> pointers(arrays.size());
+ std::transform(arrays.begin(), arrays.end(), pointers.begin(),
+ [&](const std::shared_ptr<Array>& array) { return
array.get(); });
+ return pointers;
+}
+
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+ util::span<const Array* const> chunks) {
+ std::vector<int64_t> chunk_lengths(chunks.size());
+ for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+ chunk_lengths[i] = chunks[i]->length();
+ }
+ return chunk_lengths;
+}
+
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+ const RecordBatchVector& chunks) {
+ std::vector<int64_t> chunk_lengths(chunks.size());
+ for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+ chunk_lengths[i] = chunks[i]->num_rows();
+ }
+ return chunk_lengths;
+}
+
+Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
+ChunkedIndexMapper::LogicalToPhysical() {
+ // Check that indices would fall in bounds for CompressedChunkLocation
+ if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
+ CompressedChunkLocation::kMaxChunkIndex + 1)) {
+ return Status::NotImplemented("Chunked array has more than ",
+ CompressedChunkLocation::kMaxChunkIndex + 1,
" chunks");
+ }
+ for (int64_t chunk_length : chunk_lengths_) {
+ if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
+ CompressedChunkLocation::kMaxIndexInChunk + 1)) {
+ return Status::NotImplemented("Individual chunk in chunked array has
more than ",
+ CompressedChunkLocation::kMaxIndexInChunk
+ 1,
+ " elements");
+ }
+ }
+
+ const int64_t num_indices = static_cast<int64_t>(indices_end_ -
indices_begin_);
+ DCHECK_EQ(num_indices, std::accumulate(chunk_lengths_.begin(),
chunk_lengths_.end(),
+ static_cast<int64_t>(0)));
+ CompressedChunkLocation* physical_begin =
+ reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+ DCHECK_EQ(physical_begin + num_indices,
+ reinterpret_cast<CompressedChunkLocation*>(indices_end_));
+
+ int64_t chunk_offset = 0;
+ for (int64_t chunk_index = 0; chunk_index <
static_cast<int64_t>(chunk_lengths_.size());
+ ++chunk_index) {
+ const int64_t chunk_length = chunk_lengths_[chunk_index];
+ for (int64_t i = 0; i < chunk_length; ++i) {
+ // Logical indices are expected to be chunk-partitioned, which avoids
costly
+ // chunked index resolution.
+ DCHECK_GE(indices_begin_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset));
+ DCHECK_LT(indices_begin_[chunk_offset + i],
+ static_cast<uint64_t>(chunk_offset + chunk_length));
+ physical_begin[chunk_offset + i] = CompressedChunkLocation{
+ static_cast<uint64_t>(chunk_index),
+ indices_begin_[chunk_offset + i] -
static_cast<uint64_t>(chunk_offset)};
+ }
+ chunk_offset += chunk_length;
+ }
+
+ return std::pair{physical_begin, physical_begin + num_indices};
+}
+
+Status ChunkedIndexMapper::PhysicalToLogical() {
+ std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
+ {
+ int64_t offset = 0;
+ for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
+ chunk_offsets[i] = offset;
+ offset += chunk_lengths_[i];
+ }
+ }
+
+ const int64_t num_indices = static_cast<int64_t>(indices_end_ -
indices_begin_);
+ CompressedChunkLocation* physical_begin =
+ reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+ for (int64_t i = 0; i < num_indices; ++i) {
+ const auto loc = physical_begin[i];
+ DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
+ DCHECK_LT(loc.index_in_chunk(),
+ static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
+ indices_begin_[i] =
+ chunk_offsets[loc.chunk_index()] +
static_cast<int64_t>(loc.index_in_chunk());
+ }
+
+ return Status::OK();
+}
+
+} // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.h
b/cpp/src/arrow/compute/kernels/chunked_internal.h
index f7cb615f3e..5bc8233016 100644
--- a/cpp/src/arrow/compute/kernels/chunked_internal.h
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -20,26 +20,32 @@
#include <algorithm>
#include <cstdint>
#include <memory>
+#include <utility>
#include <vector>
#include "arrow/array.h"
#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/util/span.h"
-namespace arrow {
-namespace compute {
-namespace internal {
+namespace arrow::compute::internal {
// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
const Array* array;
// The index in the target array.
- const int64_t index;
+ int64_t index;
ResolvedChunk(const Array* array, int64_t index) : array(array),
index(index) {}
- public:
+ friend bool operator==(const ResolvedChunk& left, const ResolvedChunk&
right) {
+ return left.array == right.array && left.index == right.index;
+ }
+ friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk&
right) {
+ return left.array != right.array || left.index != right.index;
+ }
+
bool IsNull() const { return array->IsNull(index); }
template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
@@ -50,20 +56,63 @@ struct ResolvedChunk {
}
};
+// A compressed (chunk_index, index_in_chunk) pair.
+// The goal of compression is to make it fit in 64 bits, allowing in place
+// replacement of logical uint64_t indices with physical indices.
+// (see ChunkedIndexMapper)
+struct CompressedChunkLocation {
+ static constexpr int kChunkIndexBits = 24;
+ static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;
+
+ static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
+ static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;
+
+ CompressedChunkLocation() = default;
+
+ constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
+ constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits;
}
+
+ explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
+ uint64_t index_in_chunk)
+ : data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}
+
+ template <typename IndexType>
+ explicit operator TypedChunkLocation<IndexType>() {
+ return {static_cast<IndexType>(chunk_index()),
+ static_cast<IndexType>(index_in_chunk())};
+ }
+
+ private:
+ uint64_t data_;
+};
+
+static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));
+
class ChunkedArrayResolver {
private:
ChunkResolver resolver_;
- std::vector<const Array*> chunks_;
+ util::span<const Array* const> chunks_;
+ std::vector<const Array*> owned_chunks_;
public:
- explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
+ explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
+ : resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
+ explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
: resolver_(chunks), chunks_(chunks) {}
- ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
- ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
+ ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);
- ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
- ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
+ ChunkedArrayResolver(const ChunkedArrayResolver& other)
+ : resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
+ // Rebind span to owned_chunks_ if necessary
+ chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
+ }
+ ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
+ resolver_ = other.resolver_;
+ owned_chunks_ = other.owned_chunks_;
+ chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
+ return *this;
+ }
ResolvedChunk Resolve(int64_t index) const {
const auto loc = resolver_.Resolve(index);
@@ -71,13 +120,45 @@ class ChunkedArrayResolver {
}
};
-inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
- std::vector<const Array*> pointers(arrays.size());
- std::transform(arrays.begin(), arrays.end(), pointers.begin(),
- [&](const std::shared_ptr<Array>& array) { return
array.get(); });
- return pointers;
-}
+std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);
+
+// A class that turns logical (linear) indices into physical (chunked) indices,
+// and vice-versa.
+class ChunkedIndexMapper {
+ public:
+ ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t*
indices_begin,
+ uint64_t* indices_end)
+ : ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
+ ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t*
indices_begin,
+ uint64_t* indices_end)
+ : chunk_lengths_(GetChunkLengths(chunks)),
+ indices_begin_(indices_begin),
+ indices_end_(indices_end) {}
+ ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
+ uint64_t* indices_end)
+ : chunk_lengths_(GetChunkLengths(chunks)),
+ indices_begin_(indices_begin),
+ indices_end_(indices_end) {}
+
+ // Turn the original uint64_t logical indices into physical. This reuses the
+ // same memory area, so the logical indices cannot be used anymore until
+ // PhysicalToLogical() is called.
+ //
+ // This assumes that the logical indices are originally chunk-partitioned.
+ Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
+ LogicalToPhysical();
+
+ // Turn the physical indices back into logical, making the uint64_t indices
+ // usable again.
+ Status PhysicalToLogical();
+
+ private:
+ static std::vector<int64_t> GetChunkLengths(util::span<const Array* const>
chunks);
+ static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);
+
+ std::vector<int64_t> chunk_lengths_;
+ uint64_t* indices_begin_;
+ uint64_t* indices_end_;
+};
-} // namespace internal
-} // namespace compute
-} // namespace arrow
+} // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/vector_rank.cc
b/cpp/src/arrow/compute/kernels/vector_rank.cc
index c4e5270141..b374862fe6 100644
--- a/cpp/src/arrow/compute/kernels/vector_rank.cc
+++ b/cpp/src/arrow/compute/kernels/vector_rank.cc
@@ -21,6 +21,8 @@
namespace arrow::compute::internal {
+using ::arrow::util::span;
+
namespace {
// ----------------------------------------------------------------------
@@ -237,7 +239,7 @@ class Ranker<ChunkedArray> : public
RankerMixin<ChunkedArray, Ranker<ChunkedArra
physical_chunks_, order_, null_placement_));
const auto arrays = GetArrayPointers(physical_chunks_);
- auto value_selector = [resolver = ChunkedArrayResolver(arrays)](int64_t
index) {
+ auto value_selector = [resolver =
ChunkedArrayResolver(span(arrays))](int64_t index) {
return resolver.Resolve(index).Value<InType>();
};
ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted,
null_placement_,
diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc
b/cpp/src/arrow/compute/kernels/vector_sort.cc
index 395ed86a06..d81187837d 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -24,6 +24,7 @@
namespace arrow {
using internal::checked_cast;
+using util::span;
namespace compute {
namespace internal {
@@ -82,6 +83,7 @@ class ChunkedArraySorter : public TypeVisitor {
*output_ = {indices_end_, indices_end_, indices_end_, indices_end_};
return Status::OK();
}
+ const int64_t num_indices = static_cast<int64_t>(indices_end_ -
indices_begin_);
const auto arrays = GetArrayPointers(physical_chunks_);
// Sort each chunk independently and merge to sorted indices.
@@ -101,45 +103,63 @@ class ChunkedArraySorter : public TypeVisitor {
begin_offset, options,
ctx_));
begin_offset = end_offset;
}
- DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
+ DCHECK_EQ(end_offset, num_indices);
// Then merge them by pairs, recursively
if (sorted.size() > 1) {
- auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
- uint64_t* nulls_end, uint64_t* temp_indices,
- int64_t null_count) {
+ ChunkedIndexMapper chunked_mapper(arrays, indices_begin_, indices_end_);
+ ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
+ chunked_mapper.LogicalToPhysical());
+ auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;
+
+ std::vector<ChunkedNullPartitionResult> chunk_sorted(num_chunks);
+ for (int i = 0; i < num_chunks; ++i) {
+ chunk_sorted[i] = sorted[i].TranslateTo(indices_begin_,
chunked_indices_begin);
+ }
+
+ auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
+ CompressedChunkLocation* nulls_middle,
+ CompressedChunkLocation* nulls_end,
+ CompressedChunkLocation* temp_indices, int64_t
null_count) {
if (has_null_like_values<typename ArrayType::TypeClass>::value) {
- PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end,
- ChunkedArrayResolver(arrays),
null_count,
- null_placement_);
+ PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end, arrays,
+ null_count, null_placement_);
}
};
- auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
- uint64_t* range_end, uint64_t* temp_indices) {
- MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
- temp_indices);
- };
-
- MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
- std::move(merge_non_nulls)};
+ auto merge_non_nulls =
+ [&](CompressedChunkLocation* range_begin, CompressedChunkLocation*
range_middle,
+ CompressedChunkLocation* range_end, CompressedChunkLocation*
temp_indices) {
+ MergeNonNulls<ArrayType>(range_begin, range_middle, range_end,
arrays,
+ temp_indices);
+ };
+
+ ChunkedMergeImpl merge_impl{null_placement_, std::move(merge_nulls),
+ std::move(merge_non_nulls)};
// std::merge is only called on non-null values, so size temp indices
accordingly
- RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ -
null_count));
+ RETURN_NOT_OK(merge_impl.Init(ctx_, num_indices - null_count));
- while (sorted.size() > 1) {
- auto out_it = sorted.begin();
- auto it = sorted.begin();
- while (it < sorted.end() - 1) {
+ while (chunk_sorted.size() > 1) {
+ // Merge all pairs of chunks
+ auto out_it = chunk_sorted.begin();
+ auto it = chunk_sorted.begin();
+ while (it < chunk_sorted.end() - 1) {
const auto& left = *it++;
const auto& right = *it++;
DCHECK_EQ(left.overall_end(), right.overall_begin());
const auto merged = merge_impl.Merge(left, right, null_count);
*out_it++ = merged;
}
- if (it < sorted.end()) {
+ if (it < chunk_sorted.end()) {
*out_it++ = *it++;
}
- sorted.erase(out_it, sorted.end());
+ chunk_sorted.erase(out_it, chunk_sorted.end());
}
+
+ // Reverse everything
+ sorted.resize(1);
+ sorted[0] = chunk_sorted[0].TranslateTo(chunked_indices_begin,
indices_begin_);
+
+ RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
}
DCHECK_EQ(sorted.size(), 1);
@@ -153,34 +173,39 @@ class ChunkedArraySorter : public TypeVisitor {
}
template <typename ArrayType>
- void MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle, uint64_t*
range_end,
- const std::vector<const Array*>& arrays, uint64_t*
temp_indices) {
+ void MergeNonNulls(CompressedChunkLocation* range_begin,
+ CompressedChunkLocation* range_middle,
+ CompressedChunkLocation* range_end, span<const Array*
const> arrays,
+ CompressedChunkLocation* temp_indices) {
using ArrowType = typename ArrayType::TypeClass;
- const ChunkedArrayResolver left_resolver(arrays);
- const ChunkedArrayResolver right_resolver(arrays);
if (order_ == SortOrder::Ascending) {
std::merge(range_begin, range_middle, range_middle, range_end,
temp_indices,
- [&](uint64_t left, uint64_t right) {
- const auto chunk_left = left_resolver.Resolve(left);
- const auto chunk_right = right_resolver.Resolve(right);
- return chunk_left.Value<ArrowType>() <
chunk_right.Value<ArrowType>();
+ [&](CompressedChunkLocation left, CompressedChunkLocation
right) {
+ return ChunkValue<ArrowType>(arrays, left) <
+ ChunkValue<ArrowType>(arrays, right);
});
} else {
std::merge(range_begin, range_middle, range_middle, range_end,
temp_indices,
- [&](uint64_t left, uint64_t right) {
- const auto chunk_left = left_resolver.Resolve(left);
- const auto chunk_right = right_resolver.Resolve(right);
+ [&](CompressedChunkLocation left, CompressedChunkLocation
right) {
// We don't use 'left > right' here to reduce required
// operator. If we use 'right < left' here, '<' is only
// required.
- return chunk_right.Value<ArrowType>() <
chunk_left.Value<ArrowType>();
+ return ChunkValue<ArrowType>(arrays, right) <
+ ChunkValue<ArrowType>(arrays, left);
});
}
// Copy back temp area into main buffer
std::copy(temp_indices, temp_indices + (range_end - range_begin),
range_begin);
}
+ template <typename ArrowType>
+ auto ChunkValue(span<const Array* const> arrays, CompressedChunkLocation
loc) const {
+ return ResolvedChunk(arrays[loc.chunk_index()],
+ static_cast<int64_t>(loc.index_in_chunk()))
+ .template Value<ArrowType>();
+ }
+
uint64_t* indices_begin_;
uint64_t* indices_end_;
const std::shared_ptr<DataType>& physical_type_;
@@ -610,8 +635,6 @@ class TableSorter {
batches_(MakeBatches(table, &status_)),
options_(options),
null_placement_(options.null_placement),
- left_resolver_(batches_),
- right_resolver_(batches_),
sort_keys_(ResolveSortKeys(table, batches_, options.sort_keys,
&status_)),
indices_begin_(indices_begin),
indices_end_(indices_end),
@@ -674,14 +697,24 @@ class TableSorter {
// Then merge them by pairs, recursively
if (sorted.size() > 1) {
+ ChunkedIndexMapper chunked_mapper(batches_, indices_begin_,
indices_end_);
+ ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
+ chunked_mapper.LogicalToPhysical());
+ auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;
+
+ std::vector<ChunkedNullPartitionResult> chunk_sorted(num_batches);
+ for (int64_t i = 0; i < num_batches; ++i) {
+ chunk_sorted[i] = sorted[i].TranslateTo(indices_begin_,
chunked_indices_begin);
+ }
+
struct Visitor {
TableSorter* sorter;
- std::vector<NullPartitionResult>* sorted;
+ std::vector<ChunkedNullPartitionResult>* chunk_sorted;
int64_t null_count;
-#define VISIT(TYPE) \
- Status Visit(const TYPE& type) { \
- return sorter->MergeInternal<TYPE>(std::move(*sorted), null_count); \
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { \
+ return sorter->MergeInternal<TYPE>(chunk_sorted, null_count); \
}
VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
@@ -693,104 +726,101 @@ class TableSorter {
type.ToString());
}
};
- Visitor visitor{this, &sorted, null_count};
+ Visitor visitor{this, &chunk_sorted, null_count};
RETURN_NOT_OK(VisitTypeInline(*sort_keys_[0].type, &visitor));
+
+ DCHECK_EQ(chunk_sorted.size(), 1);
+ DCHECK_EQ(chunk_sorted[0].overall_begin(), chunked_indices_begin);
+ DCHECK_EQ(chunk_sorted[0].overall_end(), chunked_indices_end);
+
+ RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
}
return Status::OK();
}
// Recursive merge routine, typed on the first sort key
- template <typename Type>
- Status MergeInternal(std::vector<NullPartitionResult> sorted, int64_t
null_count) {
- auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
- uint64_t* nulls_end, uint64_t* temp_indices,
- int64_t null_count) {
- MergeNulls<Type>(nulls_begin, nulls_middle, nulls_end, temp_indices,
null_count);
+ template <typename ArrowType>
+ Status MergeInternal(std::vector<ChunkedNullPartitionResult>* sorted,
+ int64_t null_count) {
+ auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
+ CompressedChunkLocation* nulls_middle,
+ CompressedChunkLocation* nulls_end,
+ CompressedChunkLocation* temp_indices, int64_t
null_count) {
+ MergeNulls<ArrowType>(nulls_begin, nulls_middle, nulls_end, temp_indices,
+ null_count);
};
- auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
- uint64_t* range_end, uint64_t* temp_indices) {
- MergeNonNulls<Type>(range_begin, range_middle, range_end, temp_indices);
- };
-
- MergeImpl merge_impl(options_.null_placement, std::move(merge_nulls),
- std::move(merge_non_nulls));
+ auto merge_non_nulls =
+ [&](CompressedChunkLocation* range_begin, CompressedChunkLocation*
range_middle,
+ CompressedChunkLocation* range_end, CompressedChunkLocation*
temp_indices) {
+ MergeNonNulls<ArrowType>(range_begin, range_middle, range_end,
temp_indices);
+ };
+
+ ChunkedMergeImpl merge_impl(options_.null_placement,
std::move(merge_nulls),
+ std::move(merge_non_nulls));
RETURN_NOT_OK(merge_impl.Init(ctx_, table_.num_rows()));
- while (sorted.size() > 1) {
- auto out_it = sorted.begin();
- auto it = sorted.begin();
- while (it < sorted.end() - 1) {
+ while (sorted->size() > 1) {
+ auto out_it = sorted->begin();
+ auto it = sorted->begin();
+ while (it < sorted->end() - 1) {
const auto& left = *it++;
const auto& right = *it++;
DCHECK_EQ(left.overall_end(), right.overall_begin());
*out_it++ = merge_impl.Merge(left, right, null_count);
}
- if (it < sorted.end()) {
+ if (it < sorted->end()) {
*out_it++ = *it++;
}
- sorted.erase(out_it, sorted.end());
+ sorted->erase(out_it, sorted->end());
}
- DCHECK_EQ(sorted.size(), 1);
- DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
- DCHECK_EQ(sorted[0].overall_end(), indices_end_);
return comparator_.status();
}
- // Merge rows with a null or a null-like in the first sort key
- template <typename Type>
- enable_if_t<has_null_like_values<Type>::value> MergeNulls(uint64_t*
nulls_begin,
- uint64_t*
nulls_middle,
- uint64_t*
nulls_end,
- uint64_t*
temp_indices,
- int64_t
null_count) {
- auto& comparator = comparator_;
- const auto& first_sort_key = sort_keys_[0];
-
- ChunkLocation left_loc;
- ChunkLocation right_loc;
- std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end,
temp_indices,
- [&](uint64_t left, uint64_t right) {
- // First column is either null or nan
- left_loc = left_resolver_.ResolveWithHint(left,
/*hint=*/left_loc);
- right_loc = right_resolver_.ResolveWithHint(right,
/*hint=*/right_loc);
- auto chunk_left = first_sort_key.GetChunk(left_loc);
- auto chunk_right = first_sort_key.GetChunk(right_loc);
- const auto left_is_null = chunk_left.IsNull();
- const auto right_is_null = chunk_right.IsNull();
- if (left_is_null == right_is_null) {
- return comparator.Compare(left_loc, right_loc, 1);
- } else if (options_.null_placement == NullPlacement::AtEnd) {
- return right_is_null;
- } else {
- return left_is_null;
- }
- });
- // Copy back temp area into main buffer
- std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin),
nulls_begin);
- }
-
- template <typename Type>
- enable_if_t<!has_null_like_values<Type>::value> MergeNulls(uint64_t*
nulls_begin,
- uint64_t*
nulls_middle,
- uint64_t*
nulls_end,
- uint64_t*
temp_indices,
- int64_t
null_count) {
- MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices,
null_count);
+ template <typename ArrowType>
+ void MergeNulls(CompressedChunkLocation* nulls_begin,
+ CompressedChunkLocation* nulls_middle,
+ CompressedChunkLocation* nulls_end,
+ CompressedChunkLocation* temp_indices, int64_t null_count) {
+ if constexpr (has_null_like_values<ArrowType>::value) {
+ // Merge rows with a null or a null-like in the first sort key
+ auto& comparator = comparator_;
+ const auto& first_sort_key = sort_keys_[0];
+
+ std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end,
temp_indices,
+ [&](CompressedChunkLocation left, CompressedChunkLocation
right) {
+ // First column is either null or nan
+ const auto left_loc = ChunkLocation{left};
+ const auto right_loc = ChunkLocation{right};
+ const auto chunk_left = first_sort_key.GetChunk(left_loc);
+ const auto chunk_right = first_sort_key.GetChunk(right_loc);
+ const auto left_is_null = chunk_left.IsNull();
+ const auto right_is_null = chunk_right.IsNull();
+ if (left_is_null == right_is_null) {
+ return comparator.Compare(left_loc, right_loc, 1);
+ } else if (options_.null_placement == NullPlacement::AtEnd)
{
+ return right_is_null;
+ } else {
+ return left_is_null;
+ }
+ });
+ // Copy back temp area into main buffer
+ std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin),
nulls_begin);
+ } else {
+ MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices,
null_count);
+ }
}
- void MergeNullsOnly(uint64_t* nulls_begin, uint64_t* nulls_middle, uint64_t*
nulls_end,
- uint64_t* temp_indices, int64_t null_count) {
+ void MergeNullsOnly(CompressedChunkLocation* nulls_begin,
+ CompressedChunkLocation* nulls_middle,
+ CompressedChunkLocation* nulls_end,
+ CompressedChunkLocation* temp_indices, int64_t
null_count) {
// Untyped implementation
auto& comparator = comparator_;
- ChunkLocation left_loc;
- ChunkLocation right_loc;
std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end,
temp_indices,
- [&](uint64_t left, uint64_t right) {
+ [&](CompressedChunkLocation left, CompressedChunkLocation
right) {
// First column is always null
- left_loc = left_resolver_.ResolveWithHint(left,
/*hint=*/left_loc);
- right_loc = right_resolver_.ResolveWithHint(right,
/*hint=*/right_loc);
- return comparator.Compare(left_loc, right_loc, 1);
+ return comparator.Compare(ChunkLocation{left},
ChunkLocation{right}, 1);
});
// Copy back temp area into main buffer
std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin),
nulls_begin);
@@ -799,27 +829,24 @@ class TableSorter {
//
// Merge rows with a non-null in the first sort key
//
- template <typename Type>
- enable_if_t<!is_null_type<Type>::value> MergeNonNulls(uint64_t* range_begin,
- uint64_t* range_middle,
- uint64_t* range_end,
- uint64_t*
temp_indices) {
+ template <typename ArrowType>
+ enable_if_t<!is_null_type<ArrowType>::value> MergeNonNulls(
+ CompressedChunkLocation* range_begin, CompressedChunkLocation*
range_middle,
+ CompressedChunkLocation* range_end, CompressedChunkLocation*
temp_indices) {
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];
- ChunkLocation left_loc;
- ChunkLocation right_loc;
std::merge(range_begin, range_middle, range_middle, range_end,
temp_indices,
- [&](uint64_t left, uint64_t right) {
+ [&](CompressedChunkLocation left, CompressedChunkLocation
right) {
// Both values are never null nor NaN.
- left_loc = left_resolver_.ResolveWithHint(left,
/*hint=*/left_loc);
- right_loc = right_resolver_.ResolveWithHint(right,
/*hint=*/right_loc);
+ const auto left_loc = ChunkLocation{left};
+ const auto right_loc = ChunkLocation{right};
auto chunk_left = first_sort_key.GetChunk(left_loc);
auto chunk_right = first_sort_key.GetChunk(right_loc);
DCHECK(!chunk_left.IsNull());
DCHECK(!chunk_right.IsNull());
- auto value_left = chunk_left.Value<Type>();
- auto value_right = chunk_right.Value<Type>();
+ const auto value_left = chunk_left.Value<ArrowType>();
+ const auto value_right = chunk_right.Value<ArrowType>();
if (value_left == value_right) {
// If the left value equals to the right value,
// we need to compare the second and following
@@ -834,13 +861,16 @@ class TableSorter {
}
}
});
+
// Copy back temp area into main buffer
std::copy(temp_indices, temp_indices + (range_end - range_begin),
range_begin);
}
- template <typename Type>
- enable_if_null<Type> MergeNonNulls(uint64_t* range_begin, uint64_t*
range_middle,
- uint64_t* range_end, uint64_t*
temp_indices) {
+ template <typename ArrowType>
+ enable_if_null<ArrowType> MergeNonNulls(CompressedChunkLocation* range_begin,
+ CompressedChunkLocation*
range_middle,
+ CompressedChunkLocation* range_end,
+ CompressedChunkLocation*
temp_indices) {
const int64_t null_count = range_end - range_begin;
MergeNullsOnly(range_begin, range_middle, range_end, temp_indices,
null_count);
}
@@ -851,7 +881,6 @@ class TableSorter {
const RecordBatchVector batches_;
const SortOptions& options_;
const NullPlacement null_placement_;
- const ::arrow::ChunkResolver left_resolver_, right_resolver_;
const std::vector<ResolvedSortKey> sort_keys_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
diff --git a/cpp/src/arrow/compute/kernels/vector_sort_internal.h
b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
index bee7f838a0..97a2db1d11 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
@@ -55,15 +55,17 @@ namespace internal {
// NOTE: std::partition is usually faster than std::stable_partition.
struct NonStablePartitioner {
- template <typename Predicate>
- uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end,
Predicate&& pred) {
+ template <typename Predicate, typename IndexType>
+ IndexType* operator()(IndexType* indices_begin, IndexType* indices_end,
+ Predicate&& pred) {
return std::partition(indices_begin, indices_end,
std::forward<Predicate>(pred));
}
};
struct StablePartitioner {
- template <typename Predicate>
- uint64_t* operator()(uint64_t* indices_begin, uint64_t* indices_end,
Predicate&& pred) {
+ template <typename Predicate, typename IndexType>
+ IndexType* operator()(IndexType* indices_begin, IndexType* indices_end,
+ Predicate&& pred) {
return std::stable_partition(indices_begin, indices_end,
std::forward<Predicate>(pred));
}
@@ -142,22 +144,24 @@ int CompareTypeValues(const Value& left, const Value&
right, SortOrder order,
return ValueComparator<Type>::Compare(left, right, order, null_placement);
}
-struct NullPartitionResult {
- uint64_t* non_nulls_begin;
- uint64_t* non_nulls_end;
- uint64_t* nulls_begin;
- uint64_t* nulls_end;
+template <typename IndexType>
+struct GenericNullPartitionResult {
+ IndexType* non_nulls_begin;
+ IndexType* non_nulls_end;
+ IndexType* nulls_begin;
+ IndexType* nulls_end;
- uint64_t* overall_begin() const { return std::min(nulls_begin,
non_nulls_begin); }
+ IndexType* overall_begin() const { return std::min(nulls_begin,
non_nulls_begin); }
- uint64_t* overall_end() const { return std::max(nulls_end, non_nulls_end); }
+ IndexType* overall_end() const { return std::max(nulls_end, non_nulls_end); }
int64_t non_null_count() const { return non_nulls_end - non_nulls_begin; }
int64_t null_count() const { return nulls_end - nulls_begin; }
- static NullPartitionResult NoNulls(uint64_t* indices_begin, uint64_t*
indices_end,
- NullPlacement null_placement) {
+ static GenericNullPartitionResult NoNulls(IndexType* indices_begin,
+ IndexType* indices_end,
+ NullPlacement null_placement) {
if (null_placement == NullPlacement::AtStart) {
return {indices_begin, indices_end, indices_begin, indices_begin};
} else {
@@ -165,8 +169,9 @@ struct NullPartitionResult {
}
}
- static NullPartitionResult NullsOnly(uint64_t* indices_begin, uint64_t*
indices_end,
- NullPlacement null_placement) {
+ static GenericNullPartitionResult NullsOnly(IndexType* indices_begin,
+ IndexType* indices_end,
+ NullPlacement null_placement) {
if (null_placement == NullPlacement::AtStart) {
return {indices_end, indices_end, indices_begin, indices_end};
} else {
@@ -174,21 +179,37 @@ struct NullPartitionResult {
}
}
- static NullPartitionResult NullsAtEnd(uint64_t* indices_begin, uint64_t*
indices_end,
- uint64_t* midpoint) {
+ static GenericNullPartitionResult NullsAtEnd(IndexType* indices_begin,
+ IndexType* indices_end,
+ IndexType* midpoint) {
DCHECK_GE(midpoint, indices_begin);
DCHECK_LE(midpoint, indices_end);
return {indices_begin, midpoint, midpoint, indices_end};
}
- static NullPartitionResult NullsAtStart(uint64_t* indices_begin, uint64_t*
indices_end,
- uint64_t* midpoint) {
+ static GenericNullPartitionResult NullsAtStart(IndexType* indices_begin,
+ IndexType* indices_end,
+ IndexType* midpoint) {
DCHECK_GE(midpoint, indices_begin);
DCHECK_LE(midpoint, indices_end);
return {midpoint, indices_end, indices_begin, midpoint};
}
+
+ template <typename TargetIndexType>
+ GenericNullPartitionResult<TargetIndexType> TranslateTo(
+ IndexType* indices_begin, TargetIndexType* target_indices_begin) const {
+ return {
+ (non_nulls_begin - indices_begin) + target_indices_begin,
+ (non_nulls_end - indices_begin) + target_indices_begin,
+ (nulls_begin - indices_begin) + target_indices_begin,
+ (nulls_end - indices_begin) + target_indices_begin,
+ };
+ }
};
+using NullPartitionResult = GenericNullPartitionResult<uint64_t>;
+using ChunkedNullPartitionResult =
GenericNullPartitionResult<CompressedChunkLocation>;
+
// Move nulls (not null-like values) to end of array.
//
// `offset` is used when this is called on a chunk of a chunked array
@@ -265,7 +286,9 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin,
uint64_t* indices_en
}
//
-// Null partitioning on chunked arrays
+// Null partitioning on chunked arrays, in two flavors:
+// 1) with uint64_t indices and ChunkedArrayResolver
+// 2) with CompressedChunkLocation and span of chunks
//
template <typename Partitioner>
@@ -291,6 +314,36 @@ NullPartitionResult PartitionNullsOnly(uint64_t*
indices_begin, uint64_t* indice
}
}
+template <typename Partitioner>
+ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation*
locations_begin,
+ CompressedChunkLocation*
locations_end,
+ util::span<const Array* const>
chunks,
+ int64_t null_count,
+ NullPlacement null_placement) {
+ if (null_count == 0) {
+ return ChunkedNullPartitionResult::NoNulls(locations_begin, locations_end,
+ null_placement);
+ }
+ Partitioner partitioner;
+ if (null_placement == NullPlacement::AtStart) {
+ auto nulls_end =
+ partitioner(locations_begin, locations_end,
[&](CompressedChunkLocation loc) {
+ return chunks[loc.chunk_index()]->IsNull(
+ static_cast<int64_t>(loc.index_in_chunk()));
+ });
+ return ChunkedNullPartitionResult::NullsAtStart(locations_begin,
locations_end,
+ nulls_end);
+ } else {
+ auto nulls_begin =
+ partitioner(locations_begin, locations_end,
[&](CompressedChunkLocation loc) {
+ return !chunks[loc.chunk_index()]->IsNull(
+ static_cast<int64_t>(loc.index_in_chunk()));
+ });
+ return ChunkedNullPartitionResult::NullsAtEnd(locations_begin,
locations_end,
+ nulls_begin);
+ }
+}
+
template <typename ArrayType, typename Partitioner>
enable_if_t<!has_null_like_values<typename ArrayType::TypeClass>::value,
NullPartitionResult>
@@ -334,17 +387,18 @@ NullPartitionResult PartitionNulls(uint64_t*
indices_begin, uint64_t* indices_en
std::max(q.nulls_end, p.nulls_end)};
}
-struct MergeImpl {
- using MergeNullsFunc = std::function<void(uint64_t* nulls_begin, uint64_t*
nulls_middle,
- uint64_t* nulls_end, uint64_t*
temp_indices,
- int64_t null_count)>;
+template <typename IndexType, typename NullPartitionResultType>
+struct GenericMergeImpl {
+ using MergeNullsFunc = std::function<void(IndexType* nulls_begin,
+ IndexType* nulls_middle,
IndexType* nulls_end,
+ IndexType* temp_indices, int64_t
null_count)>;
using MergeNonNullsFunc =
- std::function<void(uint64_t* range_begin, uint64_t* range_middle,
- uint64_t* range_end, uint64_t* temp_indices)>;
+ std::function<void(IndexType* range_begin, IndexType* range_middle,
+ IndexType* range_end, IndexType* temp_indices)>;
- MergeImpl(NullPlacement null_placement, MergeNullsFunc&& merge_nulls,
- MergeNonNullsFunc&& merge_non_nulls)
+ GenericMergeImpl(NullPlacement null_placement, MergeNullsFunc&& merge_nulls,
+ MergeNonNullsFunc&& merge_non_nulls)
: null_placement_(null_placement),
merge_nulls_(std::move(merge_nulls)),
merge_non_nulls_(std::move(merge_non_nulls)) {}
@@ -352,13 +406,14 @@ struct MergeImpl {
Status Init(ExecContext* ctx, int64_t temp_indices_length) {
ARROW_ASSIGN_OR_RAISE(
temp_buffer_,
- AllocateBuffer(sizeof(int64_t) * temp_indices_length,
ctx->memory_pool()));
- temp_indices_ = reinterpret_cast<uint64_t*>(temp_buffer_->mutable_data());
+ AllocateBuffer(sizeof(IndexType) * temp_indices_length,
ctx->memory_pool()));
+ temp_indices_ = reinterpret_cast<IndexType*>(temp_buffer_->mutable_data());
return Status::OK();
}
- NullPartitionResult Merge(const NullPartitionResult& left,
- const NullPartitionResult& right, int64_t
null_count) const {
+ NullPartitionResultType Merge(const NullPartitionResultType& left,
+ const NullPartitionResultType& right,
+ int64_t null_count) const {
if (null_placement_ == NullPlacement::AtStart) {
return MergeNullsAtStart(left, right, null_count);
} else {
@@ -366,9 +421,9 @@ struct MergeImpl {
}
}
- NullPartitionResult MergeNullsAtStart(const NullPartitionResult& left,
- const NullPartitionResult& right,
- int64_t null_count) const {
+ NullPartitionResultType MergeNullsAtStart(const NullPartitionResultType&
left,
+ const NullPartitionResultType&
right,
+ int64_t null_count) const {
// Input layout:
// [left nulls .... left non-nulls .... right nulls .... right non-nulls]
DCHECK_EQ(left.nulls_end, left.non_nulls_begin);
@@ -379,7 +434,7 @@ struct MergeImpl {
// [left nulls .... right nulls .... left non-nulls .... right non-nulls]
std::rotate(left.non_nulls_begin, right.nulls_begin, right.nulls_end);
- const auto p = NullPartitionResult::NullsAtStart(
+ const auto p = NullPartitionResultType::NullsAtStart(
left.nulls_begin, right.non_nulls_end,
left.nulls_begin + left.null_count() + right.null_count());
@@ -401,9 +456,9 @@ struct MergeImpl {
return p;
}
- NullPartitionResult MergeNullsAtEnd(const NullPartitionResult& left,
- const NullPartitionResult& right,
- int64_t null_count) const {
+ NullPartitionResultType MergeNullsAtEnd(const NullPartitionResultType& left,
+ const NullPartitionResultType& right,
+ int64_t null_count) const {
// Input layout:
// [left non-nulls .... left nulls .... right non-nulls .... right nulls]
DCHECK_EQ(left.non_nulls_end, left.nulls_begin);
@@ -414,7 +469,7 @@ struct MergeImpl {
// [left non-nulls .... right non-nulls .... left nulls .... right nulls]
std::rotate(left.nulls_begin, right.non_nulls_begin, right.non_nulls_end);
- const auto p = NullPartitionResult::NullsAtEnd(
+ const auto p = NullPartitionResultType::NullsAtEnd(
left.non_nulls_begin, right.nulls_end,
left.non_nulls_begin + left.non_null_count() + right.non_null_count());
@@ -441,9 +496,13 @@ struct MergeImpl {
MergeNullsFunc merge_nulls_;
MergeNonNullsFunc merge_non_nulls_;
std::unique_ptr<Buffer> temp_buffer_;
- uint64_t* temp_indices_ = nullptr;
+ IndexType* temp_indices_ = nullptr;
};
+using MergeImpl = GenericMergeImpl<uint64_t, NullPartitionResult>;
+using ChunkedMergeImpl =
+ GenericMergeImpl<CompressedChunkLocation, ChunkedNullPartitionResult>;
+
// TODO make this usable if indices are non trivial on input
// (see ConcreteRecordBatchColumnSorter)
// `offset` is used when this is called on a chunk of a chunked array