>Ultimately, the question is whether mapConcurrent() should be optimized 
>primarily for preserving input sequence, or for enabling maximum throughput 
>and flexibility in concurrent scenarios, including efficient "first-to-finish" 
>patterns.

The answer to that is clear in the specification for the operation:

«This operation preserves the ordering of the stream.» - 
http://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com>
Sent: Wednesday, 4 June 2025 18:18
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?

Hi Viktor,

Thank you for the detailed explanation and the links to the Javadoc for 
findFirst() and limit(). You're absolutely correct in your characterization of 
these operations – they are indeed concerned with encounter order, and parallel 
execution doesn't inherently change that unless the stream is explicitly 
unordered. I want to clarify a few of my points that I didn't explain clearly 
(see below).


On Wed, Jun 4, 2025 at 7:38 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
Hi Jige,

First of all—you're most welcome. Thanks for your insightful questions.

>Temptation for Race Semantics: The beauty of mapConcurrent() integrating with 
>the Stream API means developers will naturally be drawn to use it for 
>race-like scenarios. Operations like findFirst() or limit(N) to get the first 
>few completed results are very intuitive combinations.

It's important to distinguish between spatial (encounter) order and temporal 
(availability) order.

If we look at `Stream::findFirst()` we see:

«Returns an 
Optional<https://docs.oracle.com/en/java/javase/23/docs/api/java.base/java/util/Optional.html>
 describing the first element of this stream, or an empty Optional if the 
stream is empty. If the stream has no encounter order, then any element may be 
returned.» - 
https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#findFirst()

And if we look at `Stream::limit(long)` we see:

«While limit() is generally a cheap operation on sequential stream pipelines, 
it can be quite expensive on ordered parallel pipelines, especially for large 
values of maxSize, since limit(n) is constrained to return not just any n 
elements, but the first n elements in the encounter order. » (emphasis mine) - 
https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#limit(long)

So, we can conclude that "first" and "limit" are about encounter order, and we 
can conclude that the presence of parallel does not change that—only 
"unorderedness" may change that.



My apologies if my previous example using findFirst() wasn't as clear as it 
could have been. I'd like to clarify how I see the interaction with 
mapConcurrent() playing out:

  1.  mapConcurrent()'s Influence on Encounter Order: When I mentioned using 
findFirst() (or limit()) for race semantics, the implicit assumption was how 
mapConcurrent() would feed elements into these terminal operations. The 
"encounter order" that findFirst() sees is precisely what mapConcurrent() 
emits. If mapConcurrent() emits results strictly according to input order, then 
findFirst() will respect that. However, if an unordered mapConcurrent() were to 
emit results in their temporal order of completion, then findFirst() would 
naturally pick the first one that actually finished, effectively achieving a 
race. My point is that mapConcurrent() is in a position to define the encounter 
order that downstream operations like findFirst() will act upon. The surprise 
isn't with findFirst()'s definition, but with how a strictly ordered 
mapConcurrent() might prevent it from being used effectively for 
"first-to-finish" logic.

  2.  The Role of findAny(): You're right that my example using findFirst() was 
more nuanced than I intuitively assumed. A developer well-versed in the Stream 
API nuances might indeed opt for findAny() to implement race semantics, 
especially given its Javadoc explicitly mentioning suitability for "maximal 
performance in parallel operations." findAny() is designed to return any 
element, which aligns well with picking the temporally first available result 
from an unordered concurrent process.

  3.  Aligning Spec with Intuition: My broader concern is about the intuitive 
use of these powerful new tools. If mapConcurrent() maintains strict input 
ordering, it's a perfectly valid and understandable specification by itself. 
However, the potential for surprise arises when developers, especially those 
less acquainted with every fine print of stream ordering, attempt to combine 
mapConcurrent() with findFirst() or findAny() to build common concurrent 
utilities like a race. They might intuitively expect that "concurrent mapping" 
followed by "find first/any" would yield the result that completes earliest. 
While developers should read all related documentation, there's also a design 
ideal where the API's behavior aligns closely with reasonable developer 
intuition, especially for common patterns. An unordered mapConcurrent() (or an 
option for it) would, in my view, better support this intuitive use for 
race-like patterns. Whereas, I have my doubts that developers commonly would 
intuitively assume a concurrent operation to stick to strict ordering (it's not 
what we are used to in most concurrent or multil-threading scenarios)

Ultimately, the question is whether mapConcurrent() should be optimized 
primarily for preserving input sequence, or for enabling maximum throughput and 
flexibility in concurrent scenarios, including efficient "first-to-finish" 
patterns.



>Surprise for Race Semantics Users: Following from the above, it could be 
>surprising for developers when they realize that the inherent input ordering 
>of mapConcurrent() means it's not optimized for these race scenarios. The 
>expectation would be that findFirst() returns as soon as any task completes, 
>but ordering can delay this if an earlier task (in input order) is slower.

This should be addressed above.

>Ordering Assumption in Concurrency: My experience is that ordering is not 
>typically a default assumption when dealing with operations explicitly marked 
>as "parallel" or "concurrent." For instance, Stream.forEach() on a parallel 
>stream does not guarantee encounter order, presumably for performance reasons 
>– a similar trade-off to what's being discussed for mapConcurrent(). 
>Developers often consult documentation for ordering guarantees in concurrent 
>contexts rather than assuming them.

This should also be addressed above.

>Expectation of "True" Concurrency: When I see an API like 
>mapConcurrent(maxConcurrency, mapper), my mental model is that if 
>maxConcurrency permits, new tasks should be initiated as soon as a slot is 
>free.

This is interesting, because this is how mapConcurrent used to work. It only 
placed the limit of concurrent work in progress and not work not yet possible 
to propagate downstream. This was changed, because a delayed initial (in 
encounter order) item may let subsequent (completed) work queue up indefinitely.

So in conclusion, there's still room for a different take on 
"mapConcurrentReorder" (name of course left up to the reader), and the good 
news is that such a Gatherer can be implemented, evaluated, hardened, etc 
outside of the JDK—and potentially some day something like it ends up in the 
JDK.


Your mention of a potential mapConcurrentReorder() also brings to mind a 
thought about API consistency with existing Stream conventions. We see patterns 
like Stream.forEach() (which for parallel streams doesn't guarantee encounter 
order, prioritizing performance) versus Stream.forEachOrdered() (which 
explicitly enforces it). If mapConcurrent() were to follow a similar naming 
convention, it might suggest that the base mapConcurrent() itself would be the 
version optimized for throughput (and thus potentially unordered by default), 
while a hypothetical mapConcurrentOrdered() would be the variant explicitly 
providing the strict ordering guarantee. This is an observation on how naming 
might align with established JDK patterns to intuitively guide user 
expectations about the default behavior.

This naturally leads to the inherent challenge in designing such an API. It 
seems we're trying to balance three desirable, but sometimes conflicting, goals:

  1.  Strict Encounter Ordering: Results are emitted in the same order as the 
input elements.
  2.  Bounded Memory Buffering: Avoid out-of-memory errors by not letting 
completed but un-emitted results queue up indefinitely.
  3.  Optimized (True) Concurrency: If maxConcurrency is set (e.g., to N), the 
system strives to have N tasks actively running whenever there are pending 
input elements and available concurrency slots, rather than being stalled by a 
slow-to-complete but earlier-in-sequence task.

It appears that achieving all three simultaneously is not possible, and a 
compromise must be made.

From my perspective:

  *   Goal #2 (Bounded Memory) is non-negotiable; OOM situations are generally 
unacceptable.
  *   Goal #3 (Optimized Concurrency) feels fundamental to an API named 
mapConcurrent(). Users will likely expect it to maximize the concurrent 
execution of tasks up to the specified limit. Deviations from this, often 
necessitated by strict adherence to Goal #1, can lead to surprises.
  *   Goal #1 (Strict Ordering), while a "nice-to-have" and sometimes 
beneficial, might not always align with common developer intuition for 
operations explicitly labeled "concurrent," especially if it compromises true 
concurrency. As discussed, ordering can often be reintroduced by the caller if 
specifically needed.

The current implementation understandably prioritizes #1 (Ordering) and #2 
(Bounded Memory). However, this prioritization can lead to situations where #3 
(Optimized Concurrency) is not fully realized, which can be counter-intuitive.

I've already mentioned the race scenario. Consider another example where this 
becomes particularly evident. Imagine a scenario with a long-running task 
(e.g., for periodic monitoring) combined with a stream of other "real" tasks:

Java

// Main tasks
List<Callable<Result>> realTasks = ... ;

// A long-running monitoring task
Callable<Void> longRunningMonitoringTask = () -> {
    while (!allRealTasksSeemDone()) { // Simplified condition
        System.out.println("Monitoring...");
        Thread.sleep(5000);
    }
    return null;
};

Stream<Callable<?>> allTasks = Stream.concat(
    Stream.of(longRunningMonitoringTask),
    realTasks.stream().map(task -> (Callable<?>) task)
);

allTasks
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, callable -> 
callable.call()))
    .forEach(result -> { /* output results */ });


The longRunningMonitoringTask would usually be the first element, with the 
current strictly ordered behavior, it would occupy one of the MAX_CONCURRENCY 
slots. and as soon as MAX_CONCURRENCY tasks are executed, completed or not, all 
subsequent realTasks (beyond the first one) would be starved, unable to even 
start until the longRunningMonitoringTask completes—which is never. This 
creates a deadlock or significant processing delay that would likely surprise a 
developer expecting tasks to proceed concurrently up to the maxConcurrency 
limit. An implementation more aligned with the "base name implies throughput" 
idea (as per the forEach analogy) would likely handle this more gracefully by 
letting tasks run truly concurrently without an implicit ordering dependency.

This reinforces my belief that an alternative gatherer, or a default behavior 
for mapConcurrent() that prioritizes concurrency (as the name mapConcurrent 
might suggest to many, akin to forEach), could be very valuable. I understand 
this might be something that evolves outside the JDK initially, and I 
appreciate you highlighting that path.

Thanks again for the continued discussion and the transparency about the design 
choices.

Best regards,



Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Wednesday, 4 June 2025 16:20
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>
Cc: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Hi Viktor,

Thank you for sharing that the general feedback on mapConcurrent() has been 
positive and for the insights into the ongoing enhancements, especially around 
interruption handling and work-in-progress tracking.

To clarify my own position, I am also extremely enthusiastic about the 
mapConcurrent() API overall. It offers an elegant and straightforward way to 
manage homogenous, I/O-intensive concurrent tasks within a structured 
concurrency model, which is a significant improvement and a much-needed 
addition. My feedback on ordering is aimed at maximizing its potential.

I'd like to elaborate on a few specific scenarios and expectations that inform 
my perspective on the ordering:

  1.  Temptation for Race Semantics: The beauty of mapConcurrent() integrating 
with the Stream API means developers will naturally be drawn to use it for 
race-like scenarios. Operations like findFirst() or limit(N) to get the first 
few completed results are very intuitive combinations. For example:

Java

// Hypothetical use case: find the fastest responding service
Optional<Result> fastestResult = serviceUrls.stream()
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, url -> fetch(url)))
    .findFirst();


  2.  Surprise for Race Semantics Users: Following from the above, it could be 
surprising for developers when they realize that the inherent input ordering of 
mapConcurrent() means it's not optimized for these race scenarios. The 
expectation would be that findFirst() returns as soon as any task completes, 
but ordering can delay this if an earlier task (in input order) is slower.

  3.  Ordering Assumption in Concurrency: My experience is that ordering is not 
typically a default assumption when dealing with operations explicitly marked 
as "parallel" or "concurrent." For instance, Stream.forEach() on a parallel 
stream does not guarantee encounter order, presumably for performance reasons – 
a similar trade-off to what's being discussed for mapConcurrent(). Developers 
often consult documentation for ordering guarantees in concurrent contexts 
rather than assuming them.

  4.  Expectation of "True" Concurrency: When I see an API like 
mapConcurrent(maxConcurrency, mapper), my mental model is that if 
maxConcurrency permits, new tasks should be initiated as soon as a slot is 
free. For example, with maxConcurrency=2:

     *   Task 1 starts.
     *   Task 2 starts.
     *   If Task 2 finishes while Task 1 is still running, I would expect Task 
3 to run concurrently alongside task 1, because the max concurrency is 2, not 
1. The current ordered behavior, where Task 3 might have to wait for Task 1 to 
complete before its result can be processed (even if Task 3 itself could have 
started and finished), can feel a bit counterintuitive to the notion of 
maximizing concurrency up to the specified limit. It almost feels like not a 
"max concurrency", but "max buffer size".

These points are offered to highlight potential areas where the current default 
could lead to subtle surprises or suboptimal performance for useful concurrent 
patterns.

Thanks again for the open discussion and for your work on these valuable 
additions to the JDK.

Best regards,

On Tue, Jun 3, 2025 at 2:13 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
The general feedback received thus far has been primarily positive. There have 
been a few behavior-related enhancements over the previews to better handle 
interruption (there's still room to improve there, as per our concurrent 
conversation) as well as some improvements to work-in-progress tracking.

It will be interesting to see which Gatherer-based operations will be devised 
by Java developers in the future.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 18:54
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>
Cc: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Hi Viktor,

Thanks for your reply and for sharing your experience regarding user 
preferences. I appreciate that perspective.

You're right, if an unordered version of mapConcurrent proves to be widely 
beneficial and is implemented and adopted by the community, it could certainly 
make a strong case for future inclusion in the JDK.

I wanted to clarify a nuance regarding user preference that I might not have 
articulated clearly before. If the question is simply "ordered or unordered?", 
in isolation, I can see why many, myself included, might lean towards "ordered" 
as a general preference.

However, the decision becomes more complex when the associated trade-offs are 
considered. If the question were phrased more like, "Do you prefer an ordered 
mapConcurrent by default, even if it entails potential performance overhead and 
limitations for certain use cases like race() operations, versus an unordered 
version that offers higher throughput and broader applicability in such 
scenarios?" my (and perhaps others') answer might differ. The perceived cost 
versus benefit of ordering changes significantly when these factors are 
explicit.

My initial suggestion stemmed from the belief that the performance and 
flexibility gains of an unordered approach for I/O-bound tasks would, in many 
practical situations, outweigh the convenience of default ordering, especially 
since ordering can be reintroduced relatively easily, and explicitly, when 
needed.

Thanks again for the discussion.

Best regards,

On Mon, Jun 2, 2025 at 8:51 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
>My perspective is that strict adherence to input order for mapConcurrent() 
>might not be the most common or beneficial default behavior for users.

If there is indeed a majority who would benefit from an unordered version of 
mapConcurrent (my experience is that the majority prefer ordered) then, since 
it is possible to implement such a Gatherer outside of the JDK, this is 
something which will be constructed, widely used, and someone will then propose 
to add something similar to the JDK.

>While re-implementing the gatherer is a possibility, the existing 
>implementation is non-trivial, and creating a custom, robust alternative 
>represents a significant undertaking.

The existing version needs to maintain order, which adds to the complexity of 
the implementation. Implementing an unordered version would likely look 
different.
I'd definitely encourage taking the opportunity to attempt to implement it.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 17:05
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>
Cc: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Thank you for your response and for considering my feedback on the 
mapConcurrent() gatherer. I understand and respect that the final decision 
rests with the JDK maintainers.

I would like to offer a couple of further points for consideration. My 
perspective is that strict adherence to input order for mapConcurrent() might 
not be the most common or beneficial default behavior for users. I'd be very 
interested to see any research or data that suggests otherwise, as that would 
certainly inform my understanding.

From my experience, a more common need is for higher throughput in 
I/O-intensive operations. The ability to support use cases like race()—where 
the first successfully completed operation determines the outcome—also seems 
like a valuable capability that is currently infeasible due to the ordering 
constraint.

As I see it, if a developer specifically requires the input order to be 
preserved, this can be achieved with relative ease by applying a subsequent 
sorting operation. For instance:

.gather(mapConcurrent(...))
.sorted(Comparator.comparing(Result::getInputSequenceId))


The primary challenge in these scenarios is typically the efficient fan-out and 
execution of concurrent tasks, not the subsequent sorting of results.

Conversely, as you've noted, there isn't a straightforward way to modify the 
current default ordered behavior to achieve the higher throughput or race() 
semantics that an unordered approach would naturally provide.

While re-implementing the gatherer is a possibility, the existing 
implementation is non-trivial, and creating a custom, robust alternative 
represents a significant undertaking. My hope was that an unordered option 
could be a valuable addition to the standard library, benefiting a wider range 
of developers.

Thank you again for your time and consideration.


On Mon, Jun 2, 2025 at 7:48 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
>Even if it by default preserves input order, when I explicitly called 
>stream.unordered(), could mapConcurrent() respect that and in return achieve 
>higher throughput with support for race?

The Gatherer doesn't know whether the Stream is unordered or ordered. The 
operation should be semantically equivalent anyway.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 16:29
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>; 
core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: [External] : Re: Should mapConcurrent() respect time order instead of 
input order?

Sorry. Forgot to copy to the mailing list.

On Mon, Jun 2, 2025 at 7:27 AM Jige Yu 
<yuj...@gmail.com<mailto:yuj...@gmail.com>> wrote:
Thanks Viktor!

I was thinking from my own experience that I wouldn't have automatically 
assumed that a concurrent fanout library would by default preserve input order.

And I think wanting high throughput with real-life utilities like race would be 
more commonly useful.

But I could be wrong.

Regardless, mapConcurrent() can do both, no?

Even if it by default preserves input order, when I explicitly called 
stream.unordered(), could mapConcurrent() respect that and in return achieve 
higher throughput with support for race?



On Mon, Jun 2, 2025 at 2:33 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
Hi!

In a similar vein to the built-in Collectors,
the built-in Gatherers provide solutions to common stream-related problems, but 
also, they also serve as "inspiration" for developers for what is possible to 
implement using Gatherers.

If someone, for performance reasons, and with a use-case which does not require 
encounter-order, want to take advantage of that combination of circumstances, 
it is definitely possible to implement your own Gatherer which has that 
behavior.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev 
<core-libs-dev-r...@openjdk.org<mailto:core-libs-dev-r...@openjdk.org>> on 
behalf of Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Sunday, 1 June 2025 21:08
To: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Should mapConcurrent() respect time order instead of input order?

It seems like for most people, input order isn't that important for concurrent 
work, and concurrent results being in non-deterministic order is often expected.

If mapConcurrent() just respect output encounter order:

It'll be able to achieve higher throughput if an early task is slow, For 
example, with concurrency=2, and if the first task takes 10 minutes to run, 
mapConcurrent() would only be able to process 2 tasks within the first 10 
minutes; whereas with encounter order, the first task being slow doesn't block 
the 3rd - 100th elements from being processed and output.

mapConcurrent() can be used to implement useful concurrent semantics, for 
example to support race semantics. Imagine if I need to send request to 10 
candidate backends and take whichever that succeeds first, I'd be able to do:

backends.stream()
    .gather(mapConcurrent(
        backend -> {
          try {
            return backend.fetchOrder();
           } catch (RpcException e) {
             return null; // failed to fetch but not fatal
           }
        })
        .filter(Objects::notNull)
        .findFirst(); // first success then cancel the rest

Cheers,

Reply via email to