This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e52dea1d90 [VL] Add back RAII style Velox driver suspension into 
RowVectorStream (#8149)
e52dea1d90 is described below

commit e52dea1d90556dc924f631e02ae5fc0198df5fcd
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Dec 5 13:36:19 2024 +0800

    [VL] Add back RAII style Velox driver suspension into RowVectorStream 
(#8149)
---
 cpp/velox/operators/plannodes/RowVectorStream.h | 41 ++++++++++++++-----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h 
b/cpp/velox/operators/plannodes/RowVectorStream.h
index fcf2ffd15f..63ddd0abe0 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,6 +23,27 @@
 #include "velox/exec/Operator.h"
 #include "velox/exec/Task.h"
 
+namespace {
+class SuspendedSection {
+ public:
+  explicit SuspendedSection(facebook::velox::exec::Driver* driver) : 
driver_(driver) {
+    if (driver_->task()->enterSuspended(driver->state()) != 
facebook::velox::exec::StopReason::kNone) {
+      VELOX_FAIL("Terminate detected when entering suspended section");
+    }
+  }
+
+  virtual ~SuspendedSection() {
+    if (driver_->task()->leaveSuspended(driver_->state()) != 
facebook::velox::exec::StopReason::kNone) {
+      LOG(WARNING) << "Terminate detected when leaving suspended section for 
driver " << driver_->driverCtx()->driverId
+                   << " from task " << driver_->task()->taskId();
+    }
+  }
+
+ private:
+  facebook::velox::exec::Driver* const driver_;
+};
+} // namespace
+
 namespace gluten {
 class RowVectorStream {
  public:
@@ -47,16 +68,8 @@ class RowVectorStream {
       // As of now, non-zero running threads usually happens when:
       // 1. Task A spills task B;
       // 2. Task A trys to grow buffers created by task B, during which spill 
is requested on task A again.
-      // facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
-      auto driver = driverCtx_->driver;
-      if (driver->task()->enterSuspended(driver->state()) != 
facebook::velox::exec::StopReason::kNone) {
-        VELOX_FAIL("Terminate detected when entering suspended section");
-      }
+      SuspendedSection ss(driverCtx_->driver);
       hasNext = iterator_->hasNext();
-      if (driver->task()->leaveSuspended(driver->state()) != 
facebook::velox::exec::StopReason::kNone) {
-        LOG(WARNING) << "Terminate detected when leaving suspended section for 
driver " << driver->driverCtx()->driverId
-                     << " from task " << driver->task()->taskId();
-      }
     }
     if (!hasNext) {
       finished_ = true;
@@ -73,16 +86,8 @@ class RowVectorStream {
     {
       // We are leaving Velox task execution and are probably entering Spark 
code through JNI. Suspend the current
       // driver to make the current task open to spilling.
-      // facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
-      auto driver = driverCtx_->driver;
-      if (driver->task()->enterSuspended(driver->state()) != 
facebook::velox::exec::StopReason::kNone) {
-        VELOX_FAIL("Terminate detected when entering suspended section");
-      }
+      SuspendedSection ss(driverCtx_->driver);
       cb = iterator_->next();
-      if (driver->task()->leaveSuspended(driver->state()) != 
facebook::velox::exec::StopReason::kNone) {
-        LOG(WARNING) << "Terminate detected when leaving suspended section for 
driver " << driver->driverCtx()->driverId
-                     << " from task " << driver->task()->taskId();
-      }
     }
     const std::shared_ptr<VeloxColumnarBatch>& vb = 
VeloxColumnarBatch::from(pool_, cb);
     auto vp = vb->getRowVector();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to