kou commented on a change in pull request #8612:
URL: https://github.com/apache/arrow/pull/8612#discussion_r526630171



##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +499,644 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+// Sorts the indices of a chunked array as a whole, without first sorting
+// each array of the chunked array on its own. TableRadixSorter relies on
+// this for the second and later sort keys it processes.
+//
+// The algorithm is the same one ArrayCompareSorter uses.
+template <typename Type>
+class ChunkedArrayCompareSorter {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+
+ public:
+  // Stable-sorts the indices in [indices_begin, indices_end) by the values
+  // they resolve to in `arrays`, ascending or descending according to
+  // `options.order`. PartitionNulls runs first; only the range in front of
+  // the position it returns is sorted.
+  //
+  // Returns where nulls start.
+  uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end,
+                 std::vector<const Array*>& arrays, int64_t null_count,
+                 const ArraySortOptions& options) {
+    const auto nulls_begin = PartitionNulls<ArrayType, StablePartitioner>(
+        indices_begin, indices_end, arrays, null_count);
+
+    if (options.order != SortOrder::Ascending) {
+      // Descending order. The operands of '<' are swapped instead of using
+      // '>' so that '<' is the only operator required of the compared views.
+      const auto descending = [&arrays](uint64_t lhs, uint64_t rhs) {
+        const auto lhs_chunk = ResolveChunk<ArrayType>(arrays, lhs);
+        const auto rhs_chunk = ResolveChunk<ArrayType>(arrays, rhs);
+        return rhs_chunk.GetView() < lhs_chunk.GetView();
+      };
+      std::stable_sort(indices_begin, nulls_begin, descending);
+      return nulls_begin;
+    }
+
+    const auto ascending = [&arrays](uint64_t lhs, uint64_t rhs) {
+      const auto lhs_chunk = ResolveChunk<ArrayType>(arrays, lhs);
+      const auto rhs_chunk = ResolveChunk<ArrayType>(arrays, rhs);
+      return lhs_chunk.GetView() < rhs_chunk.GetView();
+    };
+    std::stable_sort(indices_begin, nulls_begin, ascending);
+    return nulls_begin;
+  }
+};
+
+// Sort a chunked array by sorting each array in the chunked array.
+//
+// The element type is dispatched through TypeVisitor: Sort() asks the
+// chunked array's type to Accept() this visitor, and every Visit()
+// overload below forwards to SortInternal<T>().
+//
+// TODO: This is a naive implementation. We'll be able to improve
+// performance of this. For example, we'll be able to use threads for
+// sorting each array.
+class ChunkedArraySorter : public TypeVisitor {
+ public:
+  // indices_begin / indices_end: the index range that is sorted in place.
+  // chunked_array: the values the indices refer to. It is stored by
+  //   reference, so it must outlive this sorter.
+  // order: ascending or descending.
+  // can_use_array_sorter: when true, each chunk is sorted with ArraySorter
+  //   and the sorted chunks are merged; when false, the indices are sorted
+  //   with ChunkedArrayCompareSorter instead.
+  ChunkedArraySorter(uint64_t* indices_begin, uint64_t* indices_end,
+                     const ChunkedArray& chunked_array, const SortOrder order,
+                     bool can_use_array_sorter = true)
+      : TypeVisitor(),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        chunked_array_(chunked_array),
+        order_(order),
+        can_use_array_sorter_(can_use_array_sorter) {}
+
+  // Runs the sort for the chunked array's type and returns the status of
+  // the matching Visit() overload.
+  Status Sort() { return chunked_array_.type()->Accept(this); }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE##Type& type) override { return SortInternal<TYPE##Type>(); }
+
+  VISIT(Int8)
+  VISIT(Int16)
+  VISIT(Int32)
+  VISIT(Int64)
+  VISIT(UInt8)
+  VISIT(UInt16)
+  VISIT(UInt32)
+  VISIT(UInt64)
+  VISIT(Float)
+  VISIT(Double)
+  VISIT(String)
+  VISIT(Binary)
+  VISIT(LargeString)
+  VISIT(LargeBinary)
+
+#undef VISIT
+
+ private:
+  // Sorts the indices for element type `Type`. Always returns Status::OK().
+  template <typename Type>
+  Status SortInternal() {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    ArraySortOptions options(order_);
+    const auto num_chunks = chunked_array_.num_chunks();
+    const auto& shared_arrays = chunked_array_.chunks();
+    // Non-owning view of the chunks; the helpers below take raw pointers.
+    std::vector<const Array*> arrays(num_chunks);
+    for (int i = 0; i < num_chunks; ++i) {
+      const auto& array = shared_arrays[i];
+      arrays[i] = array.get();
+    }
+    if (can_use_array_sorter_) {
+      // Sort each chunk from the beginning and merge to sorted indices.
+      // This is a naive implementation.
+      ArraySorter<Type> sorter;
+      int64_t begin_offset = 0;
+      int64_t end_offset = 0;
+      // Running total of nulls in the chunks processed so far.
+      int64_t null_count = 0;
+      // Where nulls start in the already merged prefix of the indices.
+      uint64_t* left_nulls_begin = indices_begin_;
+      for (int i = 0; i < num_chunks; ++i) {
+        const auto array = static_cast<const ArrayType*>(arrays[i]);
+        end_offset += array->length();
+        null_count += array->null_count();
+        // Sort this chunk's slice of the indices; begin_offset is passed as
+        // the chunk's offset within the chunked array.
+        uint64_t* right_nulls_begin =
+            sorter.impl.Sort(indices_begin_ + begin_offset, indices_begin_ + end_offset,
+                             *array, begin_offset, options);
+        if (i > 0) {
+          // Fold the freshly sorted chunk into the merged prefix.
+          left_nulls_begin = Merge<ArrayType>(
+              indices_begin_, indices_begin_ + begin_offset, indices_begin_ + end_offset,
+              left_nulls_begin, right_nulls_begin, arrays, null_count, order_);
+        } else {
+          left_nulls_begin = right_nulls_begin;
+        }
+        begin_offset = end_offset;
+      }
+    } else {
+      // Sort the chunked array directly.
+      ChunkedArrayCompareSorter<Type> sorter;
+      sorter.Sort(indices_begin_, indices_end_, arrays, chunked_array_.null_count(),
+                  options);
+    }
+    return Status::OK();
+  }
+
+  // Merges two sorted indices arrays and returns where nulls start.
+  // Where nulls start is used by the next merge to detect the sorted
+  // indices locations.
+  //
+  // The left run is [indices_begin, indices_middle) with its nulls starting
+  // at left_nulls_begin; the right run is [indices_middle, indices_end)
+  // with its nulls starting at right_nulls_begin. `null_count` is passed on
+  // to PartitionNulls for the whole [indices_begin, indices_end) range.
+  //
+  // `arrays` is taken by reference, as ChunkedArrayCompareSorter::Sort
+  // does, so that the vector is not copied on every merge.
+  template <typename ArrayType>
+  uint64_t* Merge(uint64_t* indices_begin, uint64_t* indices_middle,
+                  uint64_t* indices_end, uint64_t* left_nulls_begin,
+                  uint64_t* right_nulls_begin, std::vector<const Array*>& arrays,
+                  int64_t null_count, const SortOrder order) {
+    // Remember how many non-null indices each run has before the runs are
+    // repartitioned.
+    auto left_num_non_nulls = left_nulls_begin - indices_begin;
+    auto right_num_non_nulls = right_nulls_begin - indices_middle;
+    auto nulls_begin = PartitionNulls<ArrayType, StablePartitioner>(
+        indices_begin, indices_end, arrays, null_count);
+    // Re-derive the run boundaries for the non-null indices only.
+    // NOTE(review): this relies on PartitionNulls with StablePartitioner
+    // keeping the non-null indices in front and in their existing order;
+    // its definition is not visible here.
+    indices_middle = indices_begin + left_num_non_nulls;
+    indices_end = indices_middle + right_num_non_nulls;
+    if (order == SortOrder::Ascending) {
+      std::inplace_merge(indices_begin, indices_middle, indices_end,
+                         [&arrays](uint64_t left, uint64_t right) {
+                           const auto chunk_left = ResolveChunk<ArrayType>(arrays, left);
+                           const auto chunk_right =
+                               ResolveChunk<ArrayType>(arrays, right);
+                           return chunk_left.GetView() < chunk_right.GetView();
+                         });
+    } else {
+      std::inplace_merge(indices_begin, indices_middle, indices_end,
+                         [&arrays](uint64_t left, uint64_t right) {
+                           const auto chunk_left = ResolveChunk<ArrayType>(arrays, left);
+                           const auto chunk_right =
+                               ResolveChunk<ArrayType>(arrays, right);
+                           // We don't use 'left > right' here to reduce required
+                           // operator. If we use 'right < left' here, '<' is only
+                           // required.
+                           return chunk_right.GetView() < chunk_left.GetView();
+                         });
+    }
+    return nulls_begin;
+  }
+
+  uint64_t* indices_begin_;
+  uint64_t* indices_end_;
+  const ChunkedArray& chunked_array_;
+  const SortOrder order_;
+  const bool can_use_array_sorter_;
+};
+
+// Sort a table from the back like radix sort.
+//
+// The sort keys are processed from the last one to the first one, each
+// pass reordering the same index range.
+class TableRadixSorter {
+ public:
+  // Sorts the indices in [indices_begin, indices_end) by
+  // `options.sort_keys`, looking each key's column up in `table` by name.
+  //
+  // Returns Status::Invalid if a sort key names a column that `table` does
+  // not have, the error of the first failing per-key sort, or Status::OK().
+  Status Sort(uint64_t* indices_begin, uint64_t* indices_end, const Table& table,
+              const SortOptions& options) {
+    const auto num_sort_keys = options.sort_keys.size();
+    // `i` is the 1-based position of the key and counts down to 1, so the
+    // unsigned counter never has to go below zero.
+    for (auto i = num_sort_keys; i > 0; --i) {
+      const auto& sort_key = options.sort_keys[i - 1];
+      const auto& chunked_array = table.GetColumnByName(sort_key.name);
+      if (!chunked_array) {
+        return Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+      }
+      // We can use ArraySorter only for the sort key that is
+      // processed first because ArraySorter doesn't care about
+      // existing indices. The key processed first is the last one, i.e.
+      // i == num_sort_keys; `i == 0` can never hold inside this loop.
+      const auto can_use_array_sorter = (i == num_sort_keys);
+      ChunkedArraySorter sorter(indices_begin, indices_end, *chunked_array.get(),
+                                sort_key.order, can_use_array_sorter);
+      ARROW_RETURN_NOT_OK(sorter.Sort());
+    }
+    return Status::OK();
+  }
+};
+
+// Sort a table from the beginning.
+class TableFromTheBeginningSorter : public TypeVisitor {

Review comment:
       Can anyone suggest a better name...?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to