`(>->)` is defined in terms of `(+>>)`, like this:

    p1 >-> p2 = (\() -> p1) +>> p2

In other words:

* Control begins from downstream, which is `p2` (because it internally uses `(+>>)`) * The upstream function is waiting on an empty value of type `()` (because `p2` does not send any useful data upstream) * Therefore, we can conveniently omit the argument because we're not throwing away any useful data

The only reason `test` was awkward was because you were staggering the inputs, which means that there is a corner case for the first value (you never `peek` the first value in the stream).

On 8/21/2015 7:05 AM, Matthew Pickering wrote:
Gabriel, that explanation was very useful to me as well. Thank you.

A question I have is why this is different from the >-> combinator
where neither argument is a function type and if we have (p1 >-> p2)
then execution starts from p2 before transferring to p1. The way you
had to rewrite test seems a bit awkward to me.



On Fri, Aug 21, 2015 at 3:55 PM, Gabriel Gonzalez <gabriel...@gmail.com> wrote:
`Proxy`s are fundamentally single-threaded coroutines meaning that:

(A) Only one `Proxy` is active at any given point in time
(B) They cooperatively transfer control to each other

The easiest way to explain this is in terms of the two primitive
combinators: `(>>~)` and `(+>>)` which are mutually defined in terms of each
other.  I've simplified the types to more closely match your example:

     (>>~)
         :: Server PeekerMessage String IO r
         -> (String -> Client PeekerMessage String IO r)
         -> Effect IO r

     (+>>)
         :: (PeekerMessage -> Server PeekerMessage String IO r)
         -> Client PeekerMessage String IO r
         -> Effect IO r

The intuition for control flow that you should have is:

* When you use `p1 >>~ p2` the flow of control begins from `p1` and
transfers to `p2` the first time that `p1` `respond`s
* When you use `p1 +>> p2` the flow of control begins from `p2` and
transfers to `p1` the first time that `p2` `request`s

Notice that for each operator, the `Proxy` the flow of control begins from
the `Proxy` that takes no function argument:

     (>>~)
         :: Server PeekerMessage String IO r  -- This one initiates the flow
of control for `(>>~)`
         -> (String -> Client PeekerMessage String IO r)
         -> Effect IO r

     (+>>)
         :: (PeekerMessage -> Server PeekerMessage String IO r)
         -> Client PeekerMessage String IO r  -- This one initiates the flow
of control for `(+>>)`
         -> Effect IO r

Also notice that the `Proxy` that does not initiate the flow of control
takes a function argument:

* In the case of `p1 >>~ p2`, `p2` is a function whose argument is the first
value that `p1` emits via `respond`.
* In the case of `p1 +>> p2`, `p1` is a function whose argument is the first
value that `p2` emits via `request`

This is consistent with the types, too:

     (>>~)
         :: Server PeekerMessage String IO r
         -> (String -> Client PeekerMessage String IO r)  -- I wait for the
first `String` from `p1`
         -> Effect IO r

     (+>>)
         :: (PeekerMessage -> Server PeekerMessage String IO r)  -- I wait
for the first `PeekerMessage` from `p2`
         -> Client PeekerMessage String IO r
         -> Effect IO r

So the trick to deciding which operators to use is to just decide which
`Proxy` in the pipeline you want to initiate control.

For example, suppose that you have a pipeline with 3 `Proxy`s: `p1`, `p2`,
and `p3`.  If you want to start from the most upstream `Proxy`, you would
use this:

     (p1 >>~ p2) >>~ p3 = p1 >>~ p2 >>~ p3

If you wanted to start from the most downstream `Proxy`, you would use this:

     p1 +>> (p2 +>> p3) = p1 +>> p2 +>> p3

If you wanted to start from the middle `Proxy`, you would use this:

     (p1 +>> p2) >>~ p3 = p1 +>> (p2 >>~ p3) = p1 +>> p2 >>~ p3

However, you're not limited to starting from a `Proxy` inside the pipeline.
If you want to start from something upstream of `p1`, you would write this:

     p1 >~> p2 >~> p3

... and if you wanted to start from something downstream of `p3`, you would
write this:

     p1 >+> p2 >+> p3

So going back to your original question, it seems from your code that you
wanted control to begin from the `source` `Proxy`, which means that you want
your pipeline to look like this:

     runEffect (source >>~ peeker >>~ test)

That in turn tells you which `Proxy`s need to be functions and which ones
are not.  Remember that a most one `Proxy` in the pipeline will initiate the
flow of control (`source` in this case), so that will be the one `Proxy`
that is not a function.  Every other `Proxy` will be a function and you just
have to follow the type of the `(>>~)` operator to deduce what the function
argument types must be.

We know that `source` will have type:

     Producer String m () = Proxy X () () String m ()

If you plug that type as the first argument to `(>>~) you get this
specialized type:

     (>>~)
         :: Monad m
         => Proxy X () () String m ()
         -> (String -> Proxy () String c' c m ())
         -> Proxy c' c m ()

So that means that when you transform `peeker` into a function it takes an
initial `String` as an argument (the first `String` that `source` emits).
Notice how convenient that is because you can use that `String` to seed
`peeker` instead of having to explicitly call `await` to retrieve the first
value.  In fact, this is exactly the type that your internal `responder`
loop has, so you can simplify `peeker` to just:

     peeker :: (Monad m) => b -> Proxy () b PeekerMessage b m ()
     peeker = responder
       where
       responder x = do
         m <- respond x
         case m of
           Peek    -> responder x
           Acquire -> peeker

And now if you compose `source` with `peeker` you would get this type:

     source >>~ peeker
         :: Monad m => Proxy X () PeekerMessage String m ()
     --  :: Monad m => Server PeekerMessage String m ()

Now we can feed that type as the left argument to `(>>~)` to figure out what
the type of `test` should be:

     (>>~)
         :: Monad m
         => Proxy X () PeekerMessage String m ()
         -> (String -> Proxy PeekerMessage String c' c m ())
         -> Proxy X () c' c m ()

In other words, `test` needs to also be a function that takes a `String` as
its initial argument.  That `String` is the first value emitted by `peeker`.

This is why you got weird behavior because you were throwing away that
initial value when you used `const`.  What you really wanted to write was:

     test :: (Show a') => a' -> Proxy PeekerMessage a' b b' IO ()
     test a = do
       b <- peek
       lift . putStrLn . show $ (a, b)
       a' <- acquire
       test a'

That would then give an expected output of:

     ("hello", "hello")
     ("world", "world")
     ("test", "test")
     ("message", "message")

If you wanted to stagger the values you would need to modify `test` like
this:

     test :: (Show a') => a' -> Proxy PeekerMessage a' b b' IO ()
     test a = do
         b <- acquire
         lift . putStrLn . show $ (a, b)
         loop
       where
         loop = do
             a <- peek
             b <- acquire
             lift . putStrLn . show $ (a, b)
             loop

That would then give an expected output of:

     ("hello","world")
     ("world","test")
     ("test","message")


On 8/21/2015 5:13 AM, Eric Brisco wrote:

Hello,

I am new to pipes and am having lots of confusion. For reasons I'll omit, I
want to be able to peek at the next element in the stream without consuming
it, much like you might want to peek at the next input character in C rather
than doing getchar().

This is my attempt at doing that (see http://lpaste.net/139351).

A couple problems. First, the output does not match what I expect, and I
don't have any clue why. I also notice that if I use >+> to glue my program
together rather than >~> the output changes. Don't know why that is either.

Second, it seems the only way I can combine these proxies is using the
combinators such as >+> and >~> but oddly they all take function arguments
and the only thing I can see to do with that is to use const. So, my
thinking is I am not using the combinators correctly or the correct
combinators.

Thanks for your time.
--
You received this message because you are subscribed to the Google Groups
"Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to haskell-pipes+unsubscr...@googlegroups.com.
To post to this group, send email to haskell-pipes@googlegroups.com.


--
You received this message because you are subscribed to the Google Groups
"Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to haskell-pipes+unsubscr...@googlegroups.com.
To post to this group, send email to haskell-pipes@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Haskell 
Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to haskell-pipes+unsubscr...@googlegroups.com.
To post to this group, send email to haskell-pipes@googlegroups.com.

Reply via email to