crepererum commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2106955247
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
// Get existing ordering to use for merging
let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+ let state = Arc::clone(&self.state);
+ if !self.state_initialized.swap(true, Ordering::Relaxed) {
+ state.lock().ensure_input_streams_initialized(
Review Comment:
The observation made by @gabotechs here
https://github.com/apache/datafusion/pull/16093#discussion_r2096083785 is
correct:
1. The original intention was to keep `execute` free of lock contention.
2. The delay is indeed a breakage of the API contract.
However, I want to broaden the discussion a bit:
This is racy: the first thread could swap the boolean but not yet hold the
lock, while other threads skip the `if` body and start to poll. At that point
they get the lock (around line 670 with the PR applied, near the comment
"lock mutexes"), and now you're either blocking the async IO runtime with
initialization work or, with your implementation, you just get an error
`RepartitionExecState::init_input_streams must be called before consuming input
streams`. This needs to be fixed before merging because it might happen under
high load!
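To make the interleaving concrete, here is a minimal std-only sketch that replays it step by step on a single thread. `Exec`, `State`, `claim_init`, `run_init` and `poll` are hypothetical stand-ins, not the actual `RepartitionExec` code; the only thing taken from the diff is the pattern of an `AtomicBool::swap` that guards initialization done under a separate mutex.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the shared repartition state.
#[derive(Default)]
struct State {
    input_streams_initialized: bool,
}

/// Hypothetical stand-in for the operator: a flag next to a mutex.
#[derive(Default)]
struct Exec {
    state_initialized: AtomicBool,
    state: Mutex<State>,
}

impl Exec {
    /// First half of `execute`: flip the flag.
    /// Returns true if this caller is now responsible for initializing.
    fn claim_init(&self) -> bool {
        !self.state_initialized.swap(true, Ordering::Relaxed)
    }

    /// Second half of `execute`: initialize under the lock.
    fn run_init(&self) {
        self.state.lock().unwrap().input_streams_initialized = true;
    }

    /// What a poll does: take the lock and expect initialized streams.
    fn poll(&self) -> Result<(), &'static str> {
        if self.state.lock().unwrap().input_streams_initialized {
            Ok(())
        } else {
            Err("input streams not initialized")
        }
    }
}

/// Replays the problematic ordering; returns the poll results
/// observed before and after thread A finishes initializing.
fn replay_race() -> (Result<(), &'static str>, Result<(), &'static str>) {
    let exec = Exec::default();
    // Thread A: wins the flag but is preempted before taking the lock.
    assert!(exec.claim_init());
    // Thread B: sees the flag already set, skips init, starts polling.
    assert!(!exec.claim_init());
    let early = exec.poll();
    // Thread A: finally gets the lock and initializes.
    exec.run_init();
    let late = exec.poll();
    (early, late)
}
```

In this sketch the early poll fails and the late one succeeds, because the flag says "initialized" before the state under the lock actually is.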
And even if you found a non-racy API that combines the boolean with the lock,
you would still have the same semantic race. I think the question is: are we
allowed to poll streams even if not all `execute` calls have finished? I would
say yes. In theory you could even interleave:
1. execute partition 0
2. poll partition 0
3. execute partition 1
4. poll partition 1
5. ...
I think in general this problem cannot be fully fixed, though: you either
block during `execute` or you potentially block during `poll`, at least as long
as the `execute` method needs to be called PER PARTITION. TBH that might have
been a somewhat unfortunate choice; I would rather call it once and return a
vector of streams.
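The trade-off can be sketched with a toy model in which the check and the initialization share one critical section, so the race disappears but somebody still pays for the setup. All names here (`Shared`, `InitSite`, `ensure_init`, `interleave`) are hypothetical and only illustrate the idea; this is not the DataFusion API.

```rust
use std::sync::Mutex;

/// Which call ended up doing the (potentially slow) initialization.
#[derive(Clone, Copy, Debug, PartialEq)]
enum InitSite {
    Execute,
    Poll,
}

/// Hypothetical shared state: `None` until initialized.
struct Shared {
    init_site: Mutex<Option<InitSite>>,
    /// If true, `execute` initializes; otherwise the first poll does.
    eager: bool,
}

impl Shared {
    fn new(eager: bool) -> Self {
        Shared { init_site: Mutex::new(None), eager }
    }

    /// Check and initialize inside one critical section.
    fn ensure_init(&self, site: InitSite) {
        let mut guard = self.init_site.lock().unwrap();
        if guard.is_none() {
            // Real code would set up the input streams here;
            // this is where the calling thread would block.
            *guard = Some(site);
        }
    }

    /// Returns a toy "stream", which is just the partition index.
    fn execute(&self, partition: usize) -> usize {
        if self.eager {
            self.ensure_init(InitSite::Execute);
        }
        partition
    }

    fn poll(&self, _stream: usize) {
        self.ensure_init(InitSite::Poll);
    }

    fn init_site(&self) -> Option<InitSite> {
        *self.init_site.lock().unwrap()
    }
}

/// Runs: execute 0, poll 0, execute 1, poll 1.
fn interleave(eager: bool) -> Option<InitSite> {
    let shared = Shared::new(eager);
    for partition in 0..2 {
        let stream = shared.execute(partition);
        shared.poll(stream);
    }
    shared.init_site()
}
```

With `eager = true` the first `execute` call does the work; with `eager = false` the first poll does. Neither variant errors under the interleaved order, but neither removes the blocking either.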
So the question is: is this PR better than the status quo? I would say yes,
but I would like to see at least one additional test that simulates the race
described above, to make sure it doesn't error.
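As a rough idea of the shape such a test could take, here is a std-only sketch that releases several threads at once and checks that none of them observes uninitialized state. It uses plain threads and a `Barrier` instead of the async runtime, and `Shared`, `poll` and `run_race` are hypothetical stand-ins; a real test would drive `RepartitionExec` itself.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical state whose check and initialization share one lock.
struct Shared {
    num_partitions: usize,
    streams: Mutex<Option<Vec<usize>>>,
}

impl Shared {
    /// Stand-in for the first poll of a partition's stream: initialize
    /// lazily under the lock, then look up this partition's entry.
    fn poll(&self, partition: usize) -> Result<usize, String> {
        let mut guard = self.streams.lock().unwrap();
        let streams =
            guard.get_or_insert_with(|| (0..self.num_partitions).collect());
        streams
            .get(partition)
            .copied()
            .ok_or_else(|| format!("no stream for partition {}", partition))
    }
}

/// Starts one thread per partition, releases them together, and
/// collects each thread's first poll result in partition order.
fn run_race(num_partitions: usize) -> Vec<Result<usize, String>> {
    let shared = Arc::new(Shared {
        num_partitions,
        streams: Mutex::new(None),
    });
    let barrier = Arc::new(Barrier::new(num_partitions));
    let handles: Vec<_> = (0..num_partitions)
        .map(|partition| {
            let shared = Arc::clone(&shared);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Line all threads up so they hit the lock together.
                barrier.wait();
                shared.poll(partition)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Every thread should get `Ok` with its own partition index, whichever thread happens to win the lock and perform the initialization.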
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]