zanmato1984 commented on code in PR #40484:
URL: https://github.com/apache/arrow/pull/40484#discussion_r1539752003


##########
cpp/src/arrow/compute/util.cc:
##########
@@ -32,10 +32,10 @@ using internal::CpuInfo;
 namespace util {
 
 void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) {
-  int64_t new_top = top_ + PaddedAllocationSize(num_bytes) + 2 * 
sizeof(uint64_t);
-  // Stack overflow check (see GH-39582).
+  const auto estimate_size = EstimateAllocSize(num_bytes);
   // XXX cannot return a regular Status because most consumers do not either.
-  ARROW_CHECK_LE(new_top, buffer_size_) << "TempVectorStack::alloc overflow";
+  ARROW_DCHECK_OK(CheckAllocOverflow(estimate_size));

Review Comment:
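   We should probably use `ARROW_CHECK_OK` here? `ARROW_DCHECK_OK` is only
   active in debug builds, whereas the `ARROW_CHECK_LE` it replaces was always
   checked, so the overflow check would be lost in release builds.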



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;

Review Comment:
   The scope of these three variables doesn't have to be this function, right? 
We can put them into the `if` statement below?



##########
cpp/src/arrow/compute/util.h:
##########
@@ -89,16 +89,22 @@ class ARROW_EXPORT TempVectorStack {
   Status Init(MemoryPool* pool, int64_t size) {
     num_vectors_ = 0;
     top_ = 0;
-    buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * 
sizeof(uint64_t);
+    buffer_size_ = EstimateAllocSize(size);
     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
     // Ensure later operations don't accidentally read uninitialized memory.
     std::memset(buffer->mutable_data(), 0xFF, size);
     buffer_ = std::move(buffer);
     return Status::OK();
   }
 
+  static int64_t EstimateAllocSize(int64_t size) {
+    return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
+  }
+
+  Status CheckAllocOverflow(int64_t alloc_size);
+
  private:
-  int64_t PaddedAllocationSize(int64_t num_bytes) {
+  static int64_t PaddedAllocationSize(int64_t num_bytes) {

Review Comment:
   Maybe we can align all the function names to either `XxxAllocSize` or 
`XxxAllocationSize`?



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {

Review Comment:
   ```suggestion
     if (!stack) {
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);

Review Comment:
   ```suggestion
     auto stack = ctx->stack;
     std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -460,6 +479,11 @@ void Hashing32::HashMultiColumn(const 
std::vector<KeyColumnArray>& cols,
 
     first_row += batch_size_next;
   }
+
+  if (temp_stack) {
+    ctx->stack = nullptr;
+  }

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {
+    temp_stack = std::make_shared<util::TempVectorStack>();
+    RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
+    ctx->stack = temp_stack.get();
+  } else {
+    RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));
+  }
 
-  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
max_batch_size);
+  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
alloc_batch_size);
   uint32_t* hash_temp = hash_temp_buf.mutable_data();
 
-  auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, 
max_batch_size);
+  auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, 
alloc_batch_size);
   uint16_t* null_indices = null_indices_buf.mutable_data();
   int num_null_indices;
 
-  auto null_hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
max_batch_size);
+  auto null_hash_temp_buf =
+      util::TempVectorHolder<uint32_t>(ctx->stack, alloc_batch_size);

Review Comment:
   ```suggestion
         util::TempVectorHolder<uint32_t>(stack, alloc_batch_size);
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {
+    temp_stack = std::make_shared<util::TempVectorStack>();
+    RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
+    ctx->stack = temp_stack.get();
+  } else {
+    RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));

Review Comment:
   ```suggestion
       RETURN_NOT_OK(stack->CheckAllocOverflow(alloc_size));
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);

Review Comment:
   Maybe it would be clearer if we combined these two lines into
   `const uint32_t max_batch_size = std::min(num_rows, util::MiniBatch::kMiniBatchLength);`.



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {
+    temp_stack = std::make_shared<util::TempVectorStack>();
+    RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
+    ctx->stack = temp_stack.get();

Review Comment:
   ```suggestion
      stack = temp_stack.get();
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {
+    temp_stack = std::make_shared<util::TempVectorStack>();
+    RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
+    ctx->stack = temp_stack.get();
+  } else {
+    RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));
+  }
 
-  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
max_batch_size);
+  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
alloc_batch_size);

Review Comment:
   ```suggestion
     auto hash_temp_buf = util::TempVectorHolder<uint32_t>(stack, 
alloc_batch_size);
   ```



##########
cpp/src/arrow/compute/key_hash.cc:
##########
@@ -378,24 +378,43 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool 
combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
+Status Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
+  const auto alloc_batch_size = std::min(num_rows, max_batch_size);
+
+  // pre calculate alloc size in TempVectorStack for hash_temp_buf, 
null_hash_temp_buf
+  // and null_indices_buf
+  const auto alloc_hash_temp_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint32_t));
+  const auto alloc_for_null_indices_buf =
+      util::TempVectorStack::EstimateAllocSize(alloc_batch_size * 
sizeof(uint16_t));
+  const auto alloc_size = alloc_hash_temp_buf * 2 + alloc_for_null_indices_buf;
+
+  std::shared_ptr<util::TempVectorStack> temp_stack(nullptr);
+  if (!ctx->stack) {
+    temp_stack = std::make_shared<util::TempVectorStack>();
+    RETURN_NOT_OK(temp_stack->Init(default_memory_pool(), alloc_size));
+    ctx->stack = temp_stack.get();
+  } else {
+    RETURN_NOT_OK(ctx->stack->CheckAllocOverflow(alloc_size));
+  }
 
-  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
max_batch_size);
+  auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, 
alloc_batch_size);
   uint32_t* hash_temp = hash_temp_buf.mutable_data();
 
-  auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, 
max_batch_size);
+  auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, 
alloc_batch_size);

Review Comment:
   ```suggestion
     auto null_indices_buf = util::TempVectorHolder<uint16_t>(stack, 
alloc_batch_size);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to