Andrew, I just thought I should mention that I've deployed my acknowledged stream implementation to Maven Central. It uses promises, and both Roland and Victor hate that. :) But it works with stream operations that modify cardinality, such as group, groupedWithin, filter, mapConcat, etc.
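
To make the promise idea concrete, here is a minimal sketch of how an acknowledgment can survive operations that change cardinality. It uses only scala.concurrent and plain Lists rather than Akka Streams, and it is not the acked-stream API: the names AckedSketch, Acked, ackedFilter, ackedMapConcat and ackedGrouped are hypothetical and exist only for this illustration. The assumption is that each element travels with a Promise[Unit] which the final sink completes once the work is durable.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AckedSketch {
  // Hypothetical wrapper: an element paired with the promise that acknowledges it upstream.
  type Acked[T] = (Promise[Unit], T)

  // filter: a dropped element never reaches the sink, so acknowledge it right away.
  def ackedFilter[T](in: List[Acked[T]])(p: T => Boolean): List[Acked[T]] =
    in.filter { case (ack, value) =>
      val keep = p(value)
      if (!keep) ack.trySuccess(())
      keep
    }

  // mapConcat (1 -> N): the parent is acknowledged once every child has been.
  def ackedMapConcat[T, U](in: List[Acked[T]])(f: T => Seq[U]): List[Acked[U]] =
    in.flatMap { case (parent, value) =>
      val children = f(value).toList.map(u => (Promise[Unit](), u))
      // Future.sequence over an empty list succeeds at once, which covers N = 0.
      parent.completeWith(Future.sequence(children.map(_._1.future)).map(_ => ()))
      children
    }

  // grouped (N -> 1): acknowledging the group acknowledges every member.
  def ackedGrouped[T](in: List[Acked[T]], size: Int): List[Acked[List[T]]] =
    in.grouped(size).map { members =>
      val groupAck = Promise[Unit]()
      members.foreach { case (ack, _) => ack.completeWith(groupAck.future) }
      (groupAck, members.map(_._2))
    }.toList

  def main(args: Array[String]): Unit = {
    val source = List("a b", "", "c").map(s => (Promise[Unit](), s))
    val words  = ackedMapConcat(source)(_.split(" ").filter(_.nonEmpty).toList)
    val groups = ackedGrouped(words, 2)
    groups.foreach(_._1.success(())) // stands in for the sink acknowledging each group
    source.foreach { case (ack, _) => Await.result(ack.future, 1.second) }
    println("all source elements acknowledged")
  }
}
```

In this sketch a failed promise propagates the same way as a successful one, so a failure at the sink would fail the originating promises too. What the source does with that (release the job, retry, and so on) is left out.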
https://github.com/timcharper/acked-stream

Sent from my iPhone

> On Aug 25, 2015, at 10:31, Andrew Rollins <[email protected]> wrote:
>
> Dom,
>
> Thanks so much for your reply.
>
> I have one question. It seems like the approach you outlined assumes you
> broadcast one object per acknowledgment needed later, which means the zip
> will be in lock step. However, what if the object you are sending through the
> pipeline is very large and you want to break it into N smaller parts of work?
> For example, let's say you are processing pointers to large files in HDFS or
> AWS S3, and you'd like to chunk those files into smaller pieces through the
> pipeline. Have you thought about how you might approach that?
>
>> On Fri, Aug 21, 2015 at 4:40 AM, Dom B <[email protected]> wrote:
>> Hi Andrew,
>>
>> The way I have solved that problem was to broadcast the flow, Flow.map out
>> the Job wrapper, go into the premade flow, then zip the two together before
>> mapping the resultant back into the Job wrapper
>>
>> ~> broadcast ~> mapOutWrapper ~> f1 ~> zip ~> mapIntoWrapper ~>
>> broadcast ~> zip
>>
>> This only works though, when your f1 task remains ordered. I suppose I could
>> abstract it into the Flowz implementation, but because of that ordered
>> requirement, I decided in the one case I needed it, just to put it in
>> directly to make the requirement clear during review.
>>
>> Cheers,
>>
>> Dom
>>
>>> On Thursday, 20 August 2015 05:20:28 UTC+1, Andrew Rollins wrote:
>>> Dom,
>>>
>>> I'm interested in this as well. I like the sample you provided. I just have
>>> a couple questions to better understand your solution.
>>>
>>> In your example, you have:
>>>
>>> val process = Flowz[Option, Int].map(_ * 2 toString) // Outputs
>>> Flow[Option[Int], Option[String]]
>>>
>>> I'm assuming that in your beanstalk case, you use a type other than Option
>>> which lets you thread the job id all the way to the end, so maybe you end
>>> up doing some sort of Flowz[Job, Int].
>>>
>>> This approach works great if you use it everywhere. I have a question
>>> though. What if you already have a Flow[Int, String], and you want to
>>> somehow augment it to pass through the Job? Have you considered how you
>>> might handle it, or do you just avoid that altogether?
>>>
>>>> On Monday, August 17, 2015 at 4:24:59 AM UTC-4, Dom B wrote:
>>>> Hi Roland,
>>>>
>>>> In the end I solved it using various type classes from Scalaz and created a
>>>> "Flowz" class to remove the boilerplate from using it with the "Flow" DSL.
>>>> Then I included the beanstalk deleter as the sink at the end of all possible
>>>> routes in my graph.
>>>>
>>>> See the Flowz implementation here:
>>>> https://gist.github.com/DomBlack/568752041e4ec0cecc18
>>>>
>>>> Cheers,
>>>>
>>>> Dom
>>>>
>>>>> On Friday, 14 August 2015 11:13:04 UTC+1, Akka Team wrote:
>>>>> Hi Dom,
>>>>>
>>>>> this keeps coming up in different forms and the use-case is a valid one.
>>>>> So far I’d say that it is possible to write F1 and F2 generically such
>>>>> that they carry some auxiliary information of type T around and that may
>>>>> or may not be used; when applied in the beanstalk context this would be a
>>>>> job ID, during tests it could be some debug info that helps identify what
>>>>> went wrong (in case something ever does). The problem with trying to lift
>>>>> this into the framework is that we have stages/junctions that are
>>>>> non-obvious to deal with (essentially everything that is not 1:1
>>>>> element-wise).
>>>>>
>>>>> As an example you can take a look at the HTTP client which provides such
>>>>> generic flows, threading some T through from request to response.
>>>>>
>>>>> Would this be adequate in your situation? If not, do you have a brilliant
>>>>> idea of what would be?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Roland
>>>>>
>>>>>> On Thu, May 28, 2015 at 5:19 PM, Dom B <[email protected]> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I'm looking at the new Streams API and trying to work out if it's a best
>>>>>> fit for my use case. The problem I'm having is with acknowledgement.
>>>>>>
>>>>>> I have an external system placing requests onto a queue (Beanstalk in
>>>>>> this case). Once the external system receives the OK from beanstalk,
>>>>>> like an event during replay, I'm committed to performing that job.
>>>>>>
>>>>>> My plan was to build an ActorPublisher based Beanstalk client based on
>>>>>> the existing Akka IO code, then reserve the jobs on the beanstalk
>>>>>> server. This starts a time to live on the beanstalk server; if I do not
>>>>>> delete or release the job within that window, the server will assume I'm
>>>>>> dead and release the job back onto the queue
>>>>>> (https://github.com/kr/beanstalkd/wiki/faq#how-does-ttr-work). This is
>>>>>> perfect in a HA system, because if my Akka node dies, beanstalk will
>>>>>> simply reset the job for another node to process.
>>>>>>
>>>>>> My problem is that once my ActorPublisher gives the job to the "onNext"
>>>>>> method, it has no way to track if and when the job is completed, thus
>>>>>> the only time it can delete from beanstalk is straight away, and then I
>>>>>> have just lost the whole advantage of the job TTL.
>>>>>>
>>>>>> I'm leaning towards a Sink such that the stream has to be
>>>>>> `BeanstalkPublisher ~> F1 ~> F2 ~> BeanstalkDeleter`, but this means the
>>>>>> data type flowing through F1 and F2 has to include the job id. This makes
>>>>>> Flow composition a little more annoying as I had planned to reuse F1 and
>>>>>> F2 but without a Beanstalk source.
>>>>>>
>>>>>> Am I missing something obvious, or is this the only way to achieve an
>>>>>> ACK back to the upstream publisher once F2 has completed processing?
>>>>>> (Note in this example F2 is the persistence point)
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Dom
>>>>>> --
>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>> >>>>>>>>>> Check the FAQ:
>>>>>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>>> ---
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Akka User List" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>>> an email to [email protected].
>>>>>> To post to this group, send email to [email protected].
>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>> --
>>>>> Akka Team
>>>>> Typesafe - Reactive apps on the JVM
>>>>> Blog: letitcrash.com
>>>>> Twitter: @akkateam
>
> --
> Andrew Rollins
> Chief Software Architect, Localytics
> @andrew311, LinkedIn
> T: (617) 418-4422 x506
> Localytics.com | Our Blog | Twitter | Facebook | LinkedIn
