gabotechs commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2096083785
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -158,28 +225,17 @@ impl RepartitionExecState {
));
spawned_tasks.push(wait_for_task);
}
-
- Self {
+ *self = Self::ConsumingInputStreams(ConsumingInputStreamsState {
channels,
abort_helper: Arc::new(spawned_tasks),
+ });
+ match self {
+ RepartitionExecState::ConsumingInputStreams(value) => Ok(value),
+ _ => unreachable!(),
}
}
}
-/// Lazily initialized state
-///
-/// Note that the state is initialized ONCE for all partitions by a single task(thread).
-/// This may take a short while. It is also like that multiple threads
-/// call execute at the same time, because we have just started "target partitions" tasks
-/// which is commonly set to the number of CPU cores and all call execute at the same time.
-///
-/// Thus, use a **tokio** `OnceCell` for this initialization so as not to waste CPU cycles
-/// in a mutex lock but instead allow other threads to do something useful.
-///
-/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
-/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
-type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;
Review Comment:
Actually one of the main points of the PR is removing this `LazyState`. The
issue is that `LazyState::get_or_init()` is an async method, and therefore, it
needs to be called within an async context.
As `ExecutionPlan::execute` is not async, we are forced to initialize the
`LazyState` inside the `futures::stream::once(async move { ... })` block, which
means that `LazyState::get_or_init()` will not be called until the stream is
first polled, therefore delaying the `.execute()` call to the child input.
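The deferral is easier to see in a stripped-down form. Below is a minimal std-only analogy, not DataFusion code: `std::iter::once_with` stands in for `futures::stream::once(async move { ... })`, and the hypothetical `lazy_execute` function and `child_started` flag stand in for `execute` and the child's `.execute()` call. Building the lazy value runs nothing; the closure only runs when the first item is requested.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a non-async `execute` that wraps its
/// initialization in a lazy value (an iterator here, a stream in the PR).
fn lazy_execute<'a>(child_started: &'a Cell<bool>) -> impl Iterator<Item = u32> + 'a {
    // Nothing inside this closure runs until the first `next()` call,
    // just as an `async move { ... }` block does not run until polled.
    std::iter::once_with(move || {
        child_started.set(true); // stand-in for `input.execute()`
        42 // stand-in for the first record batch
    })
}
```

Calling `lazy_execute` leaves `child_started` unset; it flips only when the caller pulls the first item, which is the delay described above.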
I see that the purpose of introducing `LazyState` in
https://github.com/apache/datafusion/pull/10009 was to reduce lock contention
in `RepartitionExec::execute` calls, but my guess is that this can be solved
more simply by checking an `AtomicBool` so that the state is locked only once.
Any other threads can then continue without taking a lock in
`RepartitionExec::execute`, which allows us to call `input.execute()`
synchronously upon a `RepartitionExec::execute` call.
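To make the `AtomicBool` idea concrete, here is a minimal sketch using only the standard library. It is not the code in this PR: `Repartition`, `State`, and `input_started` are hypothetical names, `std::sync::Mutex` is used where the real code uses parking_lot, and setting a flag stands in for calling `input.execute()`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the shared repartition state.
#[derive(Default)]
struct State {
    input_started: bool,
}

/// Hypothetical operator; names are illustrative, not DataFusion's.
#[derive(Default)]
struct Repartition {
    initialized: AtomicBool,
    state: Mutex<State>,
}

impl Repartition {
    /// Synchronous `execute` analogue. Returns `true` only for the single
    /// call that performed the initialization.
    fn execute(&self) -> bool {
        // Fast path: once initialized, callers return without locking.
        if self.initialized.load(Ordering::Acquire) {
            return false;
        }
        let mut state = self.state.lock().unwrap();
        // Re-check under the lock: another thread may have won the race.
        if self.initialized.load(Ordering::Acquire) {
            return false;
        }
        state.input_started = true; // stand-in for `input.execute()`
        self.initialized.store(true, Ordering::Release);
        true
    }

    /// Reads the state; this accessor still takes the lock briefly.
    fn input_started(&self) -> bool {
        self.state.lock().unwrap().input_started
    }
}
```

Only callers that arrive before initialization completes contend on the mutex; later callers see the flag and skip the lock. Whether that is enough for the real state (channels, abort helper) is the open question in this thread.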
Not sure if there's a middle-ground solution that allows us to keep
the `LazyState`. I'll try to think of something, but otherwise I'm happy to
wait for @crepererum's input next week.