Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package ghc-async for openSUSE:Factory 
checked in at 2026-06-10 15:57:46
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/ghc-async (Old)
 and      /work/SRC/openSUSE:Factory/.ghc-async.new.2375 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "ghc-async"

Wed Jun 10 15:57:46 2026 rev:34 rq:1358329 version:2.2.6

Changes:
--------
--- /work/SRC/openSUSE:Factory/ghc-async/ghc-async.changes      2024-12-20 23:10:15.491578171 +0100
+++ /work/SRC/openSUSE:Factory/.ghc-async.new.2375/ghc-async.changes    2026-06-10 15:58:11.657252096 +0200
@@ -1,0 +2,21 @@
+Wed Jan  7 11:26:16 UTC 2026 - Peter Simons <[email protected]>
+
+- Update async to version 2.2.6.
+  ## Changes in 2.2.6
+
+   - Added Control.Concurrent.Stream for processing streams with a fixed
+     number of workers. Includes a bounded version of mapConcurrently:
+     mapConcurrentlyBounded.
+   - Added Control.Concurrent.Async.Warden for a way to create Asyncs that
+     is more flexible than 'withAsync' but retains the guarantee of cancelling
+     orphaned threads, unlike 'async'.
+   - support GHC 9.12, GHC 9.14, and MicroHs
+   - cabal flag debug-auto-label: label threads automatically (#167)
+
+-------------------------------------------------------------------
+Tue Jan  6 10:11:14 UTC 2026 - Peter Simons <[email protected]>
+
+- Update async to version 2.2.5 revision 4.
+  Upstream has revised the Cabal build instructions on Hackage.
+
+-------------------------------------------------------------------

Old:
----
  async-2.2.5.tar.gz
  async.cabal

New:
----
  async-2.2.6.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ ghc-async.spec ++++++
--- /var/tmp/diff_new_pack.m6nHu3/_old  2026-06-10 15:58:12.629292377 +0200
+++ /var/tmp/diff_new_pack.m6nHu3/_new  2026-06-10 15:58:12.633292543 +0200
@@ -1,7 +1,7 @@
 #
 # spec file for package ghc-async
 #
-# Copyright (c) 2024 SUSE LLC
+# Copyright (c) 2026 SUSE LLC
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -20,13 +20,12 @@
 %global pkgver %{pkg_name}-%{version}
 %bcond_with tests
 Name:           ghc-%{pkg_name}
-Version:        2.2.5
+Version:        2.2.6
 Release:        0
 Summary:        Run IO operations asynchronously and wait for their results
 License:        BSD-3-Clause
 URL:            https://hackage.haskell.org/package/%{pkg_name}
 Source0:        https://hackage.haskell.org/package/%{pkg_name}-%{version}/%{pkg_name}-%{version}.tar.gz
-Source1:        https://hackage.haskell.org/package/%{pkg_name}-%{version}/revision/3.cabal#/%{pkg_name}.cabal
 BuildRequires:  ghc-Cabal-devel
 BuildRequires:  ghc-base-devel
 BuildRequires:  ghc-base-prof
@@ -35,6 +34,8 @@
 BuildRequires:  ghc-rpm-macros
 BuildRequires:  ghc-stm-devel
 BuildRequires:  ghc-stm-prof
+BuildRequires:  ghc-unordered-containers-devel
+BuildRequires:  ghc-unordered-containers-prof
 ExcludeArch:    %{ix86}
 %if %{with tests}
 BuildRequires:  ghc-HUnit-devel
@@ -88,7 +89,6 @@
 
 %prep
 %autosetup -n %{pkg_name}-%{version}
-cp -p %{SOURCE1} %{pkg_name}.cabal
 
 %build
 %ghc_lib_build

++++++ async-2.2.5.tar.gz -> async-2.2.6.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async/Internal.hs new/async-2.2.6/Control/Concurrent/Async/Internal.hs
--- old/async-2.2.5/Control/Concurrent/Async/Internal.hs        2001-09-09 03:46:40.000000000 +0200
+++ new/async-2.2.6/Control/Concurrent/Async/Internal.hs        2001-09-09 03:46:40.000000000 +0200
@@ -55,7 +55,21 @@
 
 import GHC.Exts
 import GHC.IO hiding (finally, onException)
-import GHC.Conc
+import GHC.Conc (ThreadId(..))
+
+#if defined(__MHS__)
+import Data.Traversable
+#endif
+
+#ifdef DEBUG_AUTO_LABEL
+import qualified GHC.Stack
+#endif
+
+#ifdef DEBUG_AUTO_LABEL
+#define CALLSTACK GHC.Stack.HasCallStack =>
+#else
+#define CALLSTACK
+#endif
 
 -- -----------------------------------------------------------------------------
 -- STM Async API
@@ -95,40 +109,53 @@
 -- (see module-level documentation for details).
 --
 -- __Use 'withAsync' style functions wherever you can instead!__
-async :: IO a -> IO (Async a)
+async ::
+  CALLSTACK
+  IO a -> IO (Async a)
 async = inline asyncUsing rawForkIO
 
 -- | Like 'async' but using 'forkOS' internally.
-asyncBound :: IO a -> IO (Async a)
+asyncBound ::
+  CALLSTACK
+  IO a -> IO (Async a)
 asyncBound = asyncUsing forkOS
 
 -- | Like 'async' but using 'forkOn' internally.
-asyncOn :: Int -> IO a -> IO (Async a)
+asyncOn ::
+  CALLSTACK
+  Int -> IO a -> IO (Async a)
 asyncOn = asyncUsing . rawForkOn
 
 -- | Like 'async' but using 'forkIOWithUnmask' internally.  The child
 -- thread is passed a function that can be used to unmask asynchronous
 -- exceptions.
-asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
+asyncWithUnmask ::
+  CALLSTACK
+  ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
 asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)
 
 -- | Like 'asyncOn' but using 'forkOnWithUnmask' internally.  The
 -- child thread is passed a function that can be used to unmask
 -- asynchronous exceptions.
-asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
+asyncOnWithUnmask ::
+  CALLSTACK
+  Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
 asyncOnWithUnmask cpu actionWith =
   asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
 
-asyncUsing :: (IO () -> IO ThreadId)
-           -> IO a -> IO (Async a)
-asyncUsing doFork = \action -> do
+asyncUsing ::
+  CALLSTACK
+  (IO () -> IO ThreadId) -> IO a -> IO (Async a)
+asyncUsing doFork action = do
    var <- newEmptyTMVarIO
+   let action_plus = debugLabelMe >> action
    -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
    -- slightly faster:
    t <- mask $ \restore ->
-          doFork $ try (restore action) >>= atomically . putTMVar var
+          doFork $ try (restore action_plus) >>= atomically . putTMVar var
    return (Async t (readTMVar var))
 
+
 -- | Spawn an asynchronous action in a separate thread, and pass its
 -- @Async@ handle to the supplied function.  When the function returns
 -- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
@@ -144,41 +171,51 @@
 -- to `withAsync` returns, so nesting many `withAsync` calls requires
 -- linear memory.
 --
-withAsync :: IO a -> (Async a -> IO b) -> IO b
+withAsync ::
+  CALLSTACK
+  IO a -> (Async a -> IO b) -> IO b
 withAsync = inline withAsyncUsing rawForkIO
 
 -- | Like 'withAsync' but uses 'forkOS' internally.
-withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
+withAsyncBound ::
+  CALLSTACK
+  IO a -> (Async a -> IO b) -> IO b
 withAsyncBound = withAsyncUsing forkOS
 
 -- | Like 'withAsync' but uses 'forkOn' internally.
-withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
+withAsyncOn ::
+  CALLSTACK
+  Int -> IO a -> (Async a -> IO b) -> IO b
 withAsyncOn = withAsyncUsing . rawForkOn
 
 -- | Like 'withAsync' but uses 'forkIOWithUnmask' internally.  The
 -- child thread is passed a function that can be used to unmask
 -- asynchronous exceptions.
-withAsyncWithUnmask
-  :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
+withAsyncWithUnmask ::
+  CALLSTACK
+  ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
 withAsyncWithUnmask actionWith =
   withAsyncUsing rawForkIO (actionWith unsafeUnmask)
 
 -- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally.  The
 -- child thread is passed a function that can be used to unmask
 -- asynchronous exceptions
-withAsyncOnWithUnmask
-  :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
+withAsyncOnWithUnmask ::
+  CALLSTACK
+  Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
 withAsyncOnWithUnmask cpu actionWith =
   withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
 
-withAsyncUsing :: (IO () -> IO ThreadId)
-               -> IO a -> (Async a -> IO b) -> IO b
+withAsyncUsing ::
+  CALLSTACK
+  (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
 -- The bracket version works, but is slow.  We can do better by
 -- hand-coding it:
-withAsyncUsing doFork = \action inner -> do
+withAsyncUsing doFork action inner = do
   var <- newEmptyTMVarIO
   mask $ \restore -> do
-    t <- doFork $ try (restore action) >>= atomically . putTMVar var
+    let action_plus = debugLabelMe >> action
+    t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
     let a = Async t (readTMVar var)
     r <- restore (inner a) `catchAll` \e -> do
       uninterruptibleCancel a
@@ -265,6 +302,8 @@
 -- | Cancel multiple asynchronous actions by throwing the @AsyncCancelled@
 -- exception to each of them in turn, then waiting for all the `Async` threads
 -- to complete.
+--
+-- @since 2.2.5
 cancelMany :: [Async a] -> IO ()
 cancelMany as = do
   mapM_ (\(Async t _) -> throwTo t AsyncCancelled) as
@@ -554,11 +593,15 @@
 -- >   withAsync right $ \b ->
 -- >   waitEither a b
 --
-race :: IO a -> IO b -> IO (Either a b)
+race ::
+  CALLSTACK
+  IO a -> IO b -> IO (Either a b)
 
 -- | Like 'race', but the result is ignored.
 --
-race_ :: IO a -> IO b -> IO ()
+race_ ::
+  CALLSTACK
+  IO a -> IO b -> IO ()
 
 
 -- | Run two @IO@ actions concurrently, and return both results.  If
@@ -570,19 +613,27 @@
 -- >   withAsync left $ \a ->
 -- >   withAsync right $ \b ->
 -- >   waitBoth a b
-concurrently :: IO a -> IO b -> IO (a,b)
+--
+-- To run more than two actions concurrently, see 'mapConcurrently'.
+concurrently ::
+  CALLSTACK
+  IO a -> IO b -> IO (a,b)
 
 
 -- | Run two @IO@ actions concurrently. If both of them end with @Right@,
 -- return both results.  If one of then ends with @Left@, interrupt the other
--- action and return the @Left@. 
+-- action and return the @Left@.
 --
-concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
+concurrentlyE ::
+  CALLSTACK
+  IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
 
 -- | 'concurrently', but ignore the result values
 --
 -- @since 2.1.1
-concurrently_ :: IO a -> IO b -> IO ()
+concurrently_ ::
+  CALLSTACK
+  IO a -> IO b -> IO ()
 
 #define USE_ASYNC_VERSIONS 0
 
@@ -643,9 +694,11 @@
             Left ex -> throwIO ex
             Right r -> collect (r:xs) m
 
-concurrently' :: IO a -> IO b
-             -> (IO (Either SomeException (Either a b)) -> IO r)
-             -> IO r
+concurrently' ::
+  CALLSTACK
+  IO a -> IO b
+  -> (IO (Either SomeException (Either a b)) -> IO r)
+  -> IO r
 concurrently' left right collect = do
     done <- newEmptyMVar
     mask $ \restore -> do
@@ -689,7 +742,7 @@
                   -- ensure the children are really dead
                   replicateM_ count' (tryAgain $ takeMVar done)
 
-        r <- collect (tryAgain $ takeDone) `onException` stop
+        r <- collect (tryAgain takeDone) `onException` stop
         stop
         return r
 
@@ -717,11 +770,18 @@
 --
 -- > pages <- mapConcurrently getURL ["url1", "url2", "url3"]
 --
--- Take into account that @async@ will try to immediately spawn a thread
--- for each element of the @Traversable@, so running this on large
--- inputs without care may lead to resource exhaustion (of memory,
--- file descriptors, or other limited resources).
-mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
+-- If you just have a list of actions, run them concurrently with
+--
+-- > results <- mapConcurrently id [act1, act2, act3]
+--
+-- NOTE: @mapConcurrently@ will immediately spawn a thread for each
+-- element of the @Traversable@, so running this on large inputs can
+-- lead to resource exhaustion (of memory, file descriptors, or other
+-- limited resources). To avoid unbounded resource usage, see
+-- "Control.Concurrent.Stream".
+mapConcurrently ::
+  CALLSTACK
+  Traversable t => (a -> IO b) -> t a -> IO (t b)
 mapConcurrently f = runConcurrently . traverse (Concurrently . f)
 
 -- | `forConcurrently` is `mapConcurrently` with its arguments flipped
@@ -729,29 +789,39 @@
 -- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
 --
 -- @since 2.1.0
-forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
+forConcurrently ::
+  CALLSTACK
+  Traversable t => t a -> (a -> IO b) -> IO (t b)
 forConcurrently = flip mapConcurrently
 
 -- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
 -- a concurrent equivalent of 'mapM_'.
-mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
+mapConcurrently_ ::
+  CALLSTACK
+  F.Foldable f => (a -> IO b) -> f a -> IO ()
 mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)
 
 -- | `forConcurrently_` is `forConcurrently` with the return value discarded;
 -- a concurrent equivalent of 'forM_'.
-forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
+forConcurrently_ ::
+  CALLSTACK
+  F.Foldable f => f a -> (a -> IO b) -> IO ()
 forConcurrently_ = flip mapConcurrently_
 
 -- | Perform the action in the given number of threads.
 --
 -- @since 2.1.1
-replicateConcurrently :: Int -> IO a -> IO [a]
-replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently
+replicateConcurrently ::
+  CALLSTACK
+  Int -> IO a -> IO [a]
+replicateConcurrently cnt = runConcurrently . replicateM cnt . Concurrently
 
 -- | Same as 'replicateConcurrently', but ignore the results.
 --
 -- @since 2.1.1
-replicateConcurrently_ :: Int -> IO a -> IO ()
+replicateConcurrently_ ::
+  CALLSTACK
+  Int -> IO a -> IO ()
 replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void
 
 -- 
-----------------------------------------------------------------------------
@@ -831,7 +901,7 @@
     ConcurrentlyE $ fmap (\(f, a) -> f a) <$> concurrentlyE fs eas
 
 #if MIN_VERSION_base(4,9,0)
--- | Either the combination of the successful results, or the first failure. 
+-- | Either the combination of the successful results, or the first failure.
 instance Semigroup a => Semigroup (ConcurrentlyE e a) where
   (<>) = liftA2 (<>)
 
@@ -845,14 +915,16 @@
 -- | Fork a thread that runs the supplied action, and if it raises an
 -- exception, re-runs the action.  The thread terminates only when the
 -- action runs to completion without raising an exception.
-forkRepeat :: IO a -> IO ThreadId
+forkRepeat ::
+  CALLSTACK
+  IO a -> IO ThreadId
 forkRepeat action =
   mask $ \restore ->
     let go = do r <- tryAll (restore action)
                 case r of
                   Left _ -> go
                   _      -> return ()
-    in forkIO go
+    in forkIO (debugLabelMe >> go)
 
 catchAll :: IO a -> (SomeException -> IO a) -> IO a
 catchAll = catch
@@ -864,11 +936,29 @@
 -- handler: saves a bit of time when we will be installing our own
 -- exception handler.
 {-# INLINE rawForkIO #-}
-rawForkIO :: IO () -> IO ThreadId
-rawForkIO (IO action) = IO $ \ s ->
-   case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
+rawForkIO ::
+  CALLSTACK
+  IO () -> IO ThreadId
+rawForkIO action = IO $ \ s ->
+   case fork# action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #)
+  where
+    (IO action_plus) = debugLabelMe >> action
 
 {-# INLINE rawForkOn #-}
-rawForkOn :: Int -> IO () -> IO ThreadId
-rawForkOn (I# cpu) (IO action) = IO $ \ s ->
-   case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
+rawForkOn ::
+  CALLSTACK
+  Int -> IO () -> IO ThreadId
+rawForkOn (I# cpu) action = IO $ \ s ->
+   case forkOn# cpu action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #)
+  where
+   (IO action_plus) = debugLabelMe >> action
+
+debugLabelMe ::
+  CALLSTACK
+  IO ()
+debugLabelMe =
+#ifdef DEBUG_AUTO_LABEL
+  myThreadId >>= flip labelThread (GHC.Stack.prettyCallStack callStack)
+#else
+  pure ()
+#endif
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async/Warden.hs 
new/async-2.2.6/Control/Concurrent/Async/Warden.hs
--- old/async-2.2.5/Control/Concurrent/Async/Warden.hs  1970-01-01 
01:00:00.000000000 +0100
+++ new/async-2.2.6/Control/Concurrent/Async/Warden.hs  2001-09-09 
03:46:40.000000000 +0200
@@ -0,0 +1,95 @@
+{-
+  Copyright (c) Meta Platforms, Inc. and affiliates.
+  All rights reserved.
+
+  This source code is licensed under the BSD-style license found in the
+  LICENSE file in the root directory of this source tree.
+-}
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | A more flexible way to create 'Async's and have them automatically
+-- cancelled when the 'Warden' is shut down.
+module Control.Concurrent.Async.Warden
+  ( Warden
+  , withWarden
+  , create
+  , shutdown
+  , spawn
+  , spawn_
+  , spawnMask
+  , WardenException(..)
+  ) where
+
+import Control.Concurrent (forkIO)
+import Control.Concurrent.Async (Async)
+import qualified Control.Concurrent.Async as Async
+import Control.Concurrent.MVar
+import Control.Exception
+import Data.HashSet (HashSet)
+import qualified Data.HashSet as HashSet
+import System.IO (fixIO)
+
+#if defined(__MHS__)
+import Prelude hiding(mapM_)
+import Control.Monad hiding(mapM_)
+import Data.Foldable(mapM_)
+#else
+import Control.Monad
+#endif
+
+-- | A 'Warden' is an owner of 'Async's which cancels them on 'shutdown'.
+--
+-- 'Nothing' in the MVar means the 'Warden' has been shut down.
+newtype Warden = Warden (MVar (Maybe (HashSet (Async ()))))
+
+-- | Run the action with a new 'Warden', and call 'shutdown' when the action
+-- exits.
+withWarden :: (Warden -> IO a) -> IO a
+withWarden = bracket create shutdown
+
+-- | Create a new 'Warden'.
+create :: IO Warden
+create = Warden <$> newMVar (Just mempty)
+
+-- | Shutdown a 'Warden', calling 'cancel' on all owned threads. Subsequent
+-- calls to 'spawn' and 'shutdown' will be no-ops. 
+-- 
+-- Note that any exceptions thrown by the threads will be ignored. If you want
+-- exceptions to be propagated, either call `wait` explicitly on the 'Async', 
+-- or use 'link'.
+shutdown :: Warden -> IO ()
+shutdown (Warden v) = do
+  r <- swapMVar v Nothing
+  mapM_ (Async.mapConcurrently_ Async.cancel) r
+
+forget :: Warden -> Async a -> IO ()
+forget (Warden v) async = modifyMVar_ v $ \x -> case x of
+  Just xs -> return $! Just $! HashSet.delete (void async) xs
+  Nothing -> return Nothing
+
+-- | Spawn a thread with masked exceptions and pass an unmask function to the
+-- action.
+spawnMask :: Warden -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
+spawnMask (Warden v) action = modifyMVar v $ \r -> case r of
+  Just asyncs -> do
+    -- Create a new thread which removes itself from the 'HashSet' when it
+    -- exits.
+    this <- fixIO $ \this -> mask_ $ Async.asyncWithUnmask $ \unmask ->
+      action unmask `finally` forget (Warden v) this
+    return (Just $ HashSet.insert (void this) asyncs, this)
+  Nothing -> throwIO $ WardenException "Warden has been shut down"
+
+newtype WardenException = WardenException String
+  deriving (Show)
+
+instance Exception WardenException
+
+-- | Spawn a new thread owned by the 'Warden'.
+spawn :: Warden -> IO a -> IO (Async a)
+spawn warden action = spawnMask warden $ \unmask -> unmask action
+
+-- | Spawn a new thread owned by the 'Warden'.
+spawn_ :: Warden -> IO () -> IO ()
+spawn_ w = void . spawn w
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async.hs 
new/async-2.2.6/Control/Concurrent/Async.hs
--- old/async-2.2.5/Control/Concurrent/Async.hs 2001-09-09 03:46:40.000000000 
+0200
+++ new/async-2.2.6/Control/Concurrent/Async.hs 2001-09-09 03:46:40.000000000 
+0200
@@ -19,7 +19,7 @@
 -- == High-level API
 --
 -- @async@'s high-level API spawns /lexically scoped/ threads,
--- ensuring the following key poperties that make it safer to use
+-- ensuring the following key properties that make it safer to use
 -- than using plain 'forkIO':
 --
 -- 1. No exception is swallowed (waiting for results propagates exceptions).
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Stream.hs 
new/async-2.2.6/Control/Concurrent/Stream.hs
--- old/async-2.2.5/Control/Concurrent/Stream.hs        1970-01-01 
01:00:00.000000000 +0100
+++ new/async-2.2.6/Control/Concurrent/Stream.hs        2001-09-09 
03:46:40.000000000 +0200
@@ -0,0 +1,138 @@
+{-
+  Copyright (c) Meta Platforms, Inc. and affiliates.
+  All rights reserved.
+
+  This source code is licensed under the BSD-style license found in the
+  LICENSE file in the root directory of this source tree.
+-}
+
+-- | Processing streams with a fixed number of worker threads
+module Control.Concurrent.Stream
+  ( stream
+  , streamBound
+  , streamWithInput
+  , streamWithOutput
+  , streamWithInputOutput
+  , mapConcurrentlyBounded
+  , forConcurrentlyBounded
+  ) where
+
+import Control.Concurrent.Async
+import Control.Concurrent.STM
+import Control.Exception
+import Control.Monad
+import Data.Maybe
+import Data.IORef
+
+data ShouldBindThreads = BoundThreads | UnboundThreads
+
+-- | Maps a fixed number of workers concurrently over a stream of values
+-- produced by a producer function. The producer is passed a function to
+-- call for each work item. If a worker throws a synchronous exception, it
+-- will be propagated to the caller.
+stream
+  :: Int -- ^ Maximum Concurrency
+  -> ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> (a -> IO ()) -- ^ Worker
+  -> IO ()
+stream maxConcurrency producer worker =
+  streamWithInput producer (replicate maxConcurrency ()) $ const worker
+
+-- | Like stream, but uses bound threads for the workers.  See
+-- 'Control.Concurrent.forkOS' for details on bound threads.
+streamBound
+  :: Int -- ^ Maximum Concurrency
+  -> ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> (a -> IO ()) -- ^ Worker
+  -> IO ()
+streamBound maxConcurrency producer worker =
+  stream_ BoundThreads producer (replicate maxConcurrency ()) $ const worker
+
+-- | Like stream, but each worker is passed an element of an input list.
+streamWithInput
+  :: ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> [b] -- ^ Worker state
+  -> (b -> a -> IO ()) -- ^ Worker
+  -> IO ()
+streamWithInput = stream_ UnboundThreads
+
+-- | Like 'stream', but collects the results of each worker
+streamWithOutput
+  :: Int 
+  -> ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> (a -> IO c) -- ^ Worker
+  -> IO [c]
+streamWithOutput maxConcurrency producer worker =
+  streamWithInputOutput producer (replicate maxConcurrency ()) $ 
+    const worker
+
+-- | Like 'streamWithInput', but collects the results of each worker
+streamWithInputOutput
+  :: ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> [b] -- ^ Worker input
+  -> (b -> a -> IO c) -- ^ Worker
+  -> IO [c]
+streamWithInputOutput producer workerInput worker = do
+  results <- newIORef []
+  let prod write = producer $ \a -> do
+        res <- newIORef Nothing
+        modifyIORef results (res :)
+        write (a, res)
+  stream_ UnboundThreads prod workerInput $ \s (a,ref) -> do
+    worker s a >>= writeIORef ref . Just
+  readIORef results >>= mapM readIORef >>= return . catMaybes . reverse
+    
+stream_
+  :: ShouldBindThreads -- use bound threads?
+  -> ((a -> IO ()) -> IO ()) -- ^ Producer
+  -> [b] -- Worker input
+  -> (b -> a -> IO ()) -- ^ Worker
+  -> IO ()
+stream_ useBoundThreads producer workerInput worker = do
+  let maxConcurrency = length workerInput
+  q <- atomically $ newTBQueue (fromIntegral maxConcurrency)
+  let write x = atomically $ writeTBQueue q (Just x)
+  mask $ \unmask ->
+    concurrently_ (runWorkers unmask q) $ unmask $ do
+      -- run the producer
+      producer write
+      -- write end-markers for all workers
+      replicateM_ maxConcurrency $
+        atomically $ writeTBQueue q Nothing
+  where
+    runWorkers unmask q = case useBoundThreads of
+      BoundThreads ->
+        foldr1 concurrentlyBound $
+          map (runWorker unmask q) workerInput
+      UnboundThreads ->
+        mapConcurrently_ (runWorker unmask q) workerInput
+
+    concurrentlyBound l r =
+      withAsyncBound l $ \a ->
+      withAsyncBound r $ \b ->
+      void $ waitBoth a b
+
+    runWorker unmask q s = do
+      v <- atomically $ readTBQueue q
+      case v of
+        Nothing -> return ()
+        Just t -> do
+          unmask (worker s t)
+          runWorker unmask q s
+
+-- | Concurrent map over a list of values, using a bounded number of threads.
+mapConcurrentlyBounded
+  :: Int -- ^ Maximum concurrency
+  -> (a -> IO b) -- ^ Function to map over the input values
+  -> [a] -- ^ List of input values
+  -> IO [b] -- ^ List of output values
+mapConcurrentlyBounded maxConcurrency f input =
+  streamWithOutput maxConcurrency (forM_ input) f
+  
+-- | 'mapConcurrentlyBounded' but with its arguments reversed
+forConcurrentlyBounded
+  :: Int -- ^ Maximum concurrency
+  -> [a] -- ^ List of input values
+  -> (a -> IO b) -- ^ Function to map over the input values
+  -> IO [b] -- ^ List of output values
+forConcurrentlyBounded = flip . mapConcurrentlyBounded
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/async.cabal new/async-2.2.6/async.cabal
--- old/async-2.2.5/async.cabal 2001-09-09 03:46:40.000000000 +0200
+++ new/async-2.2.6/async.cabal 2001-09-09 03:46:40.000000000 +0200
@@ -1,5 +1,5 @@
 name:                async
-version:             2.2.5
+version:             2.2.6
 -- don't forget to update ./changelog.md!
 synopsis:            Run IO operations asynchronously and wait for their results
 
@@ -35,9 +35,12 @@
 homepage:            https://github.com/simonmar/async
 bug-reports:         https://github.com/simonmar/async/issues
 tested-with:
-    GHC == 9.8.1
-    GHC == 9.6.3
-    GHC == 9.4.7
+    GHC == 9.14.1
+    GHC == 9.12.2
+    GHC == 9.10.3
+    GHC == 9.8.4
+    GHC == 9.6.7
+    GHC == 9.4.8
     GHC == 9.2.8
     GHC == 9.0.2
     GHC == 8.10.7
@@ -46,8 +49,8 @@
     GHC == 8.4.4
     GHC == 8.2.2
     GHC == 8.0.2
-    GHC == 7.10.3
-    -- Drop GHC < 7.10 to be able to use the ubuntu-20.04 buildpack
+    -- CI does not support GHC 7
+    -- GHC == 7.10.3
     -- GHC == 7.8.4
     -- GHC == 7.6.3
     -- GHC == 7.4.2
@@ -62,6 +65,16 @@
     type: git
     location: https://github.com/simonmar/async.git
 
+flag debug-auto-label
+   description:
+     Strictly for debugging as it might have a non-negligible overhead.
+
+     Enabling this flag will auto-label the threads spawned by @async@. Use it to
+     find where are unlabelled threads spawned in your program (be it your code or
+     dependency code).
+   default: False
+   manual: True
+
 library
     default-language:    Haskell2010
     other-extensions:    CPP, MagicHash, RankNTypes, UnboxedTuples
@@ -69,12 +82,18 @@
         other-extensions: Trustworthy
     exposed-modules:     Control.Concurrent.Async
                          Control.Concurrent.Async.Internal
-    build-depends:       base     >= 4.3     && < 4.20,
-                         hashable >= 1.1.2.0 && < 1.5,
-                         stm      >= 2.2     && < 2.6
+                         Control.Concurrent.Async.Warden
+                         Control.Concurrent.Stream
+    build-depends:       base     >= 4.3     && < 4.23,
+                         hashable >= 1.1.2.0 && < 1.6,
+                         stm      >= 2.2     && < 2.6,
+                         unordered-containers >= 0.2 && < 0.3
+    if flag(debug-auto-label)
+      cpp-options: -DDEBUG_AUTO_LABEL
 
 test-suite test-async
     default-language: Haskell2010
+    ghc-options: -threaded
     type:       exitcode-stdio-1.0
     hs-source-dirs: test
     main-is:    test-async.hs
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/changelog.md new/async-2.2.6/changelog.md
--- old/async-2.2.5/changelog.md        2001-09-09 03:46:40.000000000 +0200
+++ new/async-2.2.6/changelog.md        2001-09-09 03:46:40.000000000 +0200
@@ -1,3 +1,14 @@
+## Changes in 2.2.6
+
+ - Added Control.Concurrent.Stream for processing streams with a fixed
+   number of workers. Includes a bounded version of mapConcurrently:
+   mapConcurrentlyBounded.
+ - Added Control.Concurrent.Async.Warden for a way to create Asyncs that
+   is more flexible than 'withAsync' but retains the guarantee of cancelling
+   orphaned threads, unlike 'async'.
+ - support GHC 9.12, GHC 9.14, and MicroHs
+ - cabal flag debug-auto-label: label threads automatically (#167)
+
 ## Changes in 2.2.5
 
  - #117: Document that empty for Concurrently waits forever
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/async-2.2.5/test/test-async.hs 
new/async-2.2.6/test/test-async.hs
--- old/async-2.2.5/test/test-async.hs  2001-09-09 03:46:40.000000000 +0200
+++ new/async-2.2.6/test/test-async.hs  2001-09-09 03:46:40.000000000 +0200
@@ -8,6 +8,8 @@
 
 import Control.Concurrent.STM
 import Control.Concurrent.Async
+import Control.Concurrent.Async.Warden
+import Control.Concurrent.Stream
 import Control.Exception
 import Data.IORef
 import Data.Typeable
@@ -65,6 +67,17 @@
       , testCase "concurrentlyE_Monoid" concurrentlyE_Monoid
       , testCase "concurrentlyE_Monoid_fail" concurrentlyE_Monoid_fail
 #endif
+  , testCase "stream" $ case_stream False
+  , testCase "streamBound" $ case_stream True
+  , testCase "stream_exception" $ case_stream_exception False
+  , testCase "streamBound_exception" $ case_stream_exception True
+  , testCase "streamWithInput" case_streamInput
+  , testCase "streamWithInput_exception" case_streamInput_exception
+  , testCase "mapConcurrentlyBounded" case_mapConcurrentlyBounded
+  , testCase "mapConcurrentlyBounded_exception" 
+      case_mapConcurrentlyBounded_exception
+  , testCase "Warden" case_Warden
+  , testCase "Warden_spawn_after_shutdown" case_Warden_spawn_after_shutdown
   ]
  ]
 
@@ -459,3 +472,79 @@
         r :: Either Char [Char] <- runConcurrentlyE $ foldMap ConcurrentlyE $ 
current
         assertEqual "The earliest failure" (Left 'u') r
 #endif
+
+case_stream :: Bool -> Assertion
+case_stream bound = do
+  ref <- newIORef []
+  let inp = [1..100]
+  let producer write = forM_ inp $ \x -> write (show x)
+  (if bound then streamBound else stream) 4 producer $ \s -> atomicModifyIORef ref (\l -> (s:l, ()))
+  res <- readIORef ref
+  sort res @?= sort (map show inp)
+
+case_stream_exception :: Bool -> Assertion
+case_stream_exception bound = do
+  let inp = [1..100]
+  let producer write = forM_ inp $ \x -> write (show x)
+  r <- try $ (if bound then streamBound else stream) 4 producer $ \s -> 
+    when (s == "3") $ throwIO (ErrorCall s)
+  r @?= Left (ErrorCall "3" :: ErrorCall)
+
+case_streamInput :: Assertion
+case_streamInput = do
+  ref <- newIORef []
+  let inp = [1..100]; workers = [1..4] :: [Int]
+  let producer write = forM_ inp $ \x -> write (show x)
+  streamWithInput producer workers $ \s t -> atomicModifyIORef ref (\l -> 
((s,t):l, ()))
+  res <- readIORef ref
+  sort (map snd res) @?= sort (map show inp)
+  all ((`elem` workers) . fst) res @?= True
+
+case_streamInput_exception :: Assertion
+case_streamInput_exception = do
+  let inp = [1..100]; workers = [1..4] :: [Int]
+  let producer write = forM_ inp $ \x -> write (show x)
+  r <- try $ streamWithInput producer workers $ \s t -> 
+    when (t == "3") $ throwIO (ErrorCall t)
+  r @?= Left (ErrorCall "3" :: ErrorCall)
+
+case_mapConcurrentlyBounded :: Assertion
+case_mapConcurrentlyBounded = do
+  let inp = [1..100]
+  let f x = threadDelay 1000 >> return (x * 2)
+  res <- mapConcurrentlyBounded 4 f inp
+  res @?= map (*2) inp
+
+case_mapConcurrentlyBounded_exception :: Assertion
+case_mapConcurrentlyBounded_exception = do
+  let inp = [1..100]
+  let f x | x == 3 = throwIO $ ErrorCall "3"
+          | otherwise = threadDelay 1000 >> return (x * 2)
+  res <- try $ mapConcurrentlyBounded 4 f inp
+  res @?= Left (ErrorCall "3" :: ErrorCall)
+
+case_Warden :: Assertion
+case_Warden = do
+  a3 <- withWarden $ \warden -> do
+    a1 <- spawn warden $ return 1
+    a2 <- spawnMask warden $ \unmask -> unmask (return 2)
+    a3 <- spawn warden $ threadDelay 10000000
+    spawn_ warden $ throwIO (ErrorCall "a4") -- ignored
+    r1 <- wait a1
+    r1 @?= 1
+    r2 <- wait a2
+    r2 @?= 2
+    return a3
+  r3 <- waitCatch a3
+  case r3 of
+    Right _ -> assertFailure "Expected AsyncCancelled"
+    Left e -> fromException e @?= Just AsyncCancelled
+
+case_Warden_spawn_after_shutdown :: Assertion
+case_Warden_spawn_after_shutdown = do
+  warden <- create
+  shutdown warden
+  r <- try $ spawn warden $ return ()
+  case r of
+    Left (WardenException{}) -> return ()  -- expected
+    Right _ -> assertFailure "Expected WardenException"
\ No newline at end of file

Reply via email to