That said, I'm still curious. Have you considered propagating cancellation, re-interrupting the current thread, and then throwing an unchecked exception upon interruption? It's going to be the most fail-fast-compatible.
catch (IE interrupted) { cancel all pending futures Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(); } What are the concerns against such practice? On Mon, May 26, 2025 at 8:31 AM Jige Yu <yuj...@gmail.com> wrote: > Yeah, I think I missed the task cancellation and the joining part. > > When the mapConcurrent() thread is interrupted, it propagates the > cancellation to the child threads. > > And if any of the children threads (running the user-provided Function) > receives that cancellation, it will still need to catch it and handle > interruption. > > That means the interruption isn't swallowed. If the user code wants to > abort, they can always just throw an unchecked exception and catch it from > the caller of mapConcurrent(). > > That should do for most of the IO type work. > > This makes sense. > > > > On Mon, May 26, 2025 at 12:30 AM Viktor Klang <viktor.kl...@oracle.com> > wrote: > >> Yes, the updated Gatherers.mapConcurrent will continue under interruption >> until done, and then restore the interrupt status of the calling thread. >> >> It cannot (reasonably) do anything else—if it were to throw >> InterruptedException, it couldn't since that is a checked exception and >> there's no tracking of checked exceptions throughout a Stream, and that is >> the specified behavior to handle thread interrupts (i.e. clear flag and >> throw InterruptedException, which is not possible in this case; or, make >> sure that interrupt status is maintained, which is possible in this case. >> >> I have given it some more thought in the meantime, but I haven't arrived >> in something more satisfactory than that, yet. >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Monday, 26 May 2025 03:07 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> Thanks Viktor! >> >> Sorry, I was giving myself more time to read the code, but then I lost >> track. >> >> If I'm reading the code right, the current behavior is that if the >> current thread is interrupted, the mapConcurrent() would suppress the >> interruption until all elements are processed? (It will then restore the >> interruption bit) >> >> Doesn't this break fail fast? >> >> For example, I may have a mapConcurrent() called from within structured >> concurrency, and that fans out to two operations: >> † >> scope.fork(() -> doA()); >> scope.fork(() -> doB()); // calls mapConcurrent() internally >> scope.join().throwIfFailed(); >> >> If doA() fails, doB() will be cancelled (thread interrupted). >> >> If mapConcurrent() ignores the interruption, and if doB has a long list >> to process, it'll continue to consume system resources even when the caller >> no longer needs the results, no? >> >> >> On Fri, Feb 7, 2025 at 2:16 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> >Sorry, did the PR stop using Semaphore? >> >> No, not that PR. See: >> https://github.com/openjdk/jdk/commit/450636ae28b84ded083b6861c6cba85fbf87e16e >> <https://urldefense.com/v3/__https://github.com/openjdk/jdk/commit/450636ae28b84ded083b6861c6cba85fbf87e16e__;!!ACWV5N9M2RV99hQ!OKfWrvJEDCkiHcT5b4a6gfeomZzZeq3jv6ir2vf2I5WWIhujtb-by_VuQFaAPDIuAA0qAwpTUjllXMPF$> >> >> The problem with interruption under parallel evaluation is that there is >> no general support for propagation of interruption in CountedCompleters. >> Adding support for such in (at least) GathererOp needs further study before >> contemplating making any changes to mapConcurrent()'s interruption policy. >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Thursday, 6 February 2025 17:04 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> Sorry, did the PR stop using Semaphore? >> >> I had naively thought that mapConcurrent() will keep a buffer of Future >> of all currently-running concurrent tasks (it can be a >> ConcurrentMap<TaskKey, Future> if we don't have to ensure FIFO). >> >> Upon interruption, the main thread can call .cancel(true) on all pending >> Futures; optionally join with the VTs (if we need to block until all VTs >> exit); then propagate exception. >> >> Upon completion, each task just removes itself from the ConcurrentMap. >> >> Just in case it adds anything. >> >> >> >> On Thu, Feb 6, 2025 at 6:47 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> After some more investigation it seems tractable to propagate >> interruption of the caller in sequential mode, but parallel mode will >> require much bigger considerations. >> >> I made a comment to that effect on the JBS issue: >> https://bugs.openjdk.org/browse/JDK-8349462?focusedId=14750017&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14750017 >> >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Viktor Klang <viktor.kl...@oracle.com> >> *Sent:* Thursday, 6 February 2025 11:51 >> *To:* Jige Yu <yuj...@gmail.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> I think alignment in behavior between parallel Stream and mapConcurrent >> in terms of how interruptions are handled is a possible path forward. >> >> I decided to close the PR for now as I realized my parallel Stream >> example had misled me regarding its exception throwing, so I'll need to go >> back and refine the solution. >> >> It still seems solvable though. >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Wednesday, 5 February 2025 19:20 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> Oh good call! >> >> I forgot to check what parallel streams do upon interruption (didn't >> think they do any blocking calls, but at least the main thread must block). >> >> On Wed, Feb 5, 2025 at 8:18 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> Hi Jige, >> >> I opened an issue to track the concern, and I have proposed a change >> which seems to align well with how parallel streams behave under caller >> thread interruption. >> >> I've opened the following PR for review: >> https://github.com/openjdk/jdk/pull/23467 >> <https://urldefense.com/v3/__https://github.com/openjdk/jdk/pull/23467__;!!ACWV5N9M2RV99hQ!M4nB0WxHU_1jcbZoNGy3DD81Oefr7BuObJ-7FHb_mw51HqMBI0BjFy97H6GnjZaX402UQZK89E_mLvMu$> >> >> If you are able to make a local OpenJDK build with that solution you >> could check if it addresses your use-cases (or not). >> >> <https://urldefense.com/v3/__https://github.com/openjdk/jdk/pull/23467__;!!ACWV5N9M2RV99hQ!M4nB0WxHU_1jcbZoNGy3DD81Oefr7BuObJ-7FHb_mw51HqMBI0BjFy97H6GnjZaX402UQZK89E_mLvMu$> >> 8349462: Gatherers.mapConcurrent could support async interrupts by >> viktorklang-ora · Pull Request #23467 · openjdk/jdk >> <https://urldefense.com/v3/__https://github.com/openjdk/jdk/pull/23467__;!!ACWV5N9M2RV99hQ!M4nB0WxHU_1jcbZoNGy3DD81Oefr7BuObJ-7FHb_mw51HqMBI0BjFy97H6GnjZaX402UQZK89E_mLvMu$> >> This change is likely going to need some extra verbiage in the spec for >> mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how >> parallel streams work in conjunction with interrup... >> github.com >> <https://urldefense.com/v3/__http://github.com__;!!ACWV5N9M2RV99hQ!M4nB0WxHU_1jcbZoNGy3DD81Oefr7BuObJ-7FHb_mw51HqMBI0BjFy97H6GnjZaX402UQZK89DBkMefT$> >> >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Wednesday, 5 February 2025 16:24 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> Thanks Viktor! >> >> I understand the problem. >> >> The main reason I asked is because I want to understand how the core Java >> team thinks of throwing an unchecked exception. >> >> As explained above, I consider losing cancellability a big deal, a deal >> breaker even. And I thought throwing unchecked is more acceptable. Because >> the most common reason the mapConcurrent() VT can be interrupted is due to >> cancellation from a parent mapConcurrent(), or a parent Structured >> Concurrency scope. The cancellation could be either from an organic >> exception, or from the downstream not needing more elements, like maybe due >> to findFirst() already getting an element. >> >> In both cases, since the concurrent operation is already cancelled >> (result ignored), what exception pops up to the top level isn't that big of >> a deal (perhaps only a log record will be seen?) >> >> But if the core Java team considers it a bad idea, I would love to learn >> and adjust. >> >> On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> Hi, >> >> The problem is that mapConcurrent cannot throw InterruptedException >> because that is a checked exception, so we cannot clear the interrupted >> flag and throw that exception. >> >> So the updated semantics is to not cut the stream short but instead run >> to completion, restoring the interruption flag. >> >> There exists a couple of alternatives to this approach which I am >> contemplating, but they need to be further explored before I consider >> moving forward with any of them. >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Monday, 27 January 2025 17:00 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException >> >> Thanks Viktor! >> >> It looks like the current fix ignores interruption. >> >> I want to make sure my concern of it defeating cancellation is heard and >> understood. >> >> The scenarios I worry about is for a mapConcurrent() that fans out to >> another method call, which internally calls mapConcurrent() as >> implementation detail. >> >> An example: >> >> List<RefundResponse> refundHelper(transaction) { >> transaction.creditCardAccounts.stream() >> .gather(mapConcurrent(acct -> service.refund(acct)) >> .toList(); >> } >> >> transactions.stream() >> .gather(mapConcurrent(transaction -> refundHelper(transaction)); >> >> >> It seems undesirable that in such a case all the service.refund() calls >> become non cancellable, because the only way the outer mapConcurrent() >> cancels the refundHelper() calls is through Thread.interrupt() the virtual >> threads that call refundHelper(), which would then be disabled by the inner >> mapConcurrent(). >> >> Does this example make sense to you? I can further explain if anything >> isn't clear. >> >> But I want to make sure the decision to disable interruption is >> deliberate judgement call that such nested mapConcurrent() is unlikely,or >> not important. >> >> Cheers, >> >> >> >> On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> Hi! >> >> Please see: https://github.com/openjdk/jdk/pull/23100 >> <https://urldefense.com/v3/__https://github.com/openjdk/jdk/pull/23100__;!!ACWV5N9M2RV99hQ!IzQs0G26M7ZGPwJ3YJpCcS0gxi6BjqoBux2T5u0cHud_zb_mHLfiIrASSZiP0ynNgnaAuwuOh__WinK8$> >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* Jige Yu <yuj...@gmail.com> >> *Sent:* Sunday, 26 January 2025 23:03 >> *To:* Viktor Klang <viktor.kl...@oracle.com> >> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* [External] : Re: mapConcurrent() with InterruptedException >> >> Checking in on what you've found out, Viktor. >> >> From where we left off, I understand that you were looking at >> alternatives instead of silent truncation? >> >> Have you reached any conclusion? >> >> We touched on disallowing interruption during mapConcurrent(). I still >> have concerns with disabling cancellation, because it basically undoes this >> API note from the javadoc >> <https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)> >> : >> >> API Note: In progress tasks will be attempted to be cancelled, on a >> best-effort basis, in situations where the downstream no longer wants to >> receive any more elements. >> In reality, people will use mapConcurrent() to fan out rpcs. Sometimes >> these rpcs are just a single blocking call; yet sometimes they may >> themselves be a Structured Concurrency scope, with 2 or 3 rpcs that >> constitute a single logical operation. Under two conditions, cancellation >> is imho important semantic: >> >> 1. The downstream code uses filter().findFirst(), and when it sees an >> element, it will return and no longer needs the other pending rpcs to >> complete. If cancellation is disabled, these unnecessary rpcs will waste >> system resources. >> 2. One of the rpc throws and the Stream pipeline needs to propagate >> the exception. Again, if the other rpcs cannot be cancelled, we'll have >> many zombie rpcs. >> >> Zombie rpcs may or may not be a deal breaker, depending on the specific >> use case. But for a JDK library, losing cancellation would have a negative >> impact on usability. >> >> My 2c, >> >> >> On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.kl...@oracle.com> >> wrote: >> >> Hi Ben, >> >> Thanks for raising these questions—getting feedback is crucial in the >> Preview stage of features. >> >> I wrote a reply to the Reddit thread so I'll just summarize here: >> >> It is important to note that *mapConcurrent()* is not a part of the >> Structured Concurrency JEPs, so it is not designed to join SC scopes. >> >> I'm currently experimenting with ignoring-but-restoring interrupts on the >> "calling thread" for *mapConcurrent()*, as well as capping >> work-in-progress to *maxConcurrency* (not only capping the concurrency >> but also the amount of completed-but-yet-to-be-pushed work). Both of these >> adjustments should increase predictability of behavior in the face of >> blocking operations with variable delays. >> >> Another adjustment I'm looking at right now is to harden/improve the >> cleanup to wait for concurrent tasks to acknowledge cancellation, so that >> once the finisher is done executing the VTs are known to have terminated. >> >> As for not preserving the encounter order, that would be a completely >> different thing, and I'd encourage you to experiment with that if that >> functionality would be interesting for your use-case(s). >> >> Again, thanks for your feedback! >> >> Cheers, >> √ >> >> >> *Viktor Klang* >> Software Architect, Java Platform Group >> Oracle >> ------------------------------ >> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of Jige >> Yu <yuj...@gmail.com> >> *Sent:* Friday, 3 January 2025 17:53 >> *To:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org> >> *Subject:* mapConcurrent() with InterruptedException >> >> Hi Java Experts, >> >> I sent this email incorrectly to loom-dev@ and was told on Reddit that >> core-libs-dev is the right list. >> >> The question is about the behavior of mapConcurrent() when the thread is >> interrupted. >> >> Currently mapConcurrent()'s finisher phase will re-interrupt the thread, >> then stop at whatever element that has already been processed and return. >> >> This strikes me as a surprising behavior, because for example if I'm >> running: >> >> Stream.of(1, 2, 3) >> .gather(mapConcurrent(i -> i * 2)) >> .toList() >> >> and the thread is being interrupted, the result could be any of [2], [2, >> 4] or [2, 4, 6]. >> >> Since thread interruption is cooperative, there is no guarantee that the >> thread being interrupted will just abort. It's quite possible that it'll >> keep going and then will use for example [2] as the result of doubling the >> list of [1, 2, 3], which is imho incorrect. >> >> In the Reddit >> <https://urldefense.com/v3/__https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button__;!!ACWV5N9M2RV99hQ!L1LHRE2pnYPg43nM0J0dCoV4agscV_rybIV9jY97xn9XJi9d7VoPma4jhx4J4GBeoeQmVud8M4PjPb7L$> >> thread, >> someone argued that interruption rarely happens so it's more of a >> theoretical issue. But interruption can easily happen in Structured >> Concurrency or in mapConcurrent() itself if any subtask has failed in order >> to cancel/interrupt the other ongoing tasks. >> >> There had been discussion about alternative strategies: >> >> 1. Don't respond to interruption and just keep running to completion. >> 2. Re-interrupt thread and wrap the InterruptedException in a >> standard unchecked exception (StructuredConcurrencyInterruptedException?). >> >> >> I have concerns with option 1 because it disables cancellation >> propagation when mapConcurrent() itself is used in a subtask of a parent >> mapConcurrent() or in a StructuredConcurrencyScope. >> >> Both equivalent Future-composition async code, or C++'s fiber trees >> support cancellation propagation and imho it's a critical feature or else >> it's possible that a zombie thread is still sending RPCs long after the >> main thread has exited (failed, or falled back to some default action). >> >> My arguments for option 2: >> >> 1. InterruptedException is more error prone than traditional checked >> exceptions for *users* to catch and handle. They can forget to >> re-interrupt the thread. It's so confusing that even seasoned programmers >> may not know they are *supposed to* re-interrupt the thread. >> 2. With Stream API using functional interfaces like Supplier, >> Function, the option of just tacking on "throws IE" isn't available to >> many >> users. >> 3. With Virtual Threads, it will be more acceptable, or even become >> common to do blocking calls from a stream operation (including but >> exclusive to mapConcurrent()). So the chance users are forced to deal with >> IE will become substantially higher. >> 4. Other APIs such as the Structured Concurrency API have already >> started wrapping system checked exceptions like ExecutionException, >> TimeoutException in unchecked exceptions ( join() >> >> <https://urldefense.com/v3/__https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html*join()__;Iw!!ACWV5N9M2RV99hQ!L1LHRE2pnYPg43nM0J0dCoV4agscV_rybIV9jY97xn9XJi9d7VoPma4jhx4J4GBeoeQmVud8MxGG4HzA$> >> for >> example). >> 5. Imho, exceptions that we'd rather users not catch and handle but >> instead should mostly just propagate up as is, should be unchecked. >> >> There is also a side discussion >> <https://urldefense.com/v3/__https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button__;!!ACWV5N9M2RV99hQ!L1LHRE2pnYPg43nM0J0dCoV4agscV_rybIV9jY97xn9XJi9d7VoPma4jhx4J4GBeoeQmVud8MyZYl02k$> >> about >> whether mapConcurrent() is better off preserving input order or push to >> downstream as soon as an element is computed. I'd love to discuss that >> topic too but maybe it's better to start a separate thread? >> >> Thank you and cheers! >> >> Ben Yu >> >>