westonpace commented on code in PR #13669:
URL: https://github.com/apache/arrow/pull/13669#discussion_r943999661


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -348,11 +349,13 @@ class ARROW_EXPORT ExecBatchBuilder {
   ExecBatch Flush();
 
   int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
+  bool is_full() const { return num_rows() == num_rows_max(); }
 
   static int num_rows_max() { return 1 << kLogNumRows; }
 
  private:
   static constexpr int kLogNumRows = 15;
+  static constexpr int64_t kAlignment = 512;

Review Comment:
   I'm not quite sure I understand when this is used vs `alignment_`.  Also, 
can you add a comment or clarify in some way how this differs from 
`MemoryPool::kDefaultAlignment` and what the purpose of the constant is?



##########
cpp/src/arrow/memory_pool.h:
##########
@@ -71,21 +73,25 @@ class ARROW_EXPORT MemoryPool {
   /// Allocate a new memory region of at least size bytes.
   ///
   /// The allocated region shall be 64-byte aligned.
-  virtual Status Allocate(int64_t size, uint8_t** out) = 0;
+  Status Allocate(int64_t size, uint8_t** out) { return Allocate(size, 
kDefaultAlignment, out); }

Review Comment:
   I may be reading this wrong but it looks like you added alignment support 
for jemalloc but not any of the other allocators.  Is that correct?  Is this a 
todo or am I missing something?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/interfaces.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/task_util.h"
+
+#pragma once
+
+namespace arrow
+{
+    namespace internal
+    {
+        class CpuInfo;
+    }
+
+    using io::IOContext;
+    namespace compute
+    {
+        struct ARROW_EXPORT QueryOptions
+        {
+            QueryOptions();
+            // 0 means unlimited
+            size_t max_memory_bytes;
+
+            /// \brief Should the plan use a legacy batching strategy
+            ///
+            /// This is currently in place only to support the Scanner::ToTable
+            /// method.  This method relies on batch indices from the scanner
+            /// remaining consistent.  This is impractical in the ExecPlan 
which
+            /// might slice batches as needed (e.g. for a join)
+            ///
+            /// However, it still works for simple plans and this is the only 
way
+            /// we have at the moment for maintaining implicit order.
+            bool use_legacy_batching;
+        };
+
+        class ARROW_EXPORT QueryContext
+        {
+        public:
+        QueryContext(
+            QueryOptions opts = {},
+            ExecContext exec_context = *default_exec_context());
+
+            Status Init(size_t max_num_threads);
+
+            const ::arrow::internal::CpuInfo *cpu_info() const;
+            const QueryOptions &options() const { return options_; }
+            MemoryPool *memory_pool() const { return 
exec_context_.memory_pool(); }
+            ::arrow::internal::Executor *executor() const { return 
exec_context_.executor(); }
+            ExecContext *exec_context() { return &exec_context_; }
+            IOContext *io_context() { return &io_context_; }
+            TaskScheduler *scheduler() { return task_scheduler_.get(); }
+            util::AsyncTaskGroup *task_group() { return &task_group_; }
+
+            size_t GetThreadIndex();
+            size_t max_concurrency() const;
+            Result<util::TempVectorStack *> GetTempStack(size_t thread_index);
+
+            /// \brief Start an external task
+            ///
+            /// This should be avoided if possible.  It is kept in for now for 
legacy
+            /// purposes.  This should be called before the external task is 
started.  If
+            /// a valid future is returned then it should be marked complete 
when the
+            /// external task has finished.
+            ///
+            /// \return an invalid future if the plan has already ended, 
otherwise this
+            ///         returns a future that must be completed when the 
external task
+            ///         finishes.
+            Result<Future<>> BeginExternalTask();
+
+            /// \brief Add a single function as a task to the query's task 
group
+            ///        on the compute threadpool.
+            ///
+            /// \param fn The task to run. Takes no arguments and returns a 
Status.
+            Status ScheduleTask(std::function<Status()> fn);
+            /// \brief Add a single function as a task to the query's task 
group
+            ///        on the compute threadpool.
+            ///
+            /// \param fn The task to run. Takes the thread index and returns 
a Status.
+            Status ScheduleTask(std::function<Status(size_t)> fn);
+            /// \brief Add a single function as a task to the query's task 
group on
+            ///        the IO thread pool
+            ///
+            /// \param fn The task to run. Returns a status.
+            Status ScheduleIOTask(std::function<Status()> fn);
+
+            // Register/Start TaskGroup is a way of performing a "Parallel 
For" pattern:
+            // - The task function takes the thread index and the index of the 
task
+            // - The on_finished function takes the thread index
+            // Returns an integer ID that will be used to reference the task 
group in
+            // StartTaskGroup. At runtime, call StartTaskGroup with the ID and 
the number of times
+            // you'd like the task to be executed. The need to register a task 
group before use will
+            // be removed after we rewrite the scheduler.
+            /// \brief Register a "parallel for" task group with the scheduler
+            ///
+            /// \param task The function implementing the task. Takes the 
thread_index and
+            ///             the task index.
+            /// \param on_finished The function that gets run once all tasks 
have been completed.
+            /// Takes the thread_index.
+            ///
+            /// Must be called inside of ExecNode::Init.
+            int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                                  std::function<Status(size_t)> on_finished);
+
+            /// \brief Start the task group with the specified ID. This can 
only
+            ///        be called once per task_group_id.
+            ///
+            /// \param task_group_id The ID  of the task group to run
+            /// \param num_tasks The number of times to run the task
+            Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+
+            struct TempFileIOMark
+            {
+                QueryContext *ctx_;
+                size_t bytes_;
+                
+                TempFileIOMark(QueryContext *ctx, size_t bytes)
+                    :
+                    ctx_(ctx),
+                    bytes_(bytes)
+                {
+                    ctx_->in_flight_bytes_to_disk_ += bytes_;
+                }
+
+                ~TempFileIOMark()
+                {
+                    ctx_->in_flight_bytes_to_disk_ -= bytes_;
+                }
+            };
+
+            TempFileIOMark ReportTempFileIO(size_t bytes)
+            {
+                return { this, bytes };
+            }
+
+        private:
+            QueryOptions options_;
+            // To be replaced with Acero-specific context once scheduler is 
done and
+            // we don't need ExecContext for kernels

Review Comment:
   I think we can mostly keep TODOs like this in JIRAs, unless the comment is 
trying to explain away something weird or unusual (which this is not).  
Although I am probably the worst offender for rambling comments.



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/interfaces.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/task_util.h"
+
+#pragma once
+
+namespace arrow
+{
+    namespace internal
+    {
+        class CpuInfo;
+    }
+
+    using io::IOContext;
+    namespace compute
+    {
+        struct ARROW_EXPORT QueryOptions
+        {
+            QueryOptions();
+            // 0 means unlimited
+            size_t max_memory_bytes;
+
+            /// \brief Should the plan use a legacy batching strategy
+            ///
+            /// This is currently in place only to support the Scanner::ToTable
+            /// method.  This method relies on batch indices from the scanner
+            /// remaining consistent.  This is impractical in the ExecPlan 
which
+            /// might slice batches as needed (e.g. for a join)
+            ///
+            /// However, it still works for simple plans and this is the only 
way
+            /// we have at the moment for maintaining implicit order.
+            bool use_legacy_batching;
+        };
+
+        class ARROW_EXPORT QueryContext
+        {
+        public:
+        QueryContext(
+            QueryOptions opts = {},
+            ExecContext exec_context = *default_exec_context());
+
+            Status Init(size_t max_num_threads);
+
+            const ::arrow::internal::CpuInfo *cpu_info() const;
+            const QueryOptions &options() const { return options_; }
+            MemoryPool *memory_pool() const { return 
exec_context_.memory_pool(); }
+            ::arrow::internal::Executor *executor() const { return 
exec_context_.executor(); }
+            ExecContext *exec_context() { return &exec_context_; }
+            IOContext *io_context() { return &io_context_; }

Review Comment:
   At first this surprised me since `io_context` is really only needed by the 
scan & write nodes but then I remembered we were doing spilling :).  So this is 
the `io_context` used to spill to disk then?



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -348,11 +349,13 @@ class ARROW_EXPORT ExecBatchBuilder {
   ExecBatch Flush();
 
   int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
+  bool is_full() const { return num_rows() == num_rows_max(); }
 
   static int num_rows_max() { return 1 << kLogNumRows; }
 
  private:
   static constexpr int kLogNumRows = 15;
+  static constexpr int64_t kAlignment = 512;

Review Comment:
   How confident are we that `512` will work for all disks?  What will happen 
if it doesn't?



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,9 +45,11 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
+  void InsertAt(ExecBatch batch, size_t idx);
   int64_t row_count() { return row_count_; }
   size_t batch_count() { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  void Resize(size_t size) { batches_.resize(size); }

Review Comment:
   What are the semantics / expectations here?  Is this more like 
`std::vector::reserve` or `std::vector::resize`?  It would be good to add a 
comment pointing out that default-constructed (empty? invalid?) batches will 
fill the newly added space.  Is the newly added space usable?



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/util/atomic_util.h"

Review Comment:
   Where is this file?  I can't seem to find it.



##########
cpp/src/arrow/util/io_util.h:
##########
@@ -410,5 +410,11 @@ uint64_t GetThreadId();
 ARROW_EXPORT
 int64_t GetCurrentRSS();
 
+/// \brief Get the total memory available to the system in bytes
+///
+/// This function supports Windows, Linux, and Mac and will return 0 otherwise
+ARROW_EXPORT
+int64_t GetTotalMemoryBytes();

Review Comment:
   Can we get a quick unit test for this?  Just something that checks we get a 
value > 0 is probably sufficient.  That way we will know quickly if we add 
some CI environment where this isn't supported.



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/interfaces.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/task_util.h"
+
+#pragma once
+
+namespace arrow
+{
+    namespace internal
+    {
+        class CpuInfo;
+    }
+
+    using io::IOContext;
+    namespace compute
+    {
+        struct ARROW_EXPORT QueryOptions
+        {
+            QueryOptions();
+            // 0 means unlimited
+            size_t max_memory_bytes;
+
+            /// \brief Should the plan use a legacy batching strategy
+            ///
+            /// This is currently in place only to support the Scanner::ToTable
+            /// method.  This method relies on batch indices from the scanner
+            /// remaining consistent.  This is impractical in the ExecPlan 
which
+            /// might slice batches as needed (e.g. for a join)
+            ///
+            /// However, it still works for simple plans and this is the only 
way
+            /// we have at the moment for maintaining implicit order.
+            bool use_legacy_batching;
+        };
+
+        class ARROW_EXPORT QueryContext
+        {
+        public:
+        QueryContext(
+            QueryOptions opts = {},
+            ExecContext exec_context = *default_exec_context());
+
+            Status Init(size_t max_num_threads);
+
+            const ::arrow::internal::CpuInfo *cpu_info() const;
+            const QueryOptions &options() const { return options_; }
+            MemoryPool *memory_pool() const { return 
exec_context_.memory_pool(); }
+            ::arrow::internal::Executor *executor() const { return 
exec_context_.executor(); }
+            ExecContext *exec_context() { return &exec_context_; }
+            IOContext *io_context() { return &io_context_; }
+            TaskScheduler *scheduler() { return task_scheduler_.get(); }
+            util::AsyncTaskGroup *task_group() { return &task_group_; }
+
+            size_t GetThreadIndex();

Review Comment:
   This is not the first place I would have looked for this method.  However, 
I think the reasoning is that we need the executor to get this info?  Or is 
this just because we are still using the old `ThreadIndexer` approach?  
Ideally, I guess I would like to someday see this as a base method in 
`ExecNode`.  Does that make sense?



##########
cpp/src/arrow/compute/exec/query_context.h:
##########
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/interfaces.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/exec/task_util.h"
+
+#pragma once
+
+namespace arrow
+{
+    namespace internal
+    {
+        class CpuInfo;
+    }
+
+    using io::IOContext;
+    namespace compute
+    {
+        struct ARROW_EXPORT QueryOptions
+        {
+            QueryOptions();
+            // 0 means unlimited
+            size_t max_memory_bytes;
+
+            /// \brief Should the plan use a legacy batching strategy
+            ///
+            /// This is currently in place only to support the Scanner::ToTable
+            /// method.  This method relies on batch indices from the scanner
+            /// remaining consistent.  This is impractical in the ExecPlan 
which
+            /// might slice batches as needed (e.g. for a join)
+            ///
+            /// However, it still works for simple plans and this is the only 
way
+            /// we have at the moment for maintaining implicit order.
+            bool use_legacy_batching;
+        };
+
+        class ARROW_EXPORT QueryContext
+        {
+        public:
+        QueryContext(
+            QueryOptions opts = {},
+            ExecContext exec_context = *default_exec_context());
+
+            Status Init(size_t max_num_threads);
+
+            const ::arrow::internal::CpuInfo *cpu_info() const;
+            const QueryOptions &options() const { return options_; }
+            MemoryPool *memory_pool() const { return 
exec_context_.memory_pool(); }
+            ::arrow::internal::Executor *executor() const { return 
exec_context_.executor(); }
+            ExecContext *exec_context() { return &exec_context_; }
+            IOContext *io_context() { return &io_context_; }
+            TaskScheduler *scheduler() { return task_scheduler_.get(); }
+            util::AsyncTaskGroup *task_group() { return &task_group_; }
+
+            size_t GetThreadIndex();
+            size_t max_concurrency() const;
+            Result<util::TempVectorStack *> GetTempStack(size_t thread_index);
+
+            /// \brief Start an external task
+            ///
+            /// This should be avoided if possible.  It is kept in for now for 
legacy
+            /// purposes.  This should be called before the external task is 
started.  If
+            /// a valid future is returned then it should be marked complete 
when the
+            /// external task has finished.
+            ///
+            /// \return an invalid future if the plan has already ended, 
otherwise this
+            ///         returns a future that must be completed when the 
external task
+            ///         finishes.
+            Result<Future<>> BeginExternalTask();
+
+            /// \brief Add a single function as a task to the query's task 
group
+            ///        on the compute threadpool.
+            ///
+            /// \param fn The task to run. Takes no arguments and returns a 
Status.
+            Status ScheduleTask(std::function<Status()> fn);
+            /// \brief Add a single function as a task to the query's task 
group
+            ///        on the compute threadpool.
+            ///
+            /// \param fn The task to run. Takes the thread index and returns 
a Status.
+            Status ScheduleTask(std::function<Status(size_t)> fn);
+            /// \brief Add a single function as a task to the query's task 
group on
+            ///        the IO thread pool
+            ///
+            /// \param fn The task to run. Returns a status.
+            Status ScheduleIOTask(std::function<Status()> fn);
+
+            // Register/Start TaskGroup is a way of performing a "Parallel 
For" pattern:
+            // - The task function takes the thread index and the index of the 
task
+            // - The on_finished function takes the thread index
+            // Returns an integer ID that will be used to reference the task 
group in
+            // StartTaskGroup. At runtime, call StartTaskGroup with the ID and 
the number of times
+            // you'd like the task to be executed. The need to register a task 
group before use will
+            // be removed after we rewrite the scheduler.
+            /// \brief Register a "parallel for" task group with the scheduler
+            ///
+            /// \param task The function implementing the task. Takes the 
thread_index and
+            ///             the task index.
+            /// \param on_finished The function that gets run once all tasks 
have been completed.
+            /// Takes the thread_index.
+            ///
+            /// Must be called inside of ExecNode::Init.
+            int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                                  std::function<Status(size_t)> on_finished);
+
+            /// \brief Start the task group with the specified ID. This can 
only
+            ///        be called once per task_group_id.
+            ///
+            /// \param task_group_id The ID  of the task group to run
+            /// \param num_tasks The number of times to run the task
+            Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+
+            struct TempFileIOMark
+            {
+                QueryContext *ctx_;
+                size_t bytes_;
+                
+                TempFileIOMark(QueryContext *ctx, size_t bytes)
+                    :
+                    ctx_(ctx),
+                    bytes_(bytes)
+                {
+                    ctx_->in_flight_bytes_to_disk_ += bytes_;
+                }
+
+                ~TempFileIOMark()
+                {
+                    ctx_->in_flight_bytes_to_disk_ -= bytes_;
+                }
+            };
+
+            TempFileIOMark ReportTempFileIO(size_t bytes)
+            {
+                return { this, bytes };
+            }

Review Comment:
   `ReportTempFileIO` (and the `TempFileIOMark` it returns) could use some 
comments / explanation.



##########
cpp/src/arrow/compute/exec/spilling_util.h:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/exec/exec_plan.h"
+
+namespace arrow
+{
+    namespace compute
+    {
+#ifdef _WIN32
+        using FileHandle = void *;
+        extern const FileHandle kInvalidHandle;
+#else
+        using FileHandle = int;
+        constexpr FileHandle kInvalidHandle = -1;
+#endif
+
+        class SpillFile
+        {
+        public:
+
+            // To spill a batch the following must be true:
+            // - Row offset for each column must be 0.
+            // - Column buffers must be aligned to 512 bits
+            // - No column can be a scalar
+            // These assumptions aren't as inconvenient as it seems because
+            // typically batches will be partitioned before being spilled,
+            // meaning the batches will come from ExecBatchBuilder, which
+            // ensures these assumptions hold. 
+            Status SpillBatch(QueryContext *ctx, ExecBatch batch);
+            Status ReadBackBatches(
+                QueryContext *ctx,
+                std::function<Status(size_t, ExecBatch)> fn,
+                std::function<Status(size_t)> on_finished);
+            Status Cleanup();

Review Comment:
   When would I call `Cleanup` and why?



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/query_context.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/accumulation_queue.h"
+
+namespace arrow
+{
+    namespace compute
+    {
+        class SpillingHashJoin
+        {
+        public:
+            using OutputBatchCallback = std::function<void(int64_t, 
ExecBatch)>;
+            using BuildFinishedCallback = std::function<Status(size_t)>;
+            using FinishedCallback = std::function<void(int64_t)>;
+            using RegisterTaskGroupCallback = std::function<int(
+                std::function<Status(size_t, int64_t)>, 
std::function<Status(size_t)>)>;
+            using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
+            using PauseProbeSideCallback = std::function<void(int)>;
+            using ResumeProbeSideCallback = std::function<void(int)>;
+            using AbortContinuationImpl = std::function<void()>;
+
+            struct CallbackRecord
+            {
+                OutputBatchCallback output_batch_callback;
+                BuildFinishedCallback build_finished_callback;
+                FinishedCallback finished_callback;
+                RegisterTaskGroupCallback register_task_group_;
+                StartTaskGroupCallback start_task_group_callback;
+                PauseProbeSideCallback pause_probe_side_callback;
+                AbortContinuationImpl abort_callback;
+            };

Review Comment:
   Can we just use a pure virtual class at this point?
   
   ```
   class HashJoinExternals {
     virtual void OutputBatch(int64_t, ExecBatch) = 0;
     // ...
   };
   ```



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,9 +45,11 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
+  void InsertAt(ExecBatch batch, size_t idx);

Review Comment:
   If I understand this method correctly, it replaces the batch at index 
`idx`.  That was a little surprising since I think of `AccumulationQueue` as a 
vector, and `std::vector` has an `insert` method that pushes elements to the 
right.  Can you change this to a name like `Replace` or `Set`?  Or at least 
add a brief doc comment.



##########
cpp/src/arrow/util/atomic_util.h:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <type_traits>
+
+namespace arrow
+{
+    namespace util
+    {
+#if defined(__clang) || defined(__GNUC__)
+        template <typename T>
+        inline T AtomicLoad(T *addr, std::memory_order order = 
std::memory_order_seq_cst) noexcept
+        {
+            T ret;
+            __atomic_load(addr, &ret, order);
+            return ret;
+        }
+
+        template <typename T>
+        inline void AtomicStore(T *addr, T &val, std::memory_order order = 
std::memory_order_seq_cst) noexcept
+        {
+            __atomic_store(addr, val, order);
+        }
+
+        template <typename T>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order order = 
std::memory_order_seq_cst) noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            return __atomic_fetch_add(addr, val, order);
+        }
+
+        template <typename T>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order order = 
std::memory_order_seq_cst) noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return __atomic_fetch_sub(addr, val, order);
+        }
+
+#elif defined(_MSC_VER)
+        #include <intrin.h>
+        template <typename T>
+        inline T AtomicLoad(T *addr, std::memory_order /*order*/) noexcept
+        {
+            T val = *addr;
+            _ReadWriteBarrier();
+            return val;
+        }
+
+        template <typename T>
+        inline void AtomicStore(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            _ReadWriteBarrier();
+            *addr = val;
+        }
+
+        template <typename T, typename std::enable_if<sizeof(T) == 1>::type>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order /*order*/) 
noexcepet
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            return _InterlockedExchangeAdd8(addr, val);
+        }
+
+        template <typename T, typename std::enable_if<sizeof(T) == 2>::type>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            return _InterlockedExchangeAdd16(addr, val);
+        }
+
+        template <typename T, typename std::enable_if<sizeof(T) == 4>::type>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            return _InterlockedExchangeAdd(addr, val);
+        }
+
+#if _WIN64
+        template <typename T, typename std::enable_if<sizeof(T) == 8>::type>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            return _InterlockedExchangeAdd64(addr, val);
+        }
+#else
+        template <typename T, typename std::enable_if<sizeof(T) == 8>::type>
+        inline T AtomicFetchAdd(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchAdd can only 
be used on integral types");
+            _ReadWriteBarrier();
+            T expected = *addr;
+            for(;;)
+            {
+                T new_val = expected + val;
+                T prev = _InterlockedCompareExchange64(addr, new_val, 
expected);
+                if(prev == expected)
+                    return prev;
+                expected = prev;
+            }
+        }
+#endif
+
+        template <typename T, typename std::enable_if<sizeof(T) == 1>::type>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return _InterlockedExchangeAdd8(addr, -val);
+        }
+
+        template <typename T, typename std::enable_if<sizeof(T) == 2>::type>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return _InterlockedExchangeAdd16(addr, -val);
+        }
+
+        template <typename T, typename std::enable_if<sizeof(T) == 4>::type>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return _InterlockedExchangeAdd(addr, -val);
+        }
+
+#if _WIN64
+        template <typename T, typename std::enable_if<sizeof(T) == 8>::type>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return _InterlockedExchangeAdd64(addr, -val);
+        }
+#else
+        template <typename T, typename std::enable_if<sizeof(T) == 8>::type>
+        inline T AtomicFetchSub(T *addr, T &val, std::memory_order /*order*/) 
noexcept
+        {
+            static_assert(std::is_integral<T>::value, "AtomicFetchSub can only 
be used on integral types");
+            return AtomicFetchAdd(addr, -val, std::memory_order_seq_cst);
+        }        
+#endif
+#endif

Review Comment:
   Why do you need these instead of `std::atomic`?



##########
cpp/src/arrow/compute/exec/spilling_util.h:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/exec/exec_plan.h"
+
+namespace arrow
+{
+    namespace compute
+    {
+#ifdef _WIN32
+        using FileHandle = void *;
+        extern const FileHandle kInvalidHandle;
+#else
+        using FileHandle = int;
+        constexpr FileHandle kInvalidHandle = -1;
+#endif
+
+        class SpillFile
+        {
+        public:
+
+            // To spill a batch the following must be true:
+            // - Row offset for each column must be 0.
+            // - Column buffers must be aligned to 512 bits
+            // - No column can be a scalar
+            // These assumptions aren't as inconvenient as it seems because
+            // typically batches will be partitioned before being spilled,
+            // meaning the batches will come from ExecBatchBuilder, which
+            // ensures these assumptions hold. 
+            Status SpillBatch(QueryContext *ctx, ExecBatch batch);
+            Status ReadBackBatches(
+                QueryContext *ctx,
+                std::function<Status(size_t, ExecBatch)> fn,
+                std::function<Status(size_t)> on_finished);

Review Comment:
   Can you add a comment explaining this method (or really the whole class)?  
Does `fn` get called multiple times?  Is there a batch size of some kind?



##########
cpp/src/arrow/compute/exec/spilling_util.h:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/exec/exec_plan.h"

Review Comment:
   Can we follow `IWYU` here and include `vector`, `atomic`, `functional`, 
`memory`, and `query_context.h`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to