On Thu, 9 Jan 2025 10:13:52 GMT, Viktor Klang <[email protected]> wrote:
> The following patch updates Gatherers.mapConcurrent to limit work-in-progress
> (on-going and completed-unpushed) to the `maxConcurrency` so that
> head-of-line blocking does not cause completed-unpushed work to grow
> unbounded.
>
> This also simplifies interruption handling to ignore-and-restore, which needs
> to be done on a per-element-basis as the calling thread can change between
> invocations of the integrator, as well as the finisher, so restoring it on
> finish is not possible (and won't happen if there's an exception thrown
> during integration anyway).
>
> Furthermore, logic has been added to reduce the risk of any spawned virtual
> threads surviving the processing of the stream.
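
For readers following along, the ignore-and-restore idiom mentioned in the description can be sketched roughly as below. This is a generic illustration, not the code in the patch: the class `IgnoreAndRestore` and the helper `awaitIgnoringInterrupts` are hypothetical names, and the sketch assumes the work is waited on through `Future.get()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class IgnoreAndRestore {
    // Hypothetical helper: waits for the future to complete, ignoring any
    // interrupt that arrives while waiting, and restores the caller's
    // interrupt status before returning or throwing.
    static <T> T awaitIgnoringInterrupts(Future<T> future) throws ExecutionException {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                // Restore on the thread that did this particular wait.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread completer = new Thread(() -> future.complete("done"));
        Thread.currentThread().interrupt(); // simulate an interrupted caller
        completer.start();
        String value = awaitIgnoringInterrupts(future);
        System.out.println(value + ", interrupt restored: " + Thread.interrupted());
    }
}
```

Because the restore happens in the `finally` of each wait, it does not depend on the same thread later running a finisher, which is the concern raised in the description.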
I looked at a few early iterations of this as the PR was being created, so I
think it is in a good place.
src/java.base/share/classes/java/util/stream/Gatherers.java line 392:
> 390: while (proceed
> 391: && (current = wip.peekFirst()) != null
> 392: && (current.isDone() || atLeastN > 0)) {
It might be better to indent these two lines so that it's clearer what the
while expression is vs. the code in the block.
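
To illustrate the suggestion, here is a simplified sketch with the continuation lines of the condition indented deeper than the loop body. Only the condition mirrors the quoted code. The class `WhileIndentSketch`, the method `drain`, the `downstream` predicate and the loop body are hypothetical stand-ins, and the 8-space continuation indent is just one common convention.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class WhileIndentSketch {
    // Hypothetical stand-in for the loop: hands results from the head of the
    // queue to `downstream`, stopping at the first task that is not done
    // unless at least `atLeastN` results are still required.
    static int drain(Deque<CompletableFuture<Integer>> wip,
                     int atLeastN,
                     Predicate<Integer> downstream) {
        int pushed = 0;
        boolean proceed = true;
        CompletableFuture<Integer> current;
        while (proceed
                && (current = wip.peekFirst()) != null
                && (current.isDone() || atLeastN > 0)) {
            // Body starts here; the deeper indent above sets it apart.
            proceed = downstream.test(current.join()); // join() waits if not done
            wip.pollFirst();
            atLeastN -= 1;
            pushed += 1;
        }
        return pushed;
    }

    public static void main(String[] args) {
        Deque<CompletableFuture<Integer>> wip = new ArrayDeque<>();
        wip.add(CompletableFuture.completedFuture(1));
        wip.add(CompletableFuture.completedFuture(2));
        wip.add(new CompletableFuture<>()); // still in progress
        System.out.println(drain(wip, 0, x -> true));
    }
}
```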
src/java.base/share/classes/java/util/stream/Gatherers.java line 421:
> 419: if (!success && !wip.isEmpty()) {
> 420: // First signal cancellation for all tasks in progress
> 421: for(var task : wip)
Minor formatting nit: you probably want a space in "for(". There are a few
more of these in the patch.
test/jdk/java/util/stream/GatherersMapConcurrentTest.java line 322:
> 320: @ParameterizedTest
> 321: @MethodSource("concurrencyConfigurations")
> 322: public void behavesAsExpectedWhenCallerIsInterrupted(ConcurrencyConfig cc) {
It might be helpful for future maintainers to put a comment on the
behavesAsExpectedXXX tests so that it's easier to figure out what they are
testing.
-------------
Marked as reviewed by alanb (Reviewer).
PR Review: https://git.openjdk.org/jdk/pull/22999#pullrequestreview-2540222162
PR Review Comment: https://git.openjdk.org/jdk/pull/22999#discussion_r1909009172
PR Review Comment: https://git.openjdk.org/jdk/pull/22999#discussion_r1909010667
PR Review Comment: https://git.openjdk.org/jdk/pull/22999#discussion_r1909013865