This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e52dea1d90 [VL] Add back RAII style Velox driver suspension into
RowVectorStream (#8149)
e52dea1d90 is described below
commit e52dea1d90556dc924f631e02ae5fc0198df5fcd
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Dec 5 13:36:19 2024 +0800
[VL] Add back RAII style Velox driver suspension into RowVectorStream
(#8149)
---
cpp/velox/operators/plannodes/RowVectorStream.h | 41 ++++++++++++++-----------
1 file changed, 23 insertions(+), 18 deletions(-)
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index fcf2ffd15f..63ddd0abe0 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,6 +23,27 @@
#include "velox/exec/Operator.h"
#include "velox/exec/Task.h"
+namespace {
+class SuspendedSection {
+ public:
+ explicit SuspendedSection(facebook::velox::exec::Driver* driver) :
driver_(driver) {
+ if (driver_->task()->enterSuspended(driver->state()) !=
facebook::velox::exec::StopReason::kNone) {
+ VELOX_FAIL("Terminate detected when entering suspended section");
+ }
+ }
+
+ virtual ~SuspendedSection() {
+ if (driver_->task()->leaveSuspended(driver_->state()) !=
facebook::velox::exec::StopReason::kNone) {
+ LOG(WARNING) << "Terminate detected when leaving suspended section for
driver " << driver_->driverCtx()->driverId
+ << " from task " << driver_->task()->taskId();
+ }
+ }
+
+ private:
+ facebook::velox::exec::Driver* const driver_;
+};
+} // namespace
+
namespace gluten {
class RowVectorStream {
public:
@@ -47,16 +68,8 @@ class RowVectorStream {
// As of now, non-zero running threads usually happens when:
// 1. Task A spills task B;
// 2. Task A trys to grow buffers created by task B, during which spill
is requested on task A again.
- // facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
- auto driver = driverCtx_->driver;
- if (driver->task()->enterSuspended(driver->state()) !=
facebook::velox::exec::StopReason::kNone) {
- VELOX_FAIL("Terminate detected when entering suspended section");
- }
+ SuspendedSection ss(driverCtx_->driver);
hasNext = iterator_->hasNext();
- if (driver->task()->leaveSuspended(driver->state()) !=
facebook::velox::exec::StopReason::kNone) {
- LOG(WARNING) << "Terminate detected when leaving suspended section for
driver " << driver->driverCtx()->driverId
- << " from task " << driver->task()->taskId();
- }
}
if (!hasNext) {
finished_ = true;
@@ -73,16 +86,8 @@ class RowVectorStream {
{
// We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
// driver to make the current task open to spilling.
- // facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
- auto driver = driverCtx_->driver;
- if (driver->task()->enterSuspended(driver->state()) !=
facebook::velox::exec::StopReason::kNone) {
- VELOX_FAIL("Terminate detected when entering suspended section");
- }
+ SuspendedSection ss(driverCtx_->driver);
cb = iterator_->next();
- if (driver->task()->leaveSuspended(driver->state()) !=
facebook::velox::exec::StopReason::kNone) {
- LOG(WARNING) << "Terminate detected when leaving suspended section for
driver " << driver->driverCtx()->driverId
- << " from task " << driver->task()->taskId();
- }
}
const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, cb);
auto vp = vb->getRowVector();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]