pitrou commented on code in PR #14753:
URL: https://github.com/apache/arrow/pull/14753#discussion_r1039786012
##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t*
indices_begin, uint64_t* ind
const ChunkedArray& values, SortOrder sort_order,
NullPlacement null_placement);
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {  // A sort key resolved to a column position plus its direction
+ int field_index;  // position of the sort column within the schema
+ SortOrder order;  // direction requested for this column
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {  // OK for flat references, KeyError for nested ones
+ if (ref.IsNested()) {
+ return Status::KeyError("Nested keys not supported for SortKeys");
+ }
+ return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {  // tags a failed Result as a sort-key column problem
+ if (res.ok()) return res;  // success is forwarded unchanged
+ return res.status().WithMessage("Invalid sort key column: ",
res.status().message());  // the original message is kept after the prefix
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
Review Comment:
The definition of this function can be moved into a `.cc` file IMHO.
##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t*
indices_begin, uint64_t* ind
const ChunkedArray& values, SortOrder sort_order,
NullPlacement null_placement);
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {  // A sort key resolved to a column position plus its direction
+ int field_index;  // position of the sort column; FindSortKeys takes it from the FieldRef match
+ SortOrder order;  // direction copied from the originating SortKey
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {  // OK for flat references, KeyError for nested ones
+ if (ref.IsNested()) {
+ return Status::KeyError("Nested keys not supported for SortKeys");
+ }
+ return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {  // tags a failed Result as a sort-key column problem
+ if (res.ok()) return res;  // success is forwarded unchanged
+ return res.status().WithMessage("Invalid sort key column: ",
res.status().message());  // the original message is kept after the prefix
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way. Fails if a key is a nested reference or if FindOne() fails on the schema.
+inline Result<std::vector<SortField>> FindSortKeys(
+ const Schema& schema, const std::vector<SortKey>& sort_keys) {
+ std::vector<SortField> fields;
+ std::unordered_set<int> seen;  // field indices that already produced a SortField
+ fields.reserve(sort_keys.size());
+ seen.reserve(sort_keys.size());
+
+ for (const auto& sort_key : sort_keys) {
+ RETURN_NOT_OK(CheckNonNested(sort_key.target));  // nested targets stop the whole resolution
+
+ ARROW_ASSIGN_OR_RAISE(auto match,
+
PrependInvalidColumn(sort_key.target.FindOne(schema)));
+ if (seen.insert(match[0]).second) {  // a column named by several keys keeps only its first key (and that key's order)
+ fields.push_back({match[0], sort_key.order});
+ }
+ }
+ return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(  // maps each deduplicated SortField through `factory`
+ const Schema& schema, const std::vector<SortKey>& sort_keys,
+ ResolvedSortKeyFactory&& factory) {
+ ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));  // FindSortKeys errors are returned as-is
+ std::vector<ResolvedSortKey> resolved;
+ resolved.reserve(fields.size());
+ std::transform(fields.begin(), fields.end(), std::back_inserter(resolved),
factory);  // one ResolvedSortKey per SortField, in the same order
+ return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(  // convenience overload: resolves against the container's own schema and columns
+ const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys)
{
+ return ResolveSortKeys<ResolvedSortKey>(
+ *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+ return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};  // ResolvedSortKey must be constructible from (column, order)
+ });
+}
+
+// Look up the table column designated by a non-nested FieldRef.
+//
+// A name reference is resolved with Table::GetColumnByName(); any other
+// non-nested reference is treated as a positional one and resolved through
+// the first entry of its field path.
+//
+// Returns nullptr if no column matching `ref` is found, if the FieldRef is
+// a nested reference, or if the positional reference is empty, negative or
+// not smaller than the number of columns.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,
+                                                    const FieldRef& ref) {
+  if (ref.IsNested()) return nullptr;
+
+  if (auto name = ref.name()) {
+    return table.GetColumnByName(*name);
+  }
+
+  const auto& indices = ref.field_path()->indices();
+  // Never read indices[0] from an empty path.
+  if (indices.empty()) return nullptr;
+  const auto index = indices[0];
+  // Reject out-of-range positions on both sides so that table.column() is
+  // only ever asked for a column that exists.
+  if (index < 0 || index >= table.num_columns()) return nullptr;
+  return table.column(index);
+}
+
+// We could try to reproduce the concrete Array classes' facilities
+// (such as cached raw values pointer) in a separate hierarchy of
+// physical accessors, but doing so ends up too cumbersome.
+// Instead, we simply create the desired concrete Array objects.
+inline std::shared_ptr<Array> GetPhysicalArray(
+ const Array& array, const std::shared_ptr<DataType>& physical_type) {
+ auto new_data = array.data()->Copy();  // the Copy() result is modified, not array.data() itself
+ new_data->type = physical_type;  // the type is the only field changed on the copy
+ return MakeArray(std::move(new_data));
+}
+
+inline ArrayVector GetPhysicalChunks(const ArrayVector& chunks,  // applies GetPhysicalArray to every chunk, keeping chunk order
+ const std::shared_ptr<DataType>&
physical_type) {
+ ArrayVector physical(chunks.size());  // pre-sized so std::transform can assign in place
+ std::transform(chunks.begin(), chunks.end(), physical.begin(),
+ [&](const std::shared_ptr<Array>& array) {
+ return GetPhysicalArray(*array, physical_type);
+ });
+ return physical;
+}
+
+inline ArrayVector GetPhysicalChunks(const ChunkedArray& chunked_array,  // ChunkedArray overload: forwards its chunks() to the ArrayVector overload
+ const std::shared_ptr<DataType>&
physical_type) {
+ return GetPhysicalChunks(chunked_array.chunks(), physical_type);
+}
+
+// Compare two records in a single column (either from a batch or table)
+template <typename ResolvedSortKey>
+struct ColumnComparator {
+ using Location = typename ResolvedSortKey::LocationType;  // how a record is addressed; supplied by the key type
+
+ ColumnComparator(const ResolvedSortKey& sort_key, NullPlacement
null_placement)
+ : sort_key_(sort_key), null_placement_(null_placement) {}
+
+ virtual ~ColumnComparator() = default;
+
+ virtual int Compare(const Location& left, const Location& right) const = 0;  // MultipleKeyComparator reads <0 as "left first" and 0 as a tie
+
+ ResolvedSortKey sort_key_;  // held by value: a copy of the constructor argument
+ NullPlacement null_placement_;
+};
+
+template <typename ResolvedSortKey, typename Type>
+struct ConcreteColumnComparator : public ColumnComparator<ResolvedSortKey> {  // typed comparator: nulls handled here, values delegated to CompareTypeValues
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using Location = typename ResolvedSortKey::LocationType;
+
+ using ColumnComparator<ResolvedSortKey>::ColumnComparator;  // reuse the base constructor
+
+ int Compare(const Location& left, const Location& right) const override {
+ const auto& sort_key = this->sort_key_;
+
+ const auto chunk_left = sort_key.template GetChunk<ArrayType>(left);
+ const auto chunk_right = sort_key.template GetChunk<ArrayType>(right);
+ if (sort_key.null_count > 0) {  // null checks are skipped entirely for a column without nulls
+ const bool is_null_left = chunk_left.IsNull();
+ const bool is_null_right = chunk_right.IsNull();
+ if (is_null_left && is_null_right) {
+ return 0;  // two nulls tie
+ } else if (is_null_left) {
+ return this->null_placement_ == NullPlacement::AtStart ? -1 : 1;  // decided by null placement only; sort_key.order is not consulted
+ } else if (is_null_right) {
+ return this->null_placement_ == NullPlacement::AtStart ? 1 : -1;
+ }
+ }
+ return CompareTypeValues<Type>(chunk_left.Value(), chunk_right.Value(),
+ sort_key.order, this->null_placement_);
+ }
+};
+
+template <typename ResolvedSortKey>
+struct ConcreteColumnComparator<ResolvedSortKey, NullType>  // specialization for null-typed columns
+ : public ColumnComparator<ResolvedSortKey> {
+ using Location = typename ResolvedSortKey::LocationType;
+
+ using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+ int Compare(const Location& left, const Location& right) const override {
return 0; }  // any two records tie, so the column never decides the ordering
+};
+
+// Compare two records in the same RecordBatch or Table
+// (indexing is handled through ResolvedSortKey)
+template <typename ResolvedSortKey>
+class MultipleKeyComparator {
+ public:
+ using Location = typename ResolvedSortKey::LocationType;
+
+ MultipleKeyComparator(const std::vector<ResolvedSortKey>& sort_keys,
+ NullPlacement null_placement)
+ : sort_keys_(sort_keys), null_placement_(null_placement) {
+ status_ &= MakeComparators();  // a constructor cannot return a Status, so a failure is parked in status_
+ }
+
+ Status status() const { return status_; }  // outcome of building the per-column comparators
+
+ // Returns true if the left-th value should be ordered before the
+ // right-th value, false otherwise. The start_sort_key_index-th
+ // sort key and subsequent sort keys are used for comparison.
+ bool Compare(const Location& left, const Location& right, size_t
start_sort_key_index) {
+ return CompareInternal(left, right, start_sort_key_index) < 0;
+ }
+
+ bool Equals(const Location& left, const Location& right, size_t
start_sort_key_index) {
+ return CompareInternal(left, right, start_sort_key_index) == 0;  // true when every key from start_sort_key_index on ties
+ }
+
+ private:
+ struct ColumnComparatorFactory {
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return VisitGeneric(type); }
+
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+ VISIT(NullType)
+
+#undef VISIT
+
+ Status Visit(const DataType& type) {  // fallback for every type without a dedicated Visit overload
+ return Status::TypeError("Unsupported type for batch or table sorting: ",
+ type.ToString());
+ }
+
+ template <typename Type>
+ Status VisitGeneric(const Type& type) {
+ res.reset(
+ new ConcreteColumnComparator<ResolvedSortKey, Type>{sort_key,
null_placement});
+ return Status::OK();
+ }
+
+ const ResolvedSortKey& sort_key;
+ NullPlacement null_placement;
+ std::unique_ptr<ColumnComparator<ResolvedSortKey>> res;  // filled by VisitGeneric, then moved out by MakeComparators
+ };
+
+ Status MakeComparators() {
+ column_comparators_.reserve(sort_keys_.size());
+
+ for (const auto& sort_key : sort_keys_) {
+ ColumnComparatorFactory factory{sort_key, null_placement_, nullptr};
+ RETURN_NOT_OK(VisitTypeInline(*sort_key.type, &factory));  // on failure, comparators built so far stay and later ones are missing
+ column_comparators_.push_back(std::move(factory.res));
+ }
+ return Status::OK();
+ }
+
+ // Compare two records in the same table and return -1, 0 or 1.
+ //
+ // -1: The left is less than the right.
+ // 0: The left equals to the right.
+ // 1: The left is greater than the right.
+ //
+ // This supports null and NaN. Null is processed in this and NaN
+ // is processed in CompareTypeValue().
+ int CompareInternal(const Location& left, const Location& right,
+ size_t start_sort_key_index) {
+ const auto num_sort_keys = sort_keys_.size();
+ for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+ const int r = column_comparators_[i]->Compare(left, right);  // NOTE(review): the bound is sort_keys_.size(); after a failed MakeComparators() this vector is shorter, so callers must check status() first
+ if (r != 0) {
+ return r;  // the first key that does not tie decides
+ }
+ }
+ return 0;
+ }
+
+ const std::vector<ResolvedSortKey>& sort_keys_;  // reference member: the vector given to the constructor must outlive this object
+ const NullPlacement null_placement_;
+ std::vector<std::unique_ptr<ColumnComparator<ResolvedSortKey>>>
column_comparators_;
+ Status status_;
+};
+
+// Sort a batch using a single sort and multiple-key comparisons.
+class MultipleKeyRecordBatchSorter : public TypeVisitor {
Review Comment:
I'm not sure it's useful to expose this internal class in a `.h` file. By
the looks of it, only `MultipleKeyRecordBatchSorter::ResolvedSortKey` is used
by the select-k kernels?
##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t*
indices_begin, uint64_t* ind
const ChunkedArray& values, SortOrder sort_order,
NullPlacement null_placement);
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {  // A sort key resolved to a column position plus its direction
+ int field_index;  // position of the sort column; FindSortKeys takes it from the FieldRef match
+ SortOrder order;  // direction copied from the originating SortKey
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {  // OK for flat references, KeyError for nested ones
+ if (ref.IsNested()) {
+ return Status::KeyError("Nested keys not supported for SortKeys");
+ }
+ return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {  // tags a failed Result as a sort-key column problem
+ if (res.ok()) return res;  // success is forwarded unchanged
+ return res.status().WithMessage("Invalid sort key column: ",
res.status().message());  // the original message is kept after the prefix
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way. Fails if a key is a nested reference or if FindOne() fails on the schema.
+inline Result<std::vector<SortField>> FindSortKeys(
+ const Schema& schema, const std::vector<SortKey>& sort_keys) {
+ std::vector<SortField> fields;
+ std::unordered_set<int> seen;  // field indices that already produced a SortField
+ fields.reserve(sort_keys.size());
+ seen.reserve(sort_keys.size());
+
+ for (const auto& sort_key : sort_keys) {
+ RETURN_NOT_OK(CheckNonNested(sort_key.target));  // nested targets stop the whole resolution
+
+ ARROW_ASSIGN_OR_RAISE(auto match,
+
PrependInvalidColumn(sort_key.target.FindOne(schema)));
+ if (seen.insert(match[0]).second) {  // a column named by several keys keeps only its first key (and that key's order)
+ fields.push_back({match[0], sort_key.order});
+ }
+ }
+ return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(  // maps each deduplicated SortField through `factory`
+ const Schema& schema, const std::vector<SortKey>& sort_keys,
+ ResolvedSortKeyFactory&& factory) {
+ ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));  // FindSortKeys errors are returned as-is
+ std::vector<ResolvedSortKey> resolved;
+ resolved.reserve(fields.size());
+ std::transform(fields.begin(), fields.end(), std::back_inserter(resolved),
factory);  // one ResolvedSortKey per SortField, in the same order
+ return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(  // convenience overload: resolves against the container's own schema and columns
+ const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys)
{
+ return ResolveSortKeys<ResolvedSortKey>(
+ *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+ return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};  // ResolvedSortKey must be constructible from (column, order)
+ });
+}
+
+// Look up the table column designated by a non-nested FieldRef.
+//
+// A name reference is resolved with Table::GetColumnByName(); any other
+// non-nested reference is treated as a positional one and resolved through
+// the first entry of its field path.
+//
+// Returns nullptr if no column matching `ref` is found, if the FieldRef is
+// a nested reference, or if the positional reference is empty, negative or
+// not smaller than the number of columns.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,
+                                                    const FieldRef& ref) {
+  if (ref.IsNested()) return nullptr;
+
+  if (auto name = ref.name()) {
+    return table.GetColumnByName(*name);
+  }
+
+  const auto& indices = ref.field_path()->indices();
+  // Never read indices[0] from an empty path.
+  if (indices.empty()) return nullptr;
+  const auto index = indices[0];
+  // Reject out-of-range positions on both sides so that table.column() is
+  // only ever asked for a column that exists.
+  if (index < 0 || index >= table.num_columns()) return nullptr;
+  return table.column(index);
+}
+
+// We could try to reproduce the concrete Array classes' facilities
+// (such as cached raw values pointer) in a separate hierarchy of
+// physical accessors, but doing so ends up too cumbersome.
+// Instead, we simply create the desired concrete Array objects.
+inline std::shared_ptr<Array> GetPhysicalArray(
+ const Array& array, const std::shared_ptr<DataType>& physical_type) {
+ auto new_data = array.data()->Copy();  // the Copy() result is modified, not array.data() itself
+ new_data->type = physical_type;  // the type is the only field changed on the copy
+ return MakeArray(std::move(new_data));
+}
+
+inline ArrayVector GetPhysicalChunks(const ArrayVector& chunks,  // applies GetPhysicalArray to every chunk, keeping chunk order
+ const std::shared_ptr<DataType>&
physical_type) {
+ ArrayVector physical(chunks.size());  // pre-sized so std::transform can assign in place
+ std::transform(chunks.begin(), chunks.end(), physical.begin(),
+ [&](const std::shared_ptr<Array>& array) {
+ return GetPhysicalArray(*array, physical_type);
+ });
+ return physical;
+}
+
+inline ArrayVector GetPhysicalChunks(const ChunkedArray& chunked_array,  // ChunkedArray overload: forwards its chunks() to the ArrayVector overload
+ const std::shared_ptr<DataType>&
physical_type) {
+ return GetPhysicalChunks(chunked_array.chunks(), physical_type);
+}
+
+// Compare two records in a single column (either from a batch or table)
+template <typename ResolvedSortKey>
+struct ColumnComparator {
+ using Location = typename ResolvedSortKey::LocationType;  // how a record is addressed; supplied by the key type
+
+ ColumnComparator(const ResolvedSortKey& sort_key, NullPlacement
null_placement)
+ : sort_key_(sort_key), null_placement_(null_placement) {}
+
+ virtual ~ColumnComparator() = default;
+
+ virtual int Compare(const Location& left, const Location& right) const = 0;  // MultipleKeyComparator reads <0 as "left first" and 0 as a tie
+
+ ResolvedSortKey sort_key_;  // held by value: a copy of the constructor argument
+ NullPlacement null_placement_;
+};
+
+template <typename ResolvedSortKey, typename Type>
+struct ConcreteColumnComparator : public ColumnComparator<ResolvedSortKey> {  // typed comparator: nulls handled here, values delegated to CompareTypeValues
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+ using Location = typename ResolvedSortKey::LocationType;
+
+ using ColumnComparator<ResolvedSortKey>::ColumnComparator;  // reuse the base constructor
+
+ int Compare(const Location& left, const Location& right) const override {
+ const auto& sort_key = this->sort_key_;
+
+ const auto chunk_left = sort_key.template GetChunk<ArrayType>(left);
+ const auto chunk_right = sort_key.template GetChunk<ArrayType>(right);
+ if (sort_key.null_count > 0) {  // null checks are skipped entirely for a column without nulls
+ const bool is_null_left = chunk_left.IsNull();
+ const bool is_null_right = chunk_right.IsNull();
+ if (is_null_left && is_null_right) {
+ return 0;  // two nulls tie
+ } else if (is_null_left) {
+ return this->null_placement_ == NullPlacement::AtStart ? -1 : 1;  // decided by null placement only; sort_key.order is not consulted
+ } else if (is_null_right) {
+ return this->null_placement_ == NullPlacement::AtStart ? 1 : -1;
+ }
+ }
+ return CompareTypeValues<Type>(chunk_left.Value(), chunk_right.Value(),
+ sort_key.order, this->null_placement_);
+ }
+};
+
+template <typename ResolvedSortKey>
+struct ConcreteColumnComparator<ResolvedSortKey, NullType>  // specialization for null-typed columns
+ : public ColumnComparator<ResolvedSortKey> {
+ using Location = typename ResolvedSortKey::LocationType;
+
+ using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+ int Compare(const Location& left, const Location& right) const override {
return 0; }  // any two records tie, so the column never decides the ordering
+};
+
+// Compare two records in the same RecordBatch or Table
+// (indexing is handled through ResolvedSortKey)
+template <typename ResolvedSortKey>
+class MultipleKeyComparator {
+ public:
+ using Location = typename ResolvedSortKey::LocationType;
+
+ MultipleKeyComparator(const std::vector<ResolvedSortKey>& sort_keys,
+ NullPlacement null_placement)
+ : sort_keys_(sort_keys), null_placement_(null_placement) {
+ status_ &= MakeComparators();  // a constructor cannot return a Status, so a failure is parked in status_
+ }
+
+ Status status() const { return status_; }  // outcome of building the per-column comparators
+
+ // Returns true if the left-th value should be ordered before the
+ // right-th value, false otherwise. The start_sort_key_index-th
+ // sort key and subsequent sort keys are used for comparison.
+ bool Compare(const Location& left, const Location& right, size_t
start_sort_key_index) {
+ return CompareInternal(left, right, start_sort_key_index) < 0;
+ }
+
+ bool Equals(const Location& left, const Location& right, size_t
start_sort_key_index) {
+ return CompareInternal(left, right, start_sort_key_index) == 0;  // true when every key from start_sort_key_index on ties
+ }
+
+ private:
+ struct ColumnComparatorFactory {
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return VisitGeneric(type); }
+
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+ VISIT(NullType)
+
+#undef VISIT
+
+ Status Visit(const DataType& type) {  // fallback for every type without a dedicated Visit overload
+ return Status::TypeError("Unsupported type for batch or table sorting: ",
+ type.ToString());
+ }
+
+ template <typename Type>
+ Status VisitGeneric(const Type& type) {
+ res.reset(
+ new ConcreteColumnComparator<ResolvedSortKey, Type>{sort_key,
null_placement});
+ return Status::OK();
+ }
+
+ const ResolvedSortKey& sort_key;
+ NullPlacement null_placement;
+ std::unique_ptr<ColumnComparator<ResolvedSortKey>> res;  // filled by VisitGeneric, then moved out by MakeComparators
+ };
+
+ Status MakeComparators() {
+ column_comparators_.reserve(sort_keys_.size());
+
+ for (const auto& sort_key : sort_keys_) {
+ ColumnComparatorFactory factory{sort_key, null_placement_, nullptr};
+ RETURN_NOT_OK(VisitTypeInline(*sort_key.type, &factory));  // on failure, comparators built so far stay and later ones are missing
+ column_comparators_.push_back(std::move(factory.res));
+ }
+ return Status::OK();
+ }
+
+ // Compare two records in the same table and return -1, 0 or 1.
+ //
+ // -1: The left is less than the right.
+ // 0: The left equals to the right.
+ // 1: The left is greater than the right.
+ //
+ // This supports null and NaN. Null is processed in this and NaN
+ // is processed in CompareTypeValue().
+ int CompareInternal(const Location& left, const Location& right,
+ size_t start_sort_key_index) {
+ const auto num_sort_keys = sort_keys_.size();
+ for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+ const int r = column_comparators_[i]->Compare(left, right);  // NOTE(review): the bound is sort_keys_.size(); after a failed MakeComparators() this vector is shorter, so callers must check status() first
+ if (r != 0) {
+ return r;  // the first key that does not tie decides
+ }
+ }
+ return 0;
+ }
+
+ const std::vector<ResolvedSortKey>& sort_keys_;  // reference member: the vector given to the constructor must outlive this object
+ const NullPlacement null_placement_;
+ std::vector<std::unique_ptr<ColumnComparator<ResolvedSortKey>>>
column_comparators_;
+ Status status_;
+};
+
+// Sort a batch using a single sort and multiple-key comparisons.
+class MultipleKeyRecordBatchSorter : public TypeVisitor {
Review Comment:
(The same perhaps applies to the other classes above.)
##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -0,0 +1,858 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <queue>
+
+#include "arrow/compute/kernels/vector_sort_internal.h"
+#include "arrow/compute/registry.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute::internal {
+
+namespace {
+
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+const SelectKOptions* GetDefaultSelectKOptions() {  // returns the address of a shared default-options object
+ static const auto kDefaultSelectKOptions = SelectKOptions::Defaults();  // function-local static: built once, on first call
+ return &kDefaultSelectKOptions;
+}
+
+const FunctionDoc select_k_unstable_doc(  // arguments in order: summary, description, argument names, options class, options_required
+ "Select the indices of the first `k` ordered elements from the input",
+ ("This function selects an array of indices of the first `k` ordered
elements\n"
+ "from the `input` array, record batch or table specified in the column
keys\n"
+ "(`options.sort_keys`). Output is not guaranteed to be stable.\n"
+ "Null values are considered greater than any other value and are\n"
+ "therefore ordered at the end. For floating-point types, NaNs are
considered\n"
+ "greater than any other non-null value, but smaller than null values."),
+ {"input"}, "SelectKOptions", /*options_required=*/true);  // NOTE(review): ArraySelecter in this file only considers non-null values; confirm the null wording above matches
+
+// Allocate the ArrayData that receives the selected indices.
+//
+// The result has `length` slots of sizeof(uint64_t) bytes each, no validity
+// bitmap (first buffer is nullptr) and a null count of 0.  The values buffer
+// is allocated from `memory_pool`; allocation failures are returned as an
+// error Result.
+//
+// `out_type` becomes the type of the returned ArrayData.  Every caller in
+// this file passes uint64(), which is what the slot size assumes.
+Result<std::shared_ptr<ArrayData>> MakeMutableUInt64Array(
+    std::shared_ptr<DataType> out_type, int64_t length, MemoryPool* memory_pool) {
+  // Keep the size computation signed so that it matches `length` instead of
+  // going through an unsigned intermediate.
+  const int64_t buffer_size = length * static_cast<int64_t>(sizeof(uint64_t));
+  ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(buffer_size, memory_pool));
+  return ArrayData::Make(std::move(out_type), length, {nullptr, std::move(data)},
+                         /*null_count=*/0);
+}
+
+template <SortOrder order>
+class SelectKComparator {  // primary template; Ascending and Descending each have an explicit specialization in this file
+ public:
+ template <typename Type>
+ bool operator()(const Type& lval, const Type& rval);  // declared only, no definition in this primary template
+};
+
+template <>
+class SelectKComparator<SortOrder::Ascending> {  // ascending order: plain less-than
+ public:
+ template <typename Type>
+ bool operator()(const Type& lval, const Type& rval) {
+ return lval < rval;
+ }
+};
+
+template <>
+class SelectKComparator<SortOrder::Descending> {  // descending order: less-than with the operands swapped
+ public:
+ template <typename Type>
+ bool operator()(const Type& lval, const Type& rval) {
+ return rval < lval;  // only operator< is required of Type, same as the ascending case
+ }
+};
+
+class ArraySelecter : public TypeVisitor {  // selects the indices of up to k ordered non-null values of one Array
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, const SelectKOptions&
options,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(options.k),
+ order_(options.sort_keys[0].order),  // only the first sort key is read; sort_keys must not be empty
+ physical_type_(GetPhysicalType(array.type())),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }  // dispatches on the physical type to one of the Visit() overloads
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { \
+ if (order_ == SortOrder::Ascending) { \
+ return SelectKthInternal<TYPE, SortOrder::Ascending>(); \
+ } \
+ return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+ }
+
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType, SortOrder sort_order>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ ArrayType arr(array_.data());
+ std::vector<uint64_t> indices(arr.length());
+
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);  // candidate positions 0..length-1
+ if (k_ > arr.length()) {
+ k_ = arr.length();  // cannot select more values than the array holds
+ }
+
+ const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+ indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+ const auto end_iter = p.non_nulls_end;  // only [indices_begin, non_nulls_end) is considered from here on
+
+ auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+ SelectKComparator<sort_order> comparator;
+ auto cmp = [&arr, &comparator](uint64_t left, uint64_t right) {
+ const auto lval = GetView::LogicalValue(arr.GetView(left));
+ const auto rval = GetView::LogicalValue(arr.GetView(right));
+ return comparator(lval, rval);
+ };
+ using HeapContainer =
+ std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+ HeapContainer heap(indices_begin, kth_begin, cmp);  // seeded with the first min(k, non-null count) candidates
+ for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+ uint64_t x_index = *iter;
+ if (cmp(x_index, heap.top())) {  // the candidate orders before the current top: swap it in
+ heap.pop();
+ heap.push(x_index);
+ }
+ }
+ auto out_size = static_cast<int64_t>(heap.size());
+ ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(),
out_size,
+
ctx_->memory_pool()));
+
+ auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size
- 1;
+ while (heap.size() > 0) {  // the output is filled from its last slot backwards as the heap is drained
+ *out_cbegin = heap.top();
+ heap.pop();
+ --out_cbegin;
+ }
+ *output_ = Datum(take_indices);
+ return Status::OK();
+ }
+
+ ExecContext* ctx_;
+ const Array& array_;  // reference member: the array must outlive the selecter
+ int64_t k_;
+ SortOrder order_;
+ const std::shared_ptr<DataType> physical_type_;
+ Datum* output_;  // written only on success, at the end of SelectKthInternal()
+};
+
+template <typename ArrayType>
+struct TypedHeapItem {  // heap entry used by ChunkedArraySelecter
+ uint64_t index;  // position of the value inside `array`
+ uint64_t offset;  // summed lengths of the chunks before this one; index + offset is the emitted position
+ ArrayType* array;  // non-owning pointer to the typed chunk holding the value
+};
+
+class ChunkedArraySelecter : public TypeVisitor {  // selects the indices of up to k ordered non-null values across all chunks
+ public:
+ ChunkedArraySelecter(ExecContext* ctx, const ChunkedArray& chunked_array,
+ const SelectKOptions& options, Datum* output)
+ : TypeVisitor(),
+ chunked_array_(chunked_array),
+ physical_type_(GetPhysicalType(chunked_array.type())),
+ physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+ k_(options.k),
+ order_(options.sort_keys[0].order),  // only the first sort key is read; sort_keys must not be empty
+ ctx_(ctx),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }  // dispatches on the physical type to one of the Visit() overloads
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { \
+ if (order_ == SortOrder::Ascending) { \
+ return SelectKthInternal<TYPE, SortOrder::Ascending>(); \
+ } \
+ return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+ }
+
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+#undef VISIT
+
+ template <typename InType, SortOrder sort_order>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+ using HeapItem = TypedHeapItem<ArrayType>;
+
+ const auto num_chunks = chunked_array_.num_chunks();
+ if (num_chunks == 0) {
+ return Status::OK();  // NOTE(review): *output_ is left untouched on this path
+ }
+ if (k_ > chunked_array_.length()) {
+ k_ = chunked_array_.length();  // cannot select more values than exist
+ }
+ std::function<bool(const HeapItem&, const HeapItem&)> cmp;
+ SelectKComparator<sort_order> comparator;
+
+ cmp = [&comparator](const HeapItem& left, const HeapItem& right) -> bool {
+ const auto lval = GetView::LogicalValue(left.array->GetView(left.index));
+ const auto rval =
GetView::LogicalValue(right.array->GetView(right.index));
+ return comparator(lval, rval);
+ };
+ using HeapContainer =
+ std::priority_queue<HeapItem, std::vector<HeapItem>, decltype(cmp)>;
+
+ HeapContainer heap(cmp);  // one heap shared by all chunks
+ std::vector<std::shared_ptr<ArrayType>> chunks_holder;  // owns the typed chunks that heap items point into
+ uint64_t offset = 0;
+ for (const auto& chunk : physical_chunks_) {
+ if (chunk->length() == 0) continue;
+ chunks_holder.emplace_back(std::make_shared<ArrayType>(chunk->data()));
+ ArrayType& arr = *chunks_holder[chunks_holder.size() - 1];
+
+ std::vector<uint64_t> indices(arr.length());
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);  // chunk-local positions 0..length-1
+
+ const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+ indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+ const auto end_iter = p.non_nulls_end;  // only [indices_begin, non_nulls_end) is considered for this chunk
+
+ auto kth_begin = std::min(indices_begin + k_, end_iter);
+ uint64_t* iter = indices_begin;
+ for (; iter != kth_begin && heap.size() < static_cast<size_t>(k_);
++iter) {
+ heap.push(HeapItem{*iter, offset, &arr});  // fill phase: push until the heap holds k items
+ }
+ for (; iter != end_iter && !heap.empty(); ++iter) {
+ uint64_t x_index = *iter;
+ const auto& xval = GetView::LogicalValue(arr.GetView(x_index));
+ auto top_item = heap.top();
+ const auto& top_value =
+ GetView::LogicalValue(top_item.array->GetView(top_item.index));
+ if (comparator(xval, top_value)) {  // the candidate orders before the current top: swap it in
+ heap.pop();
+ heap.push(HeapItem{x_index, offset, &arr});
+ }
+ }
+ offset += chunk->length();
+ }
+
+ auto out_size = static_cast<int64_t>(heap.size());
+ ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(),
out_size,
+
ctx_->memory_pool()));
+ auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size
- 1;
+ while (heap.size() > 0) {  // the output is filled from its last slot backwards as the heap is drained
+ auto top_item = heap.top();
+ *out_cbegin = top_item.index + top_item.offset;  // chunk-local index plus the chunk's starting offset
+ heap.pop();
+ --out_cbegin;
+ }
+ *output_ = Datum(take_indices);
+ return Status::OK();
+ }
+
+ const ChunkedArray& chunked_array_;  // reference member: the chunked array must outlive the selecter
+ const std::shared_ptr<DataType> physical_type_;
+ const ArrayVector physical_chunks_;
+ int64_t k_;
+ SortOrder order_;
+ ExecContext* ctx_;
+ Datum* output_;
+};
+
+// Selects the indices of up to `k` rows of a RecordBatch, ordered by the
+// resolved sort keys.
+//
+// The first sort key is compared directly through its typed array; rows that
+// tie on it are ordered by the remaining keys through MultipleKeyComparator.
+// Rows whose first-key value is null are never candidates, so fewer than `k`
+// indices may be produced.
+class RecordBatchSelecter : public TypeVisitor {
+ private:
+  using ResolvedSortKey = MultipleKeyRecordBatchSorter::ResolvedSortKey;
+  using Comparator = MultipleKeyComparator<ResolvedSortKey>;
+
+ public:
+  // `record_batch` is held by reference and `output` by pointer; both must
+  // stay valid until Run() has returned.
+  RecordBatchSelecter(ExecContext* ctx, const RecordBatch& record_batch,
+                      const SelectKOptions& options, Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        record_batch_(record_batch),
+        k_(options.k),
+        output_(output),
+        sort_keys_(ResolveSortKeys(record_batch, options.sort_keys)),
+        comparator_(sort_keys_, NullPlacement::AtEnd) {}
+
+  // Runs the selection and, on success, stores the index array in `*output`.
+  Status Run() {
+    // The comparator records construction failures (for example a sort key
+    // whose type has no column comparator) in its status instead of reporting
+    // them.  Comparing with it in that state would index past its list of
+    // column comparators, so surface the error before any comparison is made.
+    RETURN_NOT_OK(comparator_.status());
+    return sort_keys_[0].type->Accept(this);
+  }
+
+ protected:
+#define VISIT(TYPE)                                            \
+  Status Visit(const TYPE& type) {                             \
+    if (sort_keys_[0].order == SortOrder::Descending)          \
+      return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+    return SelectKthInternal<TYPE, SortOrder::Ascending>();    \
+  }
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+#undef VISIT
+
+  // Builds one ResolvedSortKey per requested sort key, in request order.
+  // NOTE(review): a key that cannot be resolved yields a null array here;
+  // confirm ResolvedSortKey tolerates that or validate the keys upstream.
+  static std::vector<ResolvedSortKey> ResolveSortKeys(
+      const RecordBatch& batch, const std::vector<SortKey>& sort_keys) {
+    std::vector<ResolvedSortKey> resolved;
+    for (const auto& key : sort_keys) {
+      auto array = key.target.GetOne(batch).ValueOr(nullptr);
+      resolved.emplace_back(array, key.order);
+    }
+    return resolved;
+  }
+
+  template <typename InType, SortOrder sort_order>
+  Status SelectKthInternal() {
+    using GetView = GetViewType<InType>;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    auto& comparator = comparator_;
+    const auto& first_sort_key = sort_keys_[0];
+    const auto& arr = checked_cast<const ArrayType&>(first_sort_key.array);
+
+    const auto num_rows = record_batch_.num_rows();
+    if (num_rows == 0) {
+      // NOTE(review): *output_ is left untouched on this path.
+      return Status::OK();
+    }
+    if (k_ > num_rows) {
+      k_ = num_rows;  // cannot select more rows than the batch holds
+    }
+    SelectKComparator<sort_order> select_k_comparator;
+    // A plain lambda (as in ArraySelecter) lets the heap call the comparison
+    // directly rather than through a type-erased std::function.
+    auto cmp = [&](uint64_t left, uint64_t right) -> bool {
+      const auto lval = GetView::LogicalValue(arr.GetView(left));
+      const auto rval = GetView::LogicalValue(arr.GetView(right));
+      if (lval == rval) {
+        // If the left value equals to the right value,
+        // we need to compare the second and following
+        // sort keys.
+        return comparator.Compare(left, right, 1);
+      }
+      return select_k_comparator(lval, rval);
+    };
+    using HeapContainer =
+        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+
+    std::vector<uint64_t> indices(arr.length());
+    uint64_t* indices_begin = indices.data();
+    uint64_t* indices_end = indices_begin + indices.size();
+    std::iota(indices_begin, indices_end, 0);
+
+    // Rows with a null first key are partitioned to the end and ignored;
+    // only [indices_begin, non_nulls_end) is considered below.
+    const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+        indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+    const auto end_iter = p.non_nulls_end;
+
+    auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+    // Seed the heap with the first candidates, then swap in every later
+    // candidate that orders before the current top.
+    HeapContainer heap(indices_begin, kth_begin, cmp);
+    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+      uint64_t x_index = *iter;
+      auto top_item = heap.top();
+      if (cmp(x_index, top_item)) {
+        heap.pop();
+        heap.push(x_index);
+      }
+    }
+    auto out_size = static_cast<int64_t>(heap.size());
+    ARROW_ASSIGN_OR_RAISE(
+        auto take_indices,
+        MakeMutableUInt64Array(uint64(), out_size, ctx_->memory_pool()));
+    // The output is filled from its last slot backwards as the heap is
+    // drained.
+    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+    while (heap.size() > 0) {
+      *out_cbegin = heap.top();
+      heap.pop();
+      --out_cbegin;
+    }
+    *output_ = Datum(take_indices);
+    return Status::OK();
+  }
+
+  ExecContext* ctx_;
+  const RecordBatch& record_batch_;
+  int64_t k_;
+  Datum* output_;
+  // comparator_ keeps a reference to sort_keys_, so sort_keys_ must be
+  // declared (and therefore initialized) first.
+  std::vector<ResolvedSortKey> sort_keys_;
+  Comparator comparator_;
+};
+
+class TableSelecter : public TypeVisitor {
+ private:
+ struct ResolvedSortKey {
+ ResolvedSortKey(const std::shared_ptr<ChunkedArray>& chunked_array,
+ const SortOrder order)
+ : order(order),
+ type(GetPhysicalType(chunked_array->type())),
+ chunks(GetPhysicalChunks(*chunked_array, type)),
+ null_count(chunked_array->null_count()),
+ resolver(GetArrayPointers(chunks)) {}
+
+ using LocationType = int64_t;
+
+ // Find the target chunk and index in the target chunk from an
+ // index in chunked array.
+ template <typename ArrayType>
+ ResolvedChunk<ArrayType> GetChunk(int64_t index) const {
+ return resolver.Resolve<ArrayType>(index);
+ }
+
+ const SortOrder order;
+ const std::shared_ptr<DataType> type;
+ const ArrayVector chunks;
+ const int64_t null_count;
+ const ChunkedArrayResolver resolver;
+ };
+ using Comparator = MultipleKeyComparator<ResolvedSortKey>;
+
+ public:
+ TableSelecter(ExecContext* ctx, const Table& table, const SelectKOptions& options,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ table_(table),
+ k_(options.k),
+ output_(output),
+ sort_keys_(ResolveSortKeys(table, options.sort_keys)),
+ comparator_(sort_keys_, NullPlacement::AtEnd) {}
+
+ Status Run() { return sort_keys_[0].type->Accept(this); }
+
+ protected:
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { \
+ if (sort_keys_[0].order == SortOrder::Descending) \
+ return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+ return SelectKthInternal<TYPE, SortOrder::Ascending>(); \
+ }
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ static std::vector<ResolvedSortKey> ResolveSortKeys(
+ const Table& table, const std::vector<SortKey>& sort_keys) {
+ std::vector<ResolvedSortKey> resolved;
+ for (const auto& key : sort_keys) {
+ auto chunked_array = GetTableColumn(table, key.target);
+ resolved.emplace_back(chunked_array, key.order);
+ }
+ return resolved;
+ }
+
+ // Behaves like PartitionNulls() but this supports multiple sort keys.
+ template <typename Type>
+ NullPartitionResult PartitionNullsInternal(uint64_t* indices_begin,
+ uint64_t* indices_end,
+ const ResolvedSortKey& first_sort_key) {
+ using ArrayType = typename TypeTraits<Type>::ArrayType;
+
+ const auto p = PartitionNullsOnly<StablePartitioner>(
+ indices_begin, indices_end, first_sort_key.resolver, first_sort_key.null_count,
+ NullPlacement::AtEnd);
+ DCHECK_EQ(p.nulls_end - p.nulls_begin, first_sort_key.null_count);
+
+ const auto q = PartitionNullLikes<ArrayType, StablePartitioner>(
+ p.non_nulls_begin, p.non_nulls_end, first_sort_key.resolver,
+ NullPlacement::AtEnd);
+
+ auto& comparator = comparator_;
+ // Sort all NaNs by the second and following sort keys.
+ std::stable_sort(q.nulls_begin, q.nulls_end, [&](uint64_t left, uint64_t right) {
+ return comparator.Compare(left, right, 1);
+ });
+ // Sort all nulls by the second and following sort keys.
+ std::stable_sort(p.nulls_begin, p.nulls_end, [&](uint64_t left, uint64_t right) {
+ return comparator.Compare(left, right, 1);
+ });
+
+ return q;
+ }
+
+ // XXX this implementation is rather inefficient as it computes chunk indices
+ // at every comparison. Instead we should iterate over individual batches
+ // and remember ChunkLocation entries in the max-heap.
+
+ template <typename InType, SortOrder sort_order>
+ Status SelectKthInternal() {
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+ auto& comparator = comparator_;
+ const auto& first_sort_key = sort_keys_[0];
+
+ const auto num_rows = table_.num_rows();
+ if (num_rows == 0) {
+ return Status::OK();
+ }
+ if (k_ > table_.num_rows()) {
+ k_ = table_.num_rows();
+ }
+ std::function<bool(const uint64_t&, const uint64_t&)> cmp;
+ SelectKComparator<sort_order> select_k_comparator;
+ cmp = [&](const uint64_t& left, const uint64_t& right) -> bool {
+ auto chunk_left = first_sort_key.template GetChunk<ArrayType>(left);
+ auto chunk_right = first_sort_key.template GetChunk<ArrayType>(right);
+ auto value_left = chunk_left.Value();
+ auto value_right = chunk_right.Value();
+ if (value_left == value_right) {
+ return comparator.Compare(left, right, 1);
+ }
+ return select_k_comparator(value_left, value_right);
+ };
+ using HeapContainer =
+ std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+
+ std::vector<uint64_t> indices(num_rows);
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);
+
+ const auto p =
+ this->PartitionNullsInternal<InType>(indices_begin, indices_end, first_sort_key);
+ const auto end_iter = p.non_nulls_end;
+ auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+ HeapContainer heap(indices_begin, kth_begin, cmp);
+ for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+ uint64_t x_index = *iter;
+ uint64_t top_item = heap.top();
+ if (cmp(x_index, top_item)) {
+ heap.pop();
+ heap.push(x_index);
+ }
+ }
+ auto out_size = static_cast<int64_t>(heap.size());
+ ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(), out_size,
+ ctx_->memory_pool()));
+ auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+ while (heap.size() > 0) {
+ *out_cbegin = heap.top();
+ heap.pop();
+ --out_cbegin;
+ }
+ *output_ = Datum(take_indices);
+ return Status::OK();
+ }
+
+ ExecContext* ctx_;
+ const Table& table_;
+ int64_t k_;
+ Datum* output_;
+ std::vector<ResolvedSortKey> sort_keys_;
+ Comparator comparator_;
+};
+
+static Status CheckConsistency(const Schema& schema,
+ const std::vector<SortKey>& sort_keys) {
+ for (const auto& key : sort_keys) {
+ RETURN_NOT_OK(CheckNonNested(key.target));
+ RETURN_NOT_OK(PrependInvalidColumn(key.target.FindOne(schema)));
+ }
+ return Status::OK();
+}
+
+class SelectKUnstableMetaFunction : public MetaFunction {
+ public:
+ SelectKUnstableMetaFunction()
+ : MetaFunction("select_k_unstable", Arity::Unary(), select_k_unstable_doc,
+ GetDefaultSelectKOptions()) {}
+
+ Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
+ const FunctionOptions* options,
+ ExecContext* ctx) const override {
+ const auto& select_k_options = static_cast<const SelectKOptions&>(*options);
+ if (select_k_options.k < 0) {
+ return Status::Invalid("select_k_unstable requires a nonnegative `k`, got ",
+ select_k_options.k);
+ }
+ if (select_k_options.sort_keys.size() == 0) {
+ return Status::Invalid("select_k_unstable requires a non-empty `sort_keys`");
+ }
+ switch (args[0].kind()) {
+ case Datum::ARRAY:
+ return SelectKth(*args[0].make_array(), select_k_options, ctx);
+ case Datum::CHUNKED_ARRAY:
+ return SelectKth(*args[0].chunked_array(), select_k_options, ctx);
+ case Datum::RECORD_BATCH:
+ return SelectKth(*args[0].record_batch(), select_k_options, ctx);
+ case Datum::TABLE:
+ return SelectKth(*args[0].table(), select_k_options, ctx);
+ default:
+ break;
+ }
+ return Status::NotImplemented(
+ "Unsupported types for select_k operation: "
+ "values=",
+ args[0].ToString());
+ }
+
+ private:
+ Result<Datum> SelectKth(const Array& array, const SelectKOptions& options,
+ ExecContext* ctx) const {
+ Datum output;
+ ArraySelecter selecter(ctx, array, options, &output);
+ ARROW_RETURN_NOT_OK(selecter.Run());
+ return output;
+ }
+
+ Result<Datum> SelectKth(const ChunkedArray& chunked_array,
+ const SelectKOptions& options, ExecContext* ctx) const {
+ Datum output;
+ ChunkedArraySelecter selecter(ctx, chunked_array, options, &output);
+ ARROW_RETURN_NOT_OK(selecter.Run());
+ return output;
+ }
+ Result<Datum> SelectKth(const RecordBatch& record_batch, const SelectKOptions& options,
+ ExecContext* ctx) const {
+ ARROW_RETURN_NOT_OK(CheckConsistency(*record_batch.schema(), options.sort_keys));
+ Datum output;
+ RecordBatchSelecter selecter(ctx, record_batch, options, &output);
+ ARROW_RETURN_NOT_OK(selecter.Run());
+ return output;
+ }
+ Result<Datum> SelectKth(const Table& table, const SelectKOptions& options,
+ ExecContext* ctx) const {
+ ARROW_RETURN_NOT_OK(CheckConsistency(*table.schema(), options.sort_keys));
+ Datum output;
+ TableSelecter selecter(ctx, table, options, &output);
+ ARROW_RETURN_NOT_OK(selecter.Run());
+ return output;
+ }
+};
+
+// ----------------------------------------------------------------------
+// Rank implementation
Review Comment:
Since you're moving the Rank implementation(s) already, might as well move
them to a dedicated `vector_rank.cc`?
##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
const ChunkedArray& values, SortOrder sort_order,
NullPlacement null_placement);
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+ int field_index;
+ SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+ if (ref.IsNested()) {
+ return Status::KeyError("Nested keys not supported for SortKeys");
+ }
+ return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+ if (res.ok()) return res;
+ return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
+ const Schema& schema, const std::vector<SortKey>& sort_keys) {
+ std::vector<SortField> fields;
+ std::unordered_set<int> seen;
+ fields.reserve(sort_keys.size());
+ seen.reserve(sort_keys.size());
+
+ for (const auto& sort_key : sort_keys) {
+ RETURN_NOT_OK(CheckNonNested(sort_key.target));
+
+ ARROW_ASSIGN_OR_RAISE(auto match,
+ PrependInvalidColumn(sort_key.target.FindOne(schema)));
+ if (seen.insert(match[0]).second) {
+ fields.push_back({match[0], sort_key.order});
+ }
+ }
+ return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+ const Schema& schema, const std::vector<SortKey>& sort_keys,
+ ResolvedSortKeyFactory&& factory) {
+ ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+ std::vector<ResolvedSortKey> resolved;
+ resolved.reserve(fields.size());
+ std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+ return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+ const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+ return ResolveSortKeys<ResolvedSortKey>(
+ *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+ return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+ });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,
Review Comment:
Hmm... `FieldPath` and `FieldRef` already support looking up a column from a
RecordBatch, so I think it would be easy to add similar support for Table there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]