For posterity, using Roland's advice, I encoded the state of the processing
into the flow itself, and it cleaned up all of my issues and got rid of any
stream-based state.

sealed trait HandlerEvent
case class Open(ctxt: Context) extends HandlerEvent
case class Handle(ctxt: Context, state: State) extends HandlerEvent
case class Close(ctxt: Context) extends HandlerEvent

private def toFlow(p: Processor[Context, State]) = {
  Flow[HandlerEvent].map {
    (evt: HandlerEvent) => {
      evt match {
        case Open(ctxt) => p.handleContext(ctxt)
        case Handle(ctxt, state) => p.handleState(state); p.concludeState(state)
        case Close(ctxt) => p.concludeContext()
      }
      evt
    }
  }
}

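
For anyone wondering where the Open/Handle/Close events come from, here is a minimal sketch of one way they might be derived from (Context, State) pairs. It is not necessarily what I run in production: the name toEvents and the String/Int stand-ins for Context and State are assumptions made for illustration, and it works on a plain List so that it runs without Akka on the classpath.

```scala
object EventSketch {
  // Hypothetical stand-ins for the real Context and State types.
  type Context = String
  type State = Int

  sealed trait HandlerEvent
  case class Open(ctxt: Context) extends HandlerEvent
  case class Handle(ctxt: Context, state: State) extends HandlerEvent
  case class Close(ctxt: Context) extends HandlerEvent

  // Walk the pairs once, remembering only the previous context:
  //  - first element: Open, then Handle
  //  - same context as before: Handle
  //  - context changed: Close the old one, Open the new one, then Handle
  // After the last pair, Close whatever context is still open.
  def toEvents(pairs: List[(Context, State)]): List[HandlerEvent] = {
    val zero = (Vector.empty[HandlerEvent], Option.empty[Context])
    val (events, last) = pairs.foldLeft(zero) {
      case ((acc, Some(prev)), (c, s)) if prev == c =>
        (acc :+ Handle(c, s), Some(c))
      case ((acc, Some(prev)), (c, s)) =>
        (acc :+ Close(prev) :+ Open(c) :+ Handle(c, s), Some(c))
      case ((acc, None), (c, s)) =>
        (acc :+ Open(c) :+ Handle(c, s), Some(c))
    }
    (events ++ last.map(c => Close(c)).toList).toList
  }
}
```

In a stream the same events would be produced upstream of toFlow, so the flow itself never needs a var to remember the last context.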
On Tuesday, October 27, 2015 at 8:20:36 PM UTC-4, rkuhn wrote:
>
> You're welcome!
>
> On 27 Oct 2015, at 21:12, Rich Henry <[email protected]>
> wrote:
>
> Sorry, I mean, thanks Roland. MT.
>
> On Tuesday, October 27, 2015 at 4:12:13 PM UTC-4, Rich Henry wrote:
>>
>> This looks like exactly what I was looking for, Ronald, thanks. I knew I
>> was missing a better solution.
>>
>>
>>
>> On Tuesday, October 27, 2015 at 3:14:57 PM UTC-4, rkuhn wrote:
>>>
>>> Agreed; on a more constructive note, you can solve the problem by
>>> explicitly managing your state using .scan(zero)(f) and by
>>> concatenating a single marker element to the end of the input before
>>> scanning (using for example .concat(Source.single((Context.end,
>>> State.empty)))) that then triggers the last concludeState
>>> invocation—the marker values can be null as a last resort (i.e. if you
>>> cannot make up suitably invalid values otherwise).
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 27 Oct 2015, at 20:03, Viktor Klang <[email protected]> wrote:
>>>
>>> This is not going to answer your question but:
>>>
>>> def toFlow(handler: Handler) = {
>>> var ctxt: Option[Context] = None <--- *DO NOT DO THIS*
>>>
>>> Flow[(Context, State)].map {
>>> (in: (Context, State)) => {
>>> ctxt match {
>>> case Some(last) => if (!last.equals(in._1)) {
>>> last.concludeContext()
>>> ctxt = Some(in._1)
>>> }
>>> case None => ctxt = Some(in._1)
>>> }
>>>
>>> handler.handleState(in._2)
>>> handler.concludeState(in._2)
>>> in
>>> }
>>> }
>>> }
>>>
>>> The problem is that now your Flow cannot be materialized more than once
>>> (until the previous has run to completion) simply because it is using
>>> global mutable state.
>>>
>>> On Tue, Oct 27, 2015 at 7:49 PM, Rich Henry <[email protected]> wrote:
>>>
>>>> *TL;DR -- Is there any way to provide an individual Flow component
>>>> with logic to execute on Graph completion?*
>>>>
>>>> I'm trying to wrap an existing API that is used like this:
>>>>
>>>> obj.handleContext(c1)
>>>> obj.handleState(s1)
>>>> obj.handleState(s2)
>>>> obj.handleState(sN)
>>>> obj.concludeContext(c1)
>>>>
>>>> .. in a flow, very much like this:
>>>>
>>>> def toFlow(handler: Handler) = {
>>>> var ctxt: Option[Context] = None
>>>>
>>>> Flow[(Context, State)].map {
>>>> (in: (Context, State)) => {
>>>> ctxt match {
>>>> case Some(last) => if (!last.equals(in._1)) {
>>>> last.concludeContext()
>>>> ctxt = Some(in._1)
>>>> }
>>>> case None => ctxt = Some(in._1)
>>>> }
>>>>
>>>> handler.handleState(in._2)
>>>> handler.concludeState(in._2)
>>>> in
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> I then combine the various handlers created using this method together
>>>> using ~> and use .grouped() and Sink.head to collect the resulting values.
>>>>
>>>>
>>>> *I need to call handler.concludeContext() on the last context passed in
>>>> to the flow on completion for each sub-flow.*
>>>>
>>>>
>>>> My backup plan is to wrap the flow components in another class that
>>>> holds the context, and to manually call the conclude method on them en
>>>> masse once the flow completes, but I would much prefer to use some
>>>> built-in mechanism instead if there is one.
>>>>
>>>>
>>>> I'm also open to other strategies that may help me avoid being
>>>> stateful, but to date haven't thought of any.
>>>>
>>>> (I'm using akka-actor 2.4.0 and akka-stream-experimental 1.0)
>>>>
>>>> Thanks,
>>>> Rich
>>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ:
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives:
>>>> https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> √
>>>
>>>
>>>
>>>
>>>
>>> *Dr. Roland Kuhn*
>>> *Akka Tech Lead*
>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>> twitter: @rolandkuhn
>>> <http://twitter.com/#!/rolandkuhn>
>>>
>
>
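
Roland's suggestion quoted above (scan with an explicit accumulator, plus a single marker element appended to the input) can be sketched roughly as follows. This is only an illustration under stated assumptions: String and Int stand in for Context and State, End is a hypothetical marker value, the handler calls are recorded as strings rather than invoked, and scanLeft on a plain List stands in for .scan(zero)(f) while :+ stands in for .concat(Source.single(...)).

```scala
object ScanSketch {
  // Hypothetical stand-ins for the thread's Context and State types.
  type Context = String
  type State = Int

  // Hypothetical marker, assumed never to occur as a real context.
  val End: Context = "<end>"

  // State carried through the scan: the context currently open (if any)
  // and the handler calls triggered by the most recent element.
  final case class Acc(current: Option[Context], calls: List[String])

  def step(acc: Acc, in: (Context, State)): Acc = {
    val (ctxt, state) = in
    // Conclude the previous context when a different one (or the marker) arrives.
    val conclude = acc.current.filter(_ != ctxt).map(c => s"concludeContext($c)").toList
    // The marker carries no real state, so it triggers no handleState call.
    val handle = if (ctxt == End) Nil else List(s"handleState($state)")
    Acc(if (ctxt == End) None else Some(ctxt), conclude ++ handle)
  }

  def run(input: List[(Context, State)]): List[String] =
    (input :+ ((End, 0)))              // append the single marker element
      .scanLeft(Acc(None, Nil))(step)  // thread the state through explicitly
      .flatMap(_.calls)
}
```

Because the accumulator is created fresh by each scan, nothing is shared between runs, which is the property the var-based version lacked.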