That said, I'm still curious. Have you considered propagating cancellation,
re-interrupting the current thread, and then throwing an unchecked
exception upon interruption? That would be the most fail-fast-compatible option.

catch (InterruptedException interrupted) {
  // cancel all pending futures
  Thread.currentThread().interrupt();
  throw new UncheckedInterruptedException();
}

What are the concerns with such a practice?
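
To make the question concrete, here is a minimal sketch of what I have in
mind. It uses a plain ExecutorService and Futures rather than the actual
mapConcurrent() internals, so treat it as an illustration only.
UncheckedInterruptedException is a hypothetical class (it is not in the JDK),
and FailFastMap and mapAll are names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

class FailFastMap {
  /** Hypothetical unchecked wrapper for InterruptedException; not a JDK class. */
  public static class UncheckedInterruptedException extends RuntimeException {
    public UncheckedInterruptedException(InterruptedException cause) {
      super(cause);
    }
  }

  /** Applies fn to every input concurrently and returns results in input order. */
  static <T, R> List<R> mapAll(ExecutorService executor, List<T> inputs, Function<T, R> fn) {
    List<Future<R>> futures = new ArrayList<>();
    for (T input : inputs) {
      futures.add(executor.submit(() -> fn.apply(input)));
    }
    List<R> results = new ArrayList<>();
    try {
      for (Future<R> future : futures) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException interrupted) {
      // Propagate cancellation to every task that has not finished yet.
      for (Future<R> future : futures) {
        future.cancel(true);
      }
      // Restore the interrupt status so callers further up can still see it.
      Thread.currentThread().interrupt();
      throw new UncheckedInterruptedException(interrupted);
    } catch (ExecutionException failed) {
      // A task failed: cancel the rest and surface the cause unchecked.
      for (Future<R> future : futures) {
        future.cancel(true);
      }
      throw new RuntimeException(failed.getCause());
    }
  }
}
```

With this shape the caller never gets a partial result: it either gets the
full list or an exception, and the interrupt flag is still set afterwards.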





On Mon, May 26, 2025 at 8:31 AM Jige Yu <yuj...@gmail.com> wrote:

> Yeah, I think I missed the task cancellation and the joining part.
>
> When the mapConcurrent() thread is interrupted, it propagates the
> cancellation to the child threads.
>
> And if any of the child threads (running the user-provided Function)
> receives that cancellation, it will still need to catch it and handle
> the interruption.
>
> That means the interruption isn't swallowed. If the user code wants to
> abort, they can always just throw an unchecked exception and catch it from
> the caller of mapConcurrent().
>
> That should do for most of the IO type work.
>
> This makes sense.
>
>
>
> On Mon, May 26, 2025 at 12:30 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
>> Yes, the updated Gatherers.mapConcurrent will continue under interruption
>> until done, and then restore the interrupt status of the calling thread.
>>
>> It cannot (reasonably) do anything else. It cannot throw
>> InterruptedException, since that is a checked exception and there is no
>> tracking of checked exceptions throughout a Stream. The specified ways to
>> handle a thread interrupt are either to clear the flag and throw
>> InterruptedException, which is not possible in this case, or to make
>> sure that the interrupt status is maintained, which is possible in this case.
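
For reference, the "maintain the interrupt status" option is commonly written
as a retry loop like the sketch below. This only illustrates the general
pattern on a plain Future; it is not the actual Gatherers.mapConcurrent code,
and RunToCompletion and getUninterruptibly are hypothetical names.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class RunToCompletion {
  /**
   * Waits for the future to finish even if the calling thread is interrupted,
   * then restores the interrupt status before returning.
   */
  static <T> T getUninterruptibly(Future<T> future) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          // Remember the interrupt (get() cleared the flag) and keep waiting.
          interrupted = true;
        } catch (ExecutionException e) {
          // Surface task failures as an unchecked exception.
          throw new CompletionException(e.getCause());
        }
      }
    } finally {
      if (interrupted) {
        // Hand the interrupt back to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

The interrupt is not lost, but it is also not acted upon until the wait is
over, which is exactly the fail-fast concern raised in this thread.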
>>
>> I have given it some more thought in the meantime, but I haven't arrived
>> at anything more satisfactory than that, yet.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Monday, 26 May 2025 03:07
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> Thanks Viktor!
>>
>> Sorry, I was giving myself more time to read the code, but then I lost
>> track.
>>
>> If I'm reading the code right, the current behavior is that if the
>> current thread is interrupted, the mapConcurrent() would suppress the
>> interruption until all elements are processed? (It will then restore the
>> interruption bit)
>>
>> Doesn't this break fail fast?
>>
>> For example, I may have a mapConcurrent() called from within structured
>> concurrency, and that fans out to two operations:
>> †
>> scope.fork(() -> doA());
>> scope.fork(() -> doB());  // calls mapConcurrent() internally
>> scope.join().throwIfFailed();
>>
>> If doA() fails, doB() will be cancelled (thread interrupted).
>>
>> If mapConcurrent() ignores the interruption, and if doB has a long list
>> to process, it'll continue to consume system resources even when the caller
>> no longer needs the results, no?
>>
>>
>> On Fri, Feb 7, 2025 at 2:16 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> >Sorry, did the PR stop using Semaphore?
>>
>> No, not that PR. See:
>> https://github.com/openjdk/jdk/commit/450636ae28b84ded083b6861c6cba85fbf87e16e
>>
>> The problem with interruption under parallel evaluation is that there is
>> no general support for propagation of interruption in CountedCompleters.
>> Adding support for such in (at least) GathererOp needs further study before
>> contemplating making any changes to mapConcurrent()'s interruption policy.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>>
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Thursday, 6 February 2025 17:04
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> Sorry, did the PR stop using Semaphore?
>>
>> I had naively thought that mapConcurrent() will keep a buffer of Future
>> of all currently-running concurrent tasks (it can be a
>> ConcurrentMap<TaskKey, Future> if we don't have to ensure FIFO).
>>
>> Upon interruption, the main thread can call .cancel(true) on all pending
>> Futures; optionally join with the VTs (if we need to block until all VTs
>> exit); then propagate exception.
>>
>> Upon completion, each task just removes itself from the ConcurrentMap.
>>
>> Just in case it adds anything.
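
A rough sketch of the bookkeeping described above, in case it helps. It is
only an illustration of the idea, not how mapConcurrent() is implemented:
InFlightTasks, submit, cancelAll and pending are hypothetical names, and a
generated long stands in for the TaskKey. Overriding FutureTask.done() is one
way to have each task remove itself, since done() also runs on cancellation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;

class InFlightTasks {
  private final ConcurrentMap<Long, Future<?>> inFlight = new ConcurrentHashMap<>();
  private final AtomicLong nextKey = new AtomicLong();
  private final Executor executor;

  InFlightTasks(Executor executor) {
    this.executor = executor;
  }

  /** Registers the task before starting it, so it can always be cancelled. */
  <R> Future<R> submit(Callable<R> work) {
    final long key = nextKey.getAndIncrement();
    FutureTask<R> task = new FutureTask<R>(work) {
      @Override
      protected void done() {
        // Runs on normal completion, failure, and cancellation alike.
        inFlight.remove(key);
      }
    };
    inFlight.put(key, task);
    executor.execute(task);
    return task;
  }

  /** Called by the main thread upon interruption. */
  void cancelAll() {
    for (Future<?> future : inFlight.values()) {
      future.cancel(true);
    }
  }

  /** Number of tasks that have neither completed nor been cancelled. */
  int pending() {
    return inFlight.size();
  }
}
```

Joining with the worker threads after cancelAll() is left out here; that
would need an extra step if we must block until all of them exit.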
>>
>>
>>
>> On Thu, Feb 6, 2025 at 6:47 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> After some more investigation it seems tractable to propagate
>> interruption of the caller in sequential mode, but parallel mode will
>> require much bigger considerations.
>>
>> I made a comment to that effect on the JBS issue:
>> https://bugs.openjdk.org/browse/JDK-8349462?focusedId=14750017&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14750017
>>
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Viktor Klang <viktor.kl...@oracle.com>
>> *Sent:* Thursday, 6 February 2025 11:51
>> *To:* Jige Yu <yuj...@gmail.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> I think alignment in behavior between parallel Stream and mapConcurrent
>> in terms of how interruptions are handled is a possible path forward.
>>
>> I decided to close the PR for now as I realized my parallel Stream
>> example had misled me regarding its exception throwing, so I'll need to go
>> back and refine the solution.
>>
>> It still seems solvable though.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Wednesday, 5 February 2025 19:20
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> Oh good call!
>>
>> I forgot to check what parallel streams do upon interruption (didn't
>> think they do any blocking calls, but at least the main thread must block).
>>
>> On Wed, Feb 5, 2025 at 8:18 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> Hi Jige,
>>
>> I opened an issue to track the concern, and I have proposed a change
>> which seems to align well with how parallel streams behave under caller
>> thread interruption.
>>
>> I've opened the following PR for review:
>> https://github.com/openjdk/jdk/pull/23467
>>
>> If you are able to make a local OpenJDK build with that solution you
>> could check if it addresses your use-cases (or not).
>>
>> 8349462: Gatherers.mapConcurrent could support async interrupts by
>> viktorklang-ora · Pull Request #23467 · openjdk/jdk
>> This change is likely going to need some extra verbiage in the spec for
>> mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how
>> parallel streams work in conjunction with interrup...
>>
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Wednesday, 5 February 2025 16:24
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> Thanks Viktor!
>>
>> I understand the problem.
>>
>> The main reason I asked is because I want to understand how the core Java
>> team thinks of throwing an unchecked exception.
>>
>> As explained above, I consider losing cancellability a big deal, a deal
>> breaker even. And I thought throwing unchecked is more acceptable, because
>> the most common reason the mapConcurrent() VT can be interrupted is
>> cancellation from a parent mapConcurrent(), or a parent Structured
>> Concurrency scope. The cancellation could be either from an organic
>> exception, or from the downstream not needing more elements, like maybe due
>> to findFirst() already getting an element.
>>
>> In both cases, since the concurrent operation is already cancelled
>> (result ignored), what exception pops up to the top level isn't that big of
>> a deal (perhaps only a log record will be seen?)
>>
>> But if the core Java team considers it a bad idea, I would love to learn
>> and adjust.
>>
>> On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> Hi,
>>
>> The problem is that mapConcurrent cannot throw InterruptedException
>> because that is a checked exception, so we cannot clear the interrupted
>> flag and throw that exception.
>>
>> So the updated semantics is to not cut the stream short but instead run
>> to completion, restoring the interruption flag.
>>
>> There exist a couple of alternatives to this approach which I am
>> contemplating, but they need to be further explored before I consider
>> moving forward with any of them.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Monday, 27 January 2025 17:00
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>>
>> Thanks Viktor!
>>
>> It looks like the current fix ignores interruption.
>>
>> I want to make sure my concern of it defeating cancellation is heard and
>> understood.
>>
>> The scenario I worry about is a mapConcurrent() that fans out to
>> another method call, which internally calls mapConcurrent() as an
>> implementation detail.
>>
>> An example:
>>
>> List<RefundResponse> refundHelper(Transaction transaction) {
>>   return transaction.creditCardAccounts.stream()
>>     .gather(mapConcurrent(maxConcurrency, acct -> service.refund(acct)))
>>     .toList();
>> }
>>
>> transactions.stream()
>>     .gather(mapConcurrent(maxConcurrency, transaction -> refundHelper(transaction)));
>>
>>
>> It seems undesirable that in such a case all the service.refund() calls
>> become non-cancellable, because the only way the outer mapConcurrent()
>> cancels the refundHelper() calls is by calling Thread.interrupt() on the
>> virtual threads that call refundHelper(), which would then be disabled by
>> the inner mapConcurrent().
>>
>> Does this example make sense to you? I can further explain if anything
>> isn't clear.
>>
>> But I want to make sure the decision to disable interruption is a
>> deliberate judgement call that such nested mapConcurrent() is unlikely, or
>> not important.
>>
>> Cheers,
>>
>>
>>
>> On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> Hi!
>>
>> Please see: https://github.com/openjdk/jdk/pull/23100
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yuj...@gmail.com>
>> *Sent:* Sunday, 26 January 2025 23:03
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* [External] : Re: mapConcurrent() with InterruptedException
>>
>> Checking in on what you've found out, Viktor.
>>
>> From where we left off, I understand that you were looking at
>> alternatives instead of silent truncation?
>>
>> Have you reached any conclusion?
>>
>> We touched on disallowing interruption during mapConcurrent(). I still
>> have concerns with disabling cancellation, because it basically undoes this
>> API note from the javadoc
>> <https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>
>> :
>>
>> API Note: In progress tasks will be attempted to be cancelled, on a
>> best-effort basis, in situations where the downstream no longer wants to
>> receive any more elements.
>>
>> In reality, people will use mapConcurrent() to fan out rpcs. Sometimes
>> these rpcs are just a single blocking call; yet sometimes they may
>> themselves be a Structured Concurrency scope, with 2 or 3 rpcs that
>> constitute a single logical operation. Under two conditions, cancellation
>> is imho an important semantic:
>>
>>    1. The downstream code uses filter().findFirst(), and when it sees an
>>    element, it will return and no longer needs the other pending rpcs to
>>    complete. If cancellation is disabled, these unnecessary rpcs will waste
>>    system resources.
>>    2. One of the rpcs throws and the Stream pipeline needs to propagate
>>    the exception. Again, if the other rpcs cannot be cancelled, we'll have
>>    many zombie rpcs.
>>
>> Zombie rpcs may or may not be a deal breaker, depending on the specific
>> use case. But for a JDK library, losing cancellation would have a negative
>> impact on usability.
>>
>> My 2c,
>>
>>
>> On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> Hi Ben,
>>
>> Thanks for raising these questions—getting feedback is crucial in the
>> Preview stage of features.
>>
>> I wrote a reply to the Reddit thread so I'll just summarize here:
>>
>> It is important to note that *mapConcurrent()* is not a part of the
>> Structured Concurrency JEPs, so it is not designed to join SC scopes.
>>
>> I'm currently experimenting with ignoring-but-restoring interrupts on the
>> "calling thread" for *mapConcurrent()*, as well as capping
>> work-in-progress to *maxConcurrency* (not only capping the concurrency
>> but also the amount of completed-but-yet-to-be-pushed work). Both of these
>> adjustments should increase predictability of behavior in the face of
>> blocking operations with variable delays.
>>
>> Another adjustment I'm looking at right now is to harden/improve the
>> cleanup to wait for concurrent tasks to acknowledge cancellation, so that
>> once the finisher is done executing the VTs are known to have terminated.
>>
>> As for not preserving the encounter order, that would be a completely
>> different thing, and I'd encourage you to experiment with that if that
>> functionality would be interesting for your use-case(s).
>>
>> Again, thanks for your feedback!
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of Jige
>> Yu <yuj...@gmail.com>
>> *Sent:* Friday, 3 January 2025 17:53
>> *To:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
>> *Subject:* mapConcurrent() with InterruptedException
>>
>> Hi Java Experts,
>>
>> I sent this email incorrectly to loom-dev@ and was told on Reddit that
>> core-libs-dev is the right list.
>>
>> The question is about the behavior of mapConcurrent() when the thread is
>> interrupted.
>>
>> Currently mapConcurrent()'s finisher phase will re-interrupt the thread,
>> then stop at whatever element has already been processed and return.
>>
>> This strikes me as a surprising behavior, because for example if I'm
>> running:
>>
>>    Stream.of(1, 2, 3)
>>         .gather(mapConcurrent(i -> i * 2))
>>         .toList()
>>
>> and the thread is being interrupted, the result could be any of [2], [2,
>> 4] or [2, 4, 6].
>>
>> Since thread interruption is cooperative, there is no guarantee that the
>> thread being interrupted will just abort. It's quite possible that it'll
>> keep going and then will use for example [2] as the result of doubling the
>> list of [1, 2, 3], which is imho incorrect.
>>
>> In the Reddit thread
>> <https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/>,
>> someone argued that interruption rarely happens so it's more of a
>> theoretical issue. But interruption can easily happen in Structured
>> Concurrency or in mapConcurrent() itself if any subtask has failed in order
>> to cancel/interrupt the other ongoing tasks.
>>
>> There had been discussion about alternative strategies:
>>
>>    1. Don't respond to interruption and just keep running to completion.
>>    2. Re-interrupt thread and wrap the InterruptedException in a
>>    standard unchecked exception (StructuredConcurrencyInterruptedException?).
>>
>>
>> I have concerns with option 1 because it disables cancellation
>> propagation when mapConcurrent() itself is used in a subtask of a parent
>> mapConcurrent() or in a StructuredConcurrencyScope.
>>
>> Both the equivalent Future-composition async code and C++'s fiber trees
>> support cancellation propagation, and imho it's a critical feature, or else
>> it's possible that a zombie thread is still sending RPCs long after the
>> main thread has exited (failed, or fallen back to some default action).
>>
>> My arguments for option 2:
>>
>>    1. InterruptedException is more error prone than traditional checked
>>    exceptions for *users* to catch and handle. They can forget to
>>    re-interrupt the thread. It's so confusing that even seasoned programmers
>>    may not know they are *supposed to* re-interrupt the thread.
>>    2. With Stream API using functional interfaces like Supplier,
>>    Function, the option of just tacking on "throws IE" isn't available to
>>    many users.
>>    3. With Virtual Threads, it will be more acceptable, or even become
>>    common to do blocking calls from a stream operation (including but
>>    not exclusive to mapConcurrent()). So the chance users are forced to deal
>>    with IE will become substantially higher.
>>    4. Other APIs such as the Structured Concurrency API have already
>>    started wrapping system checked exceptions like ExecutionException,
>>    TimeoutException in unchecked exceptions (join()
>>    <https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html#join()>
>>    for example).
>>    5. Imho, exceptions that we'd rather users not catch and handle but
>>    instead should mostly just propagate up as is, should be unchecked.
>>
>> There is also a side discussion
>> <https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/>
>> about whether mapConcurrent() is better off preserving input order or
>> pushing to downstream as soon as an element is computed. I'd love to
>> discuss that topic too but maybe it's better to start a separate thread?
>>
>> Thank you and cheers!
>>
>> Ben Yu
>>
>>
