Hi So the statefulMapConat was what I used at first, but the implementation became a mess. I'll try to give a richer example of what I am missing in streams in this particular case.
My original approach was a bit naive. I wished to model the protocol and state transitions as a flow graph. I realized that sharing the connection/transaction specific state across several flow stages and model the protocol state machine in a streaming fashion became a very complex affair of messages as nested Eithers together with broadcasts, filtering and priority merges for semi-dynamic routing. The challenge I had was that without guarantees about how much data is available in each ByteString (my original question) I must be able to pull in more data into a buffer until I have enough data to continue processing. I couldn't know how much of the buffer was consumed until downstream stages had finished so I had to feed back the buffer and how much the buffer size (payload) was required back into the first stage that pulled in ByteStrings from Akka TCP IO. When I noticed that I was modeling if-else logic and pattern matching in a complex maze of (built in) flow stages I stopped and looked for a different approach. If I were to do it again I could have had to crammed everything into a single statefulMapConcat instead. Much like what I did with the GraphStage in the end. The result would be similar. But that is not using the stream DSL to model the protocol at all just putting a big blob of imperative code on one stage. This means that easy composition is lost. On the plus side the reactive properties are still there. Many protocols are designed in a way that implies keeping track of state for the duration of the connection or batch of messages. Same goes for file formats or other data structures. A result/validation is built up in successions based on previously received data. In these cases my conclusion is that the provided in Streams DSL is not enough/suitable, I have to create custom stages or use actors which gives me more control. Your thought on this as implementors/designers of the Streams DSL would be interesting. /Magnus Den onsdag 20 april 2016 kl. 16:43:47 UTC+2 skrev drewhk: > > Yes, this is a simple approach and works without having to do a custom > stage (exactly why statefulMapConcat was introduced). > > -Endre > > On Wed, Apr 20, 2016 at 4:03 PM, Derek Williams <[email protected] > <javascript:>> wrote: > >> If it helps, a simple custom framing stage could look like this: >> >> Flow[ByteString].statefulMapConcat[MyThing] { () => >> var buffer = ByteString.empty >> in => { >> val (things, remaining) = parseThings(buffer ++ in) >> buffer = remaining >> things >> } >> } >> >> def parseThings(bytes: ByteString): (Vector[MyThing], ByteStrings) = { >> ... } >> >> where parseThings finds as many things as it can from the available >> bytes, returning the found things and the unused bytestring. >> Other state can also be kept alongside the buffer if you need that as >> well. >> >> On Wed, Apr 20, 2016 at 2:41 PM Magnus Andersson <[email protected] >> <javascript:>> wrote: >> >>> Solved it by doing my own version of something similar to the framing >>> classes. >>> >>> Perhaps a custom stage is what I should use for problems like a protocol >>> with variable length payloads, acking on application level, messages >>> received in a specific order? >>> >>> Doing it with flows or graphs seemed to create tons of boilerplate and >>> was not really easy to follow along with. I'd really like to do some type >>> of state machine for this, mixing Akka FSM and streams. Do you know of any >>> idiomatic examples doing something like that? >>> >>> /Magnus >>> >>> Den sön 10 apr. 2016 08:51Endre Varga <[email protected] >>> <javascript:>> skrev: >>> >>>> Hi Magnus, >>>> >>>> >>>> >>>> On Sat, Apr 9, 2016 at 11:00 PM, Magnus Andersson <[email protected] >>>> <javascript:>> wrote: >>>> >>>>> Hi >>>>> >>>>> I'm implementing a TCP protocol (Lumberjack2/Beats) in Akka Streams. >>>>> It is a streaming application level protocol where each messages varies >>>>> data length. In Akka streams I get ByteStrings fed through the stream but >>>>> I >>>>> can't find any information about how these correlate with TCP packets >>>>> sent >>>>> over wire. >>>>> >>>> >>>> Because TCP is a stream protocol. Hence no sent buffers correlate with >>>> TCP frames at all, TCP is free to rechunk them as it sees fit. >>>> >>>> >>>>> Without that knowledge I have to create a buffer for partial message >>>>> and handle all theoretical cases that could occur when ByteString does >>>>> not >>>>> correlate 1:1 with TCP payloads. The resulting implementation becomes >>>>> quite >>>>> hairy. >>>>> >>>>> 1. What are the guarantees around correlation between Akka ByteStrings >>>>> and TCP packets? >>>>> >>>> >>>> Nothing. This is the nature of TCP and has nothing to do with Akka. >>>> >>>> >>>>> 2. Examples of TCP protocols implemented with Akka streams that uses >>>>> dynamic framing examples? >>>>> >>>> >>>> Check the akka.io Framing classes. >>>> >>>> -Endre >>>> >>>> >>>>> >>>>> /Magnus >>>>> >>>> -- >>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>> >>>>>>>>>> Check the FAQ: >>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>> >>>>>>>>>> Search the archives: >>>>> https://groups.google.com/group/akka-user >>>>> --- >>>>> >>>> You received this message because you are subscribed to the Google >>>>> Groups "Akka User List" group. >>>>> To unsubscribe from this group and stop receiving emails from it, send >>>>> an email to [email protected] <javascript:>. >>>> >>>> >>>>> To post to this group, send email to [email protected] >>>>> <javascript:>. >>>>> Visit this group at https://groups.google.com/group/akka-user. >>>>> For more options, visit https://groups.google.com/d/optout. >>>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> >>> You received this message because you are subscribed to a topic in the >>>> Google Groups "Akka User List" group. >>>> To unsubscribe from this topic, visit >>>> https://groups.google.com/d/topic/akka-user/o8djg3OKV3Q/unsubscribe. >>>> To unsubscribe from this group and all its topics, send an email to >>>> [email protected] <javascript:>. >>> >>> >>>> To post to this group, send email to [email protected] >>>> <javascript:>. >>>> Visit this group at https://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>> -- >>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>> >>>>>>>>>> Check the FAQ: >>> http://doc.akka.io/docs/akka/current/additional/faq.html >>> >>>>>>>>>> Search the archives: >>> https://groups.google.com/group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to [email protected] <javascript:>. >>> To post to this group, send email to [email protected] >>> <javascript:>. >>> Visit this group at https://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <javascript:>. >> To post to this group, send email to [email protected] >> <javascript:>. >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
