Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package ghc-async for openSUSE:Factory checked in at 2026-06-10 15:57:46 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/ghc-async (Old) and /work/SRC/openSUSE:Factory/.ghc-async.new.2375 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "ghc-async" Wed Jun 10 15:57:46 2026 rev:34 rq:1358329 version:2.2.6 Changes: -------- --- /work/SRC/openSUSE:Factory/ghc-async/ghc-async.changes 2024-12-20 23:10:15.491578171 +0100 +++ /work/SRC/openSUSE:Factory/.ghc-async.new.2375/ghc-async.changes 2026-06-10 15:58:11.657252096 +0200 @@ -1,0 +2,21 @@ +Wed Jan 7 11:26:16 UTC 2026 - Peter Simons <[email protected]> + +- Update async to version 2.2.6. + ## Changes in 2.2.6 + + - Added Control.Concurrent.Stream for processing streams with a fixed + number of workers. Includes a bounded version of mapConcurrently: + mapConcurrentlyBounded. + - Added Control.Concurrent.Async.Warden for a way to create Asyncs that + is more flexible than 'withAsync' but retains the guarantee of cancelling + orphaned threads, unlike 'async'. + - support GHC 9.12, GHC 9.14, and MicroHs + - cabal flag debug-auto-label: label threads automatically (#167) + +------------------------------------------------------------------- +Tue Jan 6 10:11:14 UTC 2026 - Peter Simons <[email protected]> + +- Update async to version 2.2.5 revision 4. + Upstream has revised the Cabal build instructions on Hackage. + +------------------------------------------------------------------- Old: ---- async-2.2.5.tar.gz async.cabal New: ---- async-2.2.6.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ ghc-async.spec ++++++ --- /var/tmp/diff_new_pack.m6nHu3/_old 2026-06-10 15:58:12.629292377 +0200 +++ /var/tmp/diff_new_pack.m6nHu3/_new 2026-06-10 15:58:12.633292543 +0200 @@ -1,7 +1,7 @@ # # spec file for package ghc-async # -# Copyright (c) 2024 SUSE LLC +# Copyright (c) 2026 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -20,13 +20,12 @@ %global pkgver %{pkg_name}-%{version} %bcond_with tests Name: ghc-%{pkg_name} -Version: 2.2.5 +Version: 2.2.6 Release: 0 Summary: Run IO operations asynchronously and wait for their results License: BSD-3-Clause URL: https://hackage.haskell.org/package/%{pkg_name} Source0: https://hackage.haskell.org/package/%{pkg_name}-%{version}/%{pkg_name}-%{version}.tar.gz -Source1: https://hackage.haskell.org/package/%{pkg_name}-%{version}/revision/3.cabal#/%{pkg_name}.cabal BuildRequires: ghc-Cabal-devel BuildRequires: ghc-base-devel BuildRequires: ghc-base-prof @@ -35,6 +34,8 @@ BuildRequires: ghc-rpm-macros BuildRequires: ghc-stm-devel BuildRequires: ghc-stm-prof +BuildRequires: ghc-unordered-containers-devel +BuildRequires: ghc-unordered-containers-prof ExcludeArch: %{ix86} %if %{with tests} BuildRequires: ghc-HUnit-devel @@ -88,7 +89,6 @@ %prep %autosetup -n %{pkg_name}-%{version} -cp -p %{SOURCE1} %{pkg_name}.cabal %build %ghc_lib_build ++++++ async-2.2.5.tar.gz -> async-2.2.6.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async/Internal.hs new/async-2.2.6/Control/Concurrent/Async/Internal.hs --- old/async-2.2.5/Control/Concurrent/Async/Internal.hs 2001-09-09 03:46:40.000000000 +0200 +++ new/async-2.2.6/Control/Concurrent/Async/Internal.hs 2001-09-09 03:46:40.000000000 +0200 @@ -55,7 +55,21 @@ import GHC.Exts import GHC.IO hiding (finally, onException) -import GHC.Conc +import GHC.Conc (ThreadId(..)) + +#if defined(__MHS__) +import Data.Traversable +#endif + +#ifdef DEBUG_AUTO_LABEL +import qualified GHC.Stack +#endif + +#ifdef DEBUG_AUTO_LABEL +#define CALLSTACK GHC.Stack.HasCallStack => +#else +#define CALLSTACK +#endif -- ----------------------------------------------------------------------------- -- STM Async API @@ -95,40 +109,53 @@ -- (see module-level documentation for details). -- -- __Use 'withAsync' style functions wherever you can instead!__ -async :: IO a -> IO (Async a) +async :: + CALLSTACK + IO a -> IO (Async a) async = inline asyncUsing rawForkIO -- | Like 'async' but using 'forkOS' internally. -asyncBound :: IO a -> IO (Async a) +asyncBound :: + CALLSTACK + IO a -> IO (Async a) asyncBound = asyncUsing forkOS -- | Like 'async' but using 'forkOn' internally. -asyncOn :: Int -> IO a -> IO (Async a) +asyncOn :: + CALLSTACK + Int -> IO a -> IO (Async a) asyncOn = asyncUsing . rawForkOn -- | Like 'async' but using 'forkIOWithUnmask' internally. The child -- thread is passed a function that can be used to unmask asynchronous -- exceptions. -asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncWithUnmask :: + CALLSTACK + ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) +asyncOnWithUnmask :: + CALLSTACK + Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a) asyncOnWithUnmask cpu actionWith = asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -asyncUsing :: (IO () -> IO ThreadId) - -> IO a -> IO (Async a) -asyncUsing doFork = \action -> do +asyncUsing :: + CALLSTACK + (IO () -> IO ThreadId) -> IO a -> IO (Async a) +asyncUsing doFork action = do var <- newEmptyTMVarIO + let action_plus = debugLabelMe >> action -- t <- forkFinally action (\r -> atomically $ putTMVar var r) -- slightly faster: t <- mask $ \restore -> - doFork $ try (restore action) >>= atomically . putTMVar var + doFork $ try (restore action_plus) >>= atomically . putTMVar var return (Async t (readTMVar var)) + -- | Spawn an asynchronous action in a separate thread, and pass its -- @Async@ handle to the supplied function. When the function returns -- or throws an exception, 'uninterruptibleCancel' is called on the @Async@. @@ -144,41 +171,51 @@ -- to `withAsync` returns, so nesting many `withAsync` calls requires -- linear memory. -- -withAsync :: IO a -> (Async a -> IO b) -> IO b +withAsync :: + CALLSTACK + IO a -> (Async a -> IO b) -> IO b withAsync = inline withAsyncUsing rawForkIO -- | Like 'withAsync' but uses 'forkOS' internally. -withAsyncBound :: IO a -> (Async a -> IO b) -> IO b +withAsyncBound :: + CALLSTACK + IO a -> (Async a -> IO b) -> IO b withAsyncBound = withAsyncUsing forkOS -- | Like 'withAsync' but uses 'forkOn' internally. -withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b +withAsyncOn :: + CALLSTACK + Int -> IO a -> (Async a -> IO b) -> IO b withAsyncOn = withAsyncUsing . rawForkOn -- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions. -withAsyncWithUnmask - :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncWithUnmask :: + CALLSTACK + ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncWithUnmask actionWith = withAsyncUsing rawForkIO (actionWith unsafeUnmask) -- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The -- child thread is passed a function that can be used to unmask -- asynchronous exceptions -withAsyncOnWithUnmask - :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b +withAsyncOnWithUnmask :: + CALLSTACK + Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b withAsyncOnWithUnmask cpu actionWith = withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask) -withAsyncUsing :: (IO () -> IO ThreadId) - -> IO a -> (Async a -> IO b) -> IO b +withAsyncUsing :: + CALLSTACK + (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b -- The bracket version works, but is slow. We can do better by -- hand-coding it: -withAsyncUsing doFork = \action inner -> do +withAsyncUsing doFork action inner = do var <- newEmptyTMVarIO mask $ \restore -> do - t <- doFork $ try (restore action) >>= atomically . putTMVar var + let action_plus = debugLabelMe >> action + t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var let a = Async t (readTMVar var) r <- restore (inner a) `catchAll` \e -> do uninterruptibleCancel a @@ -265,6 +302,8 @@ -- | Cancel multiple asynchronous actions by throwing the @AsyncCancelled@ -- exception to each of them in turn, then waiting for all the `Async` threads -- to complete. +-- +-- @since 2.2.5 cancelMany :: [Async a] -> IO () cancelMany as = do mapM_ (\(Async t _) -> throwTo t AsyncCancelled) as @@ -554,11 +593,15 @@ -- > withAsync right $ \b -> -- > waitEither a b -- -race :: IO a -> IO b -> IO (Either a b) +race :: + CALLSTACK + IO a -> IO b -> IO (Either a b) -- | Like 'race', but the result is ignored. -- -race_ :: IO a -> IO b -> IO () +race_ :: + CALLSTACK + IO a -> IO b -> IO () -- | Run two @IO@ actions concurrently, and return both results. If @@ -570,19 +613,27 @@ -- > withAsync left $ \a -> -- > withAsync right $ \b -> -- > waitBoth a b -concurrently :: IO a -> IO b -> IO (a,b) +-- +-- To run more than two actions concurrently, see 'mapConcurrently'. +concurrently :: + CALLSTACK + IO a -> IO b -> IO (a,b) -- | Run two @IO@ actions concurrently. If both of them end with @Right@, -- return both results. If one of then ends with @Left@, interrupt the other --- action and return the @Left@. +-- action and return the @Left@. -- -concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) +concurrentlyE :: + CALLSTACK + IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b)) -- | 'concurrently', but ignore the result values -- -- @since 2.1.1 -concurrently_ :: IO a -> IO b -> IO () +concurrently_ :: + CALLSTACK + IO a -> IO b -> IO () #define USE_ASYNC_VERSIONS 0 @@ -643,9 +694,11 @@ Left ex -> throwIO ex Right r -> collect (r:xs) m -concurrently' :: IO a -> IO b - -> (IO (Either SomeException (Either a b)) -> IO r) - -> IO r +concurrently' :: + CALLSTACK + IO a -> IO b + -> (IO (Either SomeException (Either a b)) -> IO r) + -> IO r concurrently' left right collect = do done <- newEmptyMVar mask $ \restore -> do @@ -689,7 +742,7 @@ -- ensure the children are really dead replicateM_ count' (tryAgain $ takeMVar done) - r <- collect (tryAgain $ takeDone) `onException` stop + r <- collect (tryAgain takeDone) `onException` stop stop return r @@ -717,11 +770,18 @@ -- -- > pages <- mapConcurrently getURL ["url1", "url2", "url3"] -- --- Take into account that @async@ will try to immediately spawn a thread --- for each element of the @Traversable@, so running this on large --- inputs without care may lead to resource exhaustion (of memory, --- file descriptors, or other limited resources). -mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b) +-- If you just have a list of actions, run them concurrently with +-- +-- > results <- mapConcurrently id [act1, act2, act3] +-- +-- NOTE: @mapConcurrently@ will immediately spawn a thread for each +-- element of the @Traversable@, so running this on large inputs can +-- lead to resource exhaustion (of memory, file descriptors, or other +-- limited resources). To avoid unbounded resource usage, see +-- "Control.Concurrent.Stream". +mapConcurrently :: + CALLSTACK + Traversable t => (a -> IO b) -> t a -> IO (t b) mapConcurrently f = runConcurrently . traverse (Concurrently . f) -- | `forConcurrently` is `mapConcurrently` with its arguments flipped @@ -729,29 +789,39 @@ -- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url -- -- @since 2.1.0 -forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b) +forConcurrently :: + CALLSTACK + Traversable t => t a -> (a -> IO b) -> IO (t b) forConcurrently = flip mapConcurrently -- | `mapConcurrently_` is `mapConcurrently` with the return value discarded; -- a concurrent equivalent of 'mapM_'. -mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO () +mapConcurrently_ :: + CALLSTACK + F.Foldable f => (a -> IO b) -> f a -> IO () mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f) -- | `forConcurrently_` is `forConcurrently` with the return value discarded; -- a concurrent equivalent of 'forM_'. -forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO () +forConcurrently_ :: + CALLSTACK + F.Foldable f => f a -> (a -> IO b) -> IO () forConcurrently_ = flip mapConcurrently_ -- | Perform the action in the given number of threads. -- -- @since 2.1.1 -replicateConcurrently :: Int -> IO a -> IO [a] -replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently +replicateConcurrently :: + CALLSTACK + Int -> IO a -> IO [a] +replicateConcurrently cnt = runConcurrently . replicateM cnt . Concurrently -- | Same as 'replicateConcurrently', but ignore the results. -- -- @since 2.1.1 -replicateConcurrently_ :: Int -> IO a -> IO () +replicateConcurrently_ :: + CALLSTACK + Int -> IO a -> IO () replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void -- ----------------------------------------------------------------------------- @@ -831,7 +901,7 @@ ConcurrentlyE $ fmap (\(f, a) -> f a) <$> concurrentlyE fs eas #if MIN_VERSION_base(4,9,0) --- | Either the combination of the successful results, or the first failure. +-- | Either the combination of the successful results, or the first failure. instance Semigroup a => Semigroup (ConcurrentlyE e a) where (<>) = liftA2 (<>) @@ -845,14 +915,16 @@ -- | Fork a thread that runs the supplied action, and if it raises an -- exception, re-runs the action. The thread terminates only when the -- action runs to completion without raising an exception. -forkRepeat :: IO a -> IO ThreadId +forkRepeat :: + CALLSTACK + IO a -> IO ThreadId forkRepeat action = mask $ \restore -> let go = do r <- tryAll (restore action) case r of Left _ -> go _ -> return () - in forkIO go + in forkIO (debugLabelMe >> go) catchAll :: IO a -> (SomeException -> IO a) -> IO a catchAll = catch @@ -864,11 +936,29 @@ -- handler: saves a bit of time when we will be installing our own -- exception handler. {-# INLINE rawForkIO #-} -rawForkIO :: IO () -> IO ThreadId -rawForkIO (IO action) = IO $ \ s -> - case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkIO :: + CALLSTACK + IO () -> IO ThreadId +rawForkIO action = IO $ \ s -> + case fork# action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action {-# INLINE rawForkOn #-} -rawForkOn :: Int -> IO () -> IO ThreadId -rawForkOn (I# cpu) (IO action) = IO $ \ s -> - case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #) +rawForkOn :: + CALLSTACK + Int -> IO () -> IO ThreadId +rawForkOn (I# cpu) action = IO $ \ s -> + case forkOn# cpu action_plus s of (# s1, tid #) -> (# s1, ThreadId tid #) + where + (IO action_plus) = debugLabelMe >> action + +debugLabelMe :: + CALLSTACK + IO () +debugLabelMe = +#ifdef DEBUG_AUTO_LABEL + myThreadId >>= flip labelThread (GHC.Stack.prettyCallStack callStack) +#else + pure () +#endif diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async/Warden.hs new/async-2.2.6/Control/Concurrent/Async/Warden.hs --- old/async-2.2.5/Control/Concurrent/Async/Warden.hs 1970-01-01 01:00:00.000000000 +0100 +++ new/async-2.2.6/Control/Concurrent/Async/Warden.hs 2001-09-09 03:46:40.000000000 +0200 @@ -0,0 +1,95 @@ +{- + Copyright (c) Meta Platforms, Inc. and affiliates. + All rights reserved. + + This source code is licensed under the BSD-style license found in the + LICENSE file in the root directory of this source tree. +-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | A more flexible way to create 'Async's and have them automatically +-- cancelled when the 'Warden' is shut down. +module Control.Concurrent.Async.Warden + ( Warden + , withWarden + , create + , shutdown + , spawn + , spawn_ + , spawnMask + , WardenException(..) + ) where + +import Control.Concurrent (forkIO) +import Control.Concurrent.Async (Async) +import qualified Control.Concurrent.Async as Async +import Control.Concurrent.MVar +import Control.Exception +import Data.HashSet (HashSet) +import qualified Data.HashSet as HashSet +import System.IO (fixIO) + +#if defined(__MHS__) +import Prelude hiding(mapM_) +import Control.Monad hiding(mapM_) +import Data.Foldable(mapM_) +#else +import Control.Monad +#endif + +-- | A 'Warden' is an owner of 'Async's which cancels them on 'shutdown'. +-- +-- 'Nothing' in the MVar means the 'Warden' has been shut down. +newtype Warden = Warden (MVar (Maybe (HashSet (Async ())))) + +-- | Run the action with a new 'Warden', and call 'shutdown' when the action +-- exits. +withWarden :: (Warden -> IO a) -> IO a +withWarden = bracket create shutdown + +-- | Create a new 'Warden'. +create :: IO Warden +create = Warden <$> newMVar (Just mempty) + +-- | Shutdown a 'Warden', calling 'cancel' on all owned threads. Subsequent +-- calls to 'spawn' and 'shutdown' will be no-ops. +-- +-- Note that any exceptions thrown by the threads will be ignored. If you want +-- exceptions to be propagated, either call `wait` explicitly on the 'Async', +-- or use 'link'. +shutdown :: Warden -> IO () +shutdown (Warden v) = do + r <- swapMVar v Nothing + mapM_ (Async.mapConcurrently_ Async.cancel) r + +forget :: Warden -> Async a -> IO () +forget (Warden v) async = modifyMVar_ v $ \x -> case x of + Just xs -> return $! Just $! HashSet.delete (void async) xs + Nothing -> return Nothing + +-- | Spawn a thread with masked exceptions and pass an unmask function to the +-- action. +spawnMask :: Warden -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) +spawnMask (Warden v) action = modifyMVar v $ \r -> case r of + Just asyncs -> do + -- Create a new thread which removes itself from the 'HashSet' when it + -- exits. + this <- fixIO $ \this -> mask_ $ Async.asyncWithUnmask $ \unmask -> + action unmask `finally` forget (Warden v) this + return (Just $ HashSet.insert (void this) asyncs, this) + Nothing -> throwIO $ WardenException "Warden has been shut down" + +newtype WardenException = WardenException String + deriving (Show) + +instance Exception WardenException + +-- | Spawn a new thread owned by the 'Warden'. +spawn :: Warden -> IO a -> IO (Async a) +spawn warden action = spawnMask warden $ \unmask -> unmask action + +-- | Spawn a new thread owned by the 'Warden'. +spawn_ :: Warden -> IO () -> IO () +spawn_ w = void . spawn w diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Async.hs new/async-2.2.6/Control/Concurrent/Async.hs --- old/async-2.2.5/Control/Concurrent/Async.hs 2001-09-09 03:46:40.000000000 +0200 +++ new/async-2.2.6/Control/Concurrent/Async.hs 2001-09-09 03:46:40.000000000 +0200 @@ -19,7 +19,7 @@ -- == High-level API -- -- @async@'s high-level API spawns /lexically scoped/ threads, --- ensuring the following key poperties that make it safer to use +-- ensuring the following key properties that make it safer to use -- than using plain 'forkIO': -- -- 1. No exception is swallowed (waiting for results propagates exceptions). diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/Control/Concurrent/Stream.hs new/async-2.2.6/Control/Concurrent/Stream.hs --- old/async-2.2.5/Control/Concurrent/Stream.hs 1970-01-01 01:00:00.000000000 +0100 +++ new/async-2.2.6/Control/Concurrent/Stream.hs 2001-09-09 03:46:40.000000000 +0200 @@ -0,0 +1,138 @@ +{- + Copyright (c) Meta Platforms, Inc. and affiliates. + All rights reserved. + + This source code is licensed under the BSD-style license found in the + LICENSE file in the root directory of this source tree. +-} + +-- | Processing streams with a fixed number of worker threads +module Control.Concurrent.Stream + ( stream + , streamBound + , streamWithInput + , streamWithOutput + , streamWithInputOutput + , mapConcurrentlyBounded + , forConcurrentlyBounded + ) where + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.Maybe +import Data.IORef + +data ShouldBindThreads = BoundThreads | UnboundThreads + +-- | Maps a fixed number of workers concurrently over a stream of values +-- produced by a producer function. The producer is passed a function to +-- call for each work item. If a worker throws a synchronous exception, it +-- will be propagated to the caller. +stream + :: Int -- ^ Maximum Concurrency + -> ((a -> IO ()) -> IO ()) -- ^ Producer + -> (a -> IO ()) -- ^ Worker + -> IO () +stream maxConcurrency producer worker = + streamWithInput producer (replicate maxConcurrency ()) $ const worker + +-- | Like stream, but uses bound threads for the workers. See +-- 'Control.Concurrent.forkOS' for details on bound threads. +streamBound + :: Int -- ^ Maximum Concurrency + -> ((a -> IO ()) -> IO ()) -- ^ Producer + -> (a -> IO ()) -- ^ Worker + -> IO () +streamBound maxConcurrency producer worker = + stream_ BoundThreads producer (replicate maxConcurrency ()) $ const worker + +-- | Like stream, but each worker is passed an element of an input list. +streamWithInput + :: ((a -> IO ()) -> IO ()) -- ^ Producer + -> [b] -- ^ Worker state + -> (b -> a -> IO ()) -- ^ Worker + -> IO () +streamWithInput = stream_ UnboundThreads + +-- | Like 'stream', but collects the results of each worker +streamWithOutput + :: Int + -> ((a -> IO ()) -> IO ()) -- ^ Producer + -> (a -> IO c) -- ^ Worker + -> IO [c] +streamWithOutput maxConcurrency producer worker = + streamWithInputOutput producer (replicate maxConcurrency ()) $ + const worker + +-- | Like 'streamWithInput', but collects the results of each worker +streamWithInputOutput + :: ((a -> IO ()) -> IO ()) -- ^ Producer + -> [b] -- ^ Worker input + -> (b -> a -> IO c) -- ^ Worker + -> IO [c] +streamWithInputOutput producer workerInput worker = do + results <- newIORef [] + let prod write = producer $ \a -> do + res <- newIORef Nothing + modifyIORef results (res :) + write (a, res) + stream_ UnboundThreads prod workerInput $ \s (a,ref) -> do + worker s a >>= writeIORef ref . Just + readIORef results >>= mapM readIORef >>= return . catMaybes . reverse + +stream_ + :: ShouldBindThreads -- use bound threads? + -> ((a -> IO ()) -> IO ()) -- ^ Producer + -> [b] -- Worker input + -> (b -> a -> IO ()) -- ^ Worker + -> IO () +stream_ useBoundThreads producer workerInput worker = do + let maxConcurrency = length workerInput + q <- atomically $ newTBQueue (fromIntegral maxConcurrency) + let write x = atomically $ writeTBQueue q (Just x) + mask $ \unmask -> + concurrently_ (runWorkers unmask q) $ unmask $ do + -- run the producer + producer write + -- write end-markers for all workers + replicateM_ maxConcurrency $ + atomically $ writeTBQueue q Nothing + where + runWorkers unmask q = case useBoundThreads of + BoundThreads -> + foldr1 concurrentlyBound $ + map (runWorker unmask q) workerInput + UnboundThreads -> + mapConcurrently_ (runWorker unmask q) workerInput + + concurrentlyBound l r = + withAsyncBound l $ \a -> + withAsyncBound r $ \b -> + void $ waitBoth a b + + runWorker unmask q s = do + v <- atomically $ readTBQueue q + case v of + Nothing -> return () + Just t -> do + unmask (worker s t) + runWorker unmask q s + +-- | Concurrent map over a list of values, using a bounded number of threads. +mapConcurrentlyBounded + :: Int -- ^ Maximum concurrency + -> (a -> IO b) -- ^ Function to map over the input values + -> [a] -- ^ List of input values + -> IO [b] -- ^ List of output values +mapConcurrentlyBounded maxConcurrency f input = + streamWithOutput maxConcurrency (forM_ input) f + +-- | 'mapConcurrentlyBounded' but with its arguments reversed +forConcurrentlyBounded + :: Int -- ^ Maximum concurrency + -> [a] -- ^ List of input values + -> (a -> IO b) -- ^ Function to map over the input values + -> IO [b] -- ^ List of output values +forConcurrentlyBounded = flip . mapConcurrentlyBounded diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/async.cabal new/async-2.2.6/async.cabal --- old/async-2.2.5/async.cabal 2001-09-09 03:46:40.000000000 +0200 +++ new/async-2.2.6/async.cabal 2001-09-09 03:46:40.000000000 +0200 @@ -1,5 +1,5 @@ name: async -version: 2.2.5 +version: 2.2.6 -- don't forget to update ./changelog.md! synopsis: Run IO operations asynchronously and wait for their results @@ -35,9 +35,12 @@ homepage: https://github.com/simonmar/async bug-reports: https://github.com/simonmar/async/issues tested-with: - GHC == 9.8.1 - GHC == 9.6.3 - GHC == 9.4.7 + GHC == 9.14.1 + GHC == 9.12.2 + GHC == 9.10.3 + GHC == 9.8.4 + GHC == 9.6.7 + GHC == 9.4.8 GHC == 9.2.8 GHC == 9.0.2 GHC == 8.10.7 @@ -46,8 +49,8 @@ GHC == 8.4.4 GHC == 8.2.2 GHC == 8.0.2 - GHC == 7.10.3 - -- Drop GHC < 7.10 to be able to use the ubuntu-20.04 buildpack + -- CI does not support GHC 7 + -- GHC == 7.10.3 -- GHC == 7.8.4 -- GHC == 7.6.3 -- GHC == 7.4.2 @@ -62,6 +65,16 @@ type: git location: https://github.com/simonmar/async.git +flag debug-auto-label + description: + Strictly for debugging as it might have a non-negligible overhead. + + Enabling this flag will auto-label the threads spawned by @async@. Use it to + find where are unlabelled threads spawned in your program (be it your code or + dependency code). + default: False + manual: True + library default-language: Haskell2010 other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples @@ -69,12 +82,18 @@ other-extensions: Trustworthy exposed-modules: Control.Concurrent.Async Control.Concurrent.Async.Internal - build-depends: base >= 4.3 && < 4.20, - hashable >= 1.1.2.0 && < 1.5, - stm >= 2.2 && < 2.6 + Control.Concurrent.Async.Warden + Control.Concurrent.Stream + build-depends: base >= 4.3 && < 4.23, + hashable >= 1.1.2.0 && < 1.6, + stm >= 2.2 && < 2.6, + unordered-containers >= 0.2 && < 0.3 + if flag(debug-auto-label) + cpp-options: -DDEBUG_AUTO_LABEL test-suite test-async default-language: Haskell2010 + ghc-options: -threaded type: exitcode-stdio-1.0 hs-source-dirs: test main-is: test-async.hs diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/changelog.md new/async-2.2.6/changelog.md --- old/async-2.2.5/changelog.md 2001-09-09 03:46:40.000000000 +0200 +++ new/async-2.2.6/changelog.md 2001-09-09 03:46:40.000000000 +0200 @@ -1,3 +1,14 @@ +## Changes in 2.2.6 + + - Added Control.Concurrent.Stream for processing streams with a fixed + number of workers. Includes a bounded version of mapConcurrently: + mapConcurrentlyBounded. + - Added Control.Concurrent.Async.Warden for a way to create Asyncs that + is more flexible than 'withAsync' but retains the guarantee of cancelling + orphaned threads, unlike 'async'. + - support GHC 9.12, GHC 9.14, and MicroHs + - cabal flag debug-auto-label: label threads automatically (#167) + ## Changes in 2.2.5 - #117: Document that empty for Concurrently waits forever diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/async-2.2.5/test/test-async.hs new/async-2.2.6/test/test-async.hs --- old/async-2.2.5/test/test-async.hs 2001-09-09 03:46:40.000000000 +0200 +++ new/async-2.2.6/test/test-async.hs 2001-09-09 03:46:40.000000000 +0200 @@ -8,6 +8,8 @@ import Control.Concurrent.STM import Control.Concurrent.Async +import Control.Concurrent.Async.Warden +import Control.Concurrent.Stream import Control.Exception import Data.IORef import Data.Typeable @@ -65,6 +67,17 @@ , testCase "concurrentlyE_Monoid" concurrentlyE_Monoid , testCase "concurrentlyE_Monoid_fail" concurrentlyE_Monoid_fail #endif + , testCase "stream" $ case_stream False + , testCase "streamBound" $ case_stream True + , testCase "stream_exception" $ case_stream_exception False + , testCase "streamBound_exception" $ case_stream_exception True + , testCase "streamWithInput" case_streamInput + , testCase "streamWithInput_exception" case_streamInput_exception + , testCase "mapConcurrentlyBounded" case_mapConcurrentlyBounded + , testCase "mapConcurrentlyBounded_exception" + case_mapConcurrentlyBounded_exception + , testCase "Warden" case_Warden + , testCase "Warden_spawn_after_shutdown" case_Warden_spawn_after_shutdown ] ] @@ -459,3 +472,79 @@ r :: Either Char [Char] <- runConcurrentlyE $ foldMap ConcurrentlyE $ current assertEqual "The earliest failure" (Left 'u') r #endif + +case_stream :: Bool -> Assertion +case_stream bound = do + ref <- newIORef [] + let inp = [1..100] + let producer write = forM_ inp $ \x -> write (show x) + (if bound then streamBound else stream) 4 producer $ \s -> atomicModifyIORef ref (\l -> (s:l, ())) + res <- readIORef ref + sort res @?= sort (map show inp) + +case_stream_exception :: Bool -> Assertion +case_stream_exception bound = do + let inp = [1..100] + let producer write = forM_ inp $ \x -> write (show x) + r <- try $ (if bound then streamBound else stream) 4 producer $ \s -> + when (s == "3") $ throwIO (ErrorCall s) + r @?= Left (ErrorCall "3" :: ErrorCall) + +case_streamInput :: Assertion +case_streamInput = do + ref <- newIORef [] + let inp = [1..100]; workers = [1..4] :: [Int] + let producer write = forM_ inp $ \x -> write (show x) + streamWithInput producer workers $ \s t -> atomicModifyIORef ref (\l -> ((s,t):l, ())) + res <- readIORef ref + sort (map snd res) @?= sort (map show inp) + all ((`elem` workers) . fst) res @?= True + +case_streamInput_exception :: Assertion +case_streamInput_exception = do + let inp = [1..100]; workers = [1..4] :: [Int] + let producer write = forM_ inp $ \x -> write (show x) + r <- try $ streamWithInput producer workers $ \s t -> + when (t == "3") $ throwIO (ErrorCall t) + r @?= Left (ErrorCall "3" :: ErrorCall) + +case_mapConcurrentlyBounded :: Assertion +case_mapConcurrentlyBounded = do + let inp = [1..100] + let f x = threadDelay 1000 >> return (x * 2) + res <- mapConcurrentlyBounded 4 f inp + res @?= map (*2) inp + +case_mapConcurrentlyBounded_exception :: Assertion +case_mapConcurrentlyBounded_exception = do + let inp = [1..100] + let f x | x == 3 = throwIO $ ErrorCall "3" + | otherwise = threadDelay 1000 >> return (x * 2) + res <- try $ mapConcurrentlyBounded 4 f inp + res @?= Left (ErrorCall "3" :: ErrorCall) + +case_Warden :: Assertion +case_Warden = do + a3 <- withWarden $ \warden -> do + a1 <- spawn warden $ return 1 + a2 <- spawnMask warden $ \unmask -> unmask (return 2) + a3 <- spawn warden $ threadDelay 10000000 + spawn_ warden $ throwIO (ErrorCall "a4") -- ignored + r1 <- wait a1 + r1 @?= 1 + r2 <- wait a2 + r2 @?= 2 + return a3 + r3 <- waitCatch a3 + case r3 of + Right _ -> assertFailure "Expected AsyncCancelled" + Left e -> fromException e @?= Just AsyncCancelled + +case_Warden_spawn_after_shutdown :: Assertion +case_Warden_spawn_after_shutdown = do + warden <- create + shutdown warden + r <- try $ spawn warden $ return () + case r of + Left (WardenException{}) -> return () -- expected + Right _ -> assertFailure "Expected WardenException" \ No newline at end of file
