lidavidm commented on a change in pull request #11019:
URL: https://github.com/apache/arrow/pull/11019#discussion_r700236238
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
Review comment:
nit: you might prefer VisitTypeInline instead, which would let you define
cases via templates instead of macros/overloads.
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return SelectKthInternal<TYPE>(); }
+
+ VISIT_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ ArrayType arr(array_.data());
+ std::vector<uint64_t> indices(arr.length());
+
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);
+ if (k_ > arr.length()) {
+ k_ = arr.length();
+ }
+ auto end_iter = PartitionNulls<ArrayType,
NonStablePartitioner>(indices_begin,
+
indices_end, arr, 0);
+ auto kth_begin = indices_begin + k_;
+ if (kth_begin > end_iter) {
+ kth_begin = end_iter;
+ }
+ std::function<bool(uint64_t, uint64_t)> cmp;
+ if (order_ == SortOrder::Ascending) {
Review comment:
It seems odd to me to have sort order be templated in so many places
only for it to ultimately result in a runtime choice. Either it should be
templated all the way through, or it should be a runtime parameter all the way
through.
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return SelectKthInternal<TYPE>(); }
+
+ VISIT_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
Review comment:
I wonder if the templating here could be done in a way to generate less
code. For instance, TimestampType, Int64Type, and Date64Type all have the same
underlying semantics and should use the same generated code.
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return SelectKthInternal<TYPE>(); }
+
+ VISIT_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ ArrayType arr(array_.data());
+ std::vector<uint64_t> indices(arr.length());
+
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);
+ if (k_ > arr.length()) {
+ k_ = arr.length();
+ }
+ auto end_iter = PartitionNulls<ArrayType,
NonStablePartitioner>(indices_begin,
+
indices_end, arr, 0);
+ auto kth_begin = indices_begin + k_;
+ if (kth_begin > end_iter) {
+ kth_begin = end_iter;
+ }
+ std::function<bool(uint64_t, uint64_t)> cmp;
+ if (order_ == SortOrder::Ascending) {
+ cmp = [&arr](uint64_t left, uint64_t right) -> bool {
+ const auto lval = GetView::LogicalValue(arr.GetView(left));
+ const auto rval = GetView::LogicalValue(arr.GetView(right));
+ return lval < rval;
+ };
+ } else {
+ cmp = [&arr](uint64_t left, uint64_t right) -> bool {
+ const auto lval = GetView::LogicalValue(arr.GetView(left));
+ const auto rval = GetView::LogicalValue(arr.GetView(right));
+ return rval < lval;
+ };
+ }
+ arrow::internal::Heap<uint64_t, decltype(cmp)> heap(cmp);
+ uint64_t* iter = indices_begin;
+ for (; iter != kth_begin && heap.Size() < static_cast<size_t>(k_); ++iter)
{
+ heap.Push(*iter);
+ }
+ for (; iter != end_iter && heap.Size() > 0; ++iter) {
+ uint64_t x_index = *iter;
+ const auto lval = GetView::LogicalValue(arr.GetView(x_index));
+ const auto rval = GetView::LogicalValue(arr.GetView(heap.Top()));
+ if (order_ == SortOrder::Ascending) {
+ if (lval < rval) {
+ heap.ReplaceTop(x_index);
+ }
+ } else {
+ if (rval < lval) {
+ heap.ReplaceTop(x_index);
+ }
+ }
+ }
+
+ int64_t out_size = static_cast<int64_t>(heap.Size());
+ ARROW_ASSIGN_OR_RAISE(
+ auto take_indices,
+ MakeMutableArrayForFixedSizedType(uint64(), out_size,
ctx_->memory_pool()));
Review comment:
I wonder if we shouldn't just return the indices, just like the sort
kernel.
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return SelectKthInternal<TYPE>(); }
+
+ VISIT_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ ArrayType arr(array_.data());
+ std::vector<uint64_t> indices(arr.length());
+
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);
+ if (k_ > arr.length()) {
+ k_ = arr.length();
+ }
+ auto end_iter = PartitionNulls<ArrayType,
NonStablePartitioner>(indices_begin,
+
indices_end, arr, 0);
+ auto kth_begin = indices_begin + k_;
+ if (kth_begin > end_iter) {
+ kth_begin = end_iter;
+ }
+ std::function<bool(uint64_t, uint64_t)> cmp;
+ if (order_ == SortOrder::Ascending) {
Review comment:
(Otherwise, why generate separate code for both cases in so many places?)
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1778,6 +1784,711 @@ class SortIndicesMetaFunction : public MetaFunction {
}
};
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+using SelectKOptionsState = internal::OptionsWrapper<SelectKOptions>;
+const auto kDefaultTopKOptions = SelectKOptions::TopKDefault();
+const auto kDefaultBottomKOptions = SelectKOptions::BottomKDefault();
+
+const FunctionDoc top_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+const FunctionDoc bottom_k_doc(
+ "Return the indices that would partition an array array, record batch or
table\n"
+ "around a pivot",
+ ("@TODO"), {"input", "k"}, "PartitionNthOptions");
+
+Result<std::shared_ptr<ArrayData>> MakeMutableArrayForFixedSizedType(
+ std::shared_ptr<DataType> out_type, int64_t length, MemoryPool*
memory_pool) {
+ auto buffer_size = BitUtil::BytesForBits(
+ length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());
+ std::vector<std::shared_ptr<Buffer>> buffers(2);
+ ARROW_ASSIGN_OR_RAISE(buffers[1], AllocateResizableBuffer(buffer_size,
memory_pool));
+ auto out = std::make_shared<ArrayData>(out_type, length, buffers, 0);
+ return out;
+}
+
+class ArraySelecter : public TypeVisitor {
+ public:
+ ArraySelecter(ExecContext* ctx, const Array& array, int64_t k, const
SortOrder order,
+ Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ array_(array),
+ k_(k),
+ physical_type_(GetPhysicalType(array.type())),
+ order_(order),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return SelectKthInternal<TYPE>(); }
+
+ VISIT_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
+ Status SelectKthInternal() {
+ using GetView = GetViewType<InType>;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ ArrayType arr(array_.data());
+ std::vector<uint64_t> indices(arr.length());
+
+ uint64_t* indices_begin = indices.data();
+ uint64_t* indices_end = indices_begin + indices.size();
+ std::iota(indices_begin, indices_end, 0);
+ if (k_ > arr.length()) {
+ k_ = arr.length();
+ }
+ auto end_iter = PartitionNulls<ArrayType,
NonStablePartitioner>(indices_begin,
+
indices_end, arr, 0);
+ auto kth_begin = indices_begin + k_;
+ if (kth_begin > end_iter) {
+ kth_begin = end_iter;
+ }
+ std::function<bool(uint64_t, uint64_t)> cmp;
+ if (order_ == SortOrder::Ascending) {
+ cmp = [&arr](uint64_t left, uint64_t right) -> bool {
+ const auto lval = GetView::LogicalValue(arr.GetView(left));
+ const auto rval = GetView::LogicalValue(arr.GetView(right));
+ return lval < rval;
+ };
+ } else {
+ cmp = [&arr](uint64_t left, uint64_t right) -> bool {
+ const auto lval = GetView::LogicalValue(arr.GetView(left));
+ const auto rval = GetView::LogicalValue(arr.GetView(right));
+ return rval < lval;
+ };
+ }
+ arrow::internal::Heap<uint64_t, decltype(cmp)> heap(cmp);
+ uint64_t* iter = indices_begin;
+ for (; iter != kth_begin && heap.Size() < static_cast<size_t>(k_); ++iter)
{
Review comment:
Isn't the second condition redundant because of how kth_begin is defined
above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]