Hi Navina,

*>> after it has exhausted all messages from the source, it will generate a special SENTINEL envelope that will not have any key or message and only contain a special offset. Is that a valid understanding?*
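A minimal Java sketch of the contract as quoted above may help readers of this thread. It is a sketch under stated assumptions: `Ssp`, `Envelope`, and `BoundedListConsumer` are hypothetical, simplified stand-ins, not Samza's actual `SystemStreamPartition`, `IncomingMessageEnvelope`, or `SystemConsumer`. Only the names `buildEndOfStream()` and `END_OF_STREAM` come from this thread, and the literal offset value is assumed. The consumer returns every source message first and then exactly one sentinel envelope with no key and no message. Detection is a plain offset comparison on what `poll()` returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a system/stream/partition (SSP); not Samza's class. */
final class Ssp {
    final String system;
    final String stream;
    final int partition;

    Ssp(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }
}

/** Simplified stand-in for IncomingMessageEnvelope; not Samza's actual class. */
final class Envelope {
    // Assumed literal; the real sentinel value is whatever the design doc specifies.
    static final String END_OF_STREAM = "END_OF_STREAM";

    final Ssp ssp;
    final String offset;
    final Object key;
    final Object message;

    Envelope(Ssp ssp, String offset, Object key, Object message) {
        this.ssp = ssp;
        this.offset = offset;
        this.key = key;
        this.message = message;
    }

    /** Sentinel envelope: no key, no message, only the special offset. */
    static Envelope buildEndOfStream(Ssp ssp) {
        return new Envelope(ssp, END_OF_STREAM, null, null);
    }

    /** End-of-stream detection is a plain offset check on what poll() returned. */
    boolean isEndOfStream() {
        return END_OF_STREAM.equals(offset);
    }
}

/** Hypothetical bounded consumer over an in-memory list; not a real SystemConsumer. */
final class BoundedListConsumer {
    private final Ssp ssp;
    private final List<String> messages;
    private int next = 0;
    private boolean sentinelSent = false;

    BoundedListConsumer(Ssp ssp, List<String> messages) {
        this.ssp = ssp;
        this.messages = new ArrayList<>(messages);
    }

    /** Returns up to maxCount envelopes; once the source is exhausted, emits the sentinel exactly once. */
    List<Envelope> poll(int maxCount) {
        List<Envelope> batch = new ArrayList<>();
        while (batch.size() < maxCount && next < messages.size()) {
            // Regular message: the offset is simply its position in the list.
            batch.add(new Envelope(ssp, Integer.toString(next), next, messages.get(next)));
            next++;
        }
        if (next == messages.size() && !sentinelSent && batch.size() < maxCount) {
            batch.add(Envelope.buildEndOfStream(ssp));
            sentinelSent = true;
        }
        return batch;
    }
}
```

In this sketch the last real message is still returned as a normal envelope. Only the extra sentinel carries the special offset, and that sentinel is the one that would be withheld from the StreamTask.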
The table in the doc already specifies how the sentinel envelope is constructed and how its individual fields are populated. I've reworded some parts of the document to make this explicit and to address your feedback. Hopefully, that makes it clearer.

*>> I haven't reviewed the RB yet and I would really appreciate if you can explain the behavior in the design doc.*

I've updated the design doc with a section that has a couple of examples of restart scenarios.

Thanks,
Jagadish

On Thu, Sep 1, 2016 at 10:35 AM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

> It means that the message is not delivered to the "*StreamTask implementation*", i.e. user code. Yes, we are creating a contract between SystemConsumer and Samza: "the SystemConsumer implementation will generate an IncomingMessageEnvelope by invoking buildEndOfStream()"
>
> >> Ok. Perhaps I didn't frame my question right. I understand that the sentinel message will not be delivered to the StreamTask. It is not clear why the "last message" in the source is not delivered to the task. I think you meant to say that the contract with the SystemConsumer is: after it has exhausted all messages from the source, it will generate a special SENTINEL envelope that will not have any key or message and only contain a special offset. Is that a valid understanding?
>
> "Detecting" end of stream from the source should simply be a matter of parsing the offset from the message that the consumer returns during poll() and checking if it is end-of-stream. I'm happy to add this to the document.
>
> >> That sounds good. I think it will be useful to clearly describe the contract expected from a SystemConsumer that needs to provide a bounded data stream.
>
> Nope, it does not. However, the container would not terminate (if you have a source that has not reached end of stream yet).
>
> >> Awesome!
>
> No, we do not.
> The dance in the AsyncRunLoop's state machine / flow control in https://reviews.apache.org/r/51346/ guarantees that end-of-stream SSPs are not polled anymore.
>
> >> I haven't reviewed the RB yet and I would really appreciate if you can explain the behavior in the design doc. I will get to your RB soon (hopefully). Having a comprehensive design doc will also help you update the documentation later on.
>
> Thanks for solving this problem so graciously. It is a big step for Samza!
>
> Cheers!
> Navina
>
> On Thu, Sep 1, 2016 at 5:04 AM, Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com> wrote:
>
> > Thanks Jagadish! This is great!
> > Can you share some thoughts/opinions on how this feature relates to using punctuations (at some point) in Samza? I mean, do you think that using punctuated streams could be seen as a generalization of this problem? And if so, could this feature be used later on as a building block for implementing punctuations in Samza?
> >
> > Best,
> >
> > Renato M.
> >
> > 2016-09-01 1:37 GMT+02:00 Jagadish Venkatraman <jagadish1...@gmail.com>:
> >
> > > Thanks for reviewing and the comments. Please find my replies inline.
> > >
> > > *"The offset in the partition that the message was received from. If this is the last message in the SSP, this field is set to END_OF_STREAM. Such a message is not delivered to the actual StreamTask implementation." Does this mean the last message is not delivered to the Task? Does the source provide that info? If that is the case, then it kind of creates a contract between any bounded system consumer and Samza. Or did you mean to say that we assume end-of-stream has been reached when there is no message returned on poll?*
> > >
> > > >> It means that the message is not delivered to the "*StreamTask implementation*", i.e. user code.
> > > Yes, we are creating a contract between SystemConsumer and Samza: "the SystemConsumer implementation will generate an IncomingMessageEnvelope by invoking buildEndOfStream()"
> > >
> > > *"Please do clarify. I think what is missing in this document is how to "detect" an end-of-stream from the source."*
> > >
> > > >> "Detecting" end of stream from the source should simply be a matter of parsing the offset from the message that the consumer returns during poll() and checking if it is end-of-stream. I'm happy to add this to the document.
> > >
> > > 2. Does this design preclude the possibility of consuming bounded and unbounded stream partitions in the task?
> > >
> > > >> Nope, it does not. However, the container would not terminate (if you have a source that has not reached end of stream yet).
> > >
> > > 3. During checkpoint, let's say some of the partitions have reached EOF. Do we write a special offset in the checkpoint message that indicates that it has reached end of stream and doesn't need to be polled anymore?
> > >
> > > >> No, we do not. The dance in the AsyncRunLoop's state machine / flow control in https://reviews.apache.org/r/51346/ guarantees that end-of-stream SSPs are not polled anymore.
> > >
> > > On Wed, Aug 31, 2016 at 2:01 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:
> > >
> > > > Hi Jagadish,
> > > > Thanks for sharing the design with the community. I have a couple of questions that were not very clear from the design document.
> > > >
> > > > 1. Under mechanism for indicating the end-of-stream to Samza, you mention "The offset in the partition that the message was received from. If this is the last message in the SSP, this field is set to END_OF_STREAM. Such a message is not delivered to the actual StreamTask implementation."
> > > > Does this mean the last message is not delivered to the Task? How do you identify that it is indeed the last message in the SSP? Does the source provide that info? If that is the case, then it kind of creates a contract between any bounded system consumer and Samza. Or did you mean to say that we assume end-of-stream has been reached when there is no message returned on poll? Please do clarify. I think what is missing in this document is how to "detect" an end-of-stream from the source.
> > > >
> > > > 2. Does this design preclude the possibility of consuming bounded and unbounded stream partitions in the task?
> > > >
> > > > 3. During checkpoint, let's say some of the partitions have reached EOF. Do we write a special offset in the checkpoint message that indicates that it has reached end of stream and doesn't need to be polled anymore?
> > > >
> > > > Thanks!
> > > > Navina
> > > >
> > > > On Tue, Aug 30, 2016 at 4:50 PM, Julian Hyde <jh...@apache.org> wrote:
> > > >
> > > > > > On Aug 30, 2016, at 4:44 PM, Xinyu Liu <xinyuliu...@gmail.com> wrote:
> > > > > >
> > > > > > It's very exciting that Samza is adding support for bounded input streams.
> > > > >
> > > > > +1!
> > > >
> > > > --
> > > > Navina R.
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
>
> --
> Navina R.

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
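The answers to questions 2 and 3 in this thread (mixing bounded and unbounded partitions, and not polling end-of-stream SSPs) can be illustrated with a simplified, single-threaded Java run loop. This is a hypothetical sketch, not the AsyncRunLoop from https://reviews.apache.org/r/51346/. `Msg`, `Source`, and `SketchRunLoop` are illustrative names, the sentinel literal is assumed, and checkpointing and restarts are not modeled.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical minimal envelope: SSP name, offset and payload. */
final class Msg {
    // Assumed sentinel offset literal, for illustration only.
    static final String END_OF_STREAM = "END_OF_STREAM";

    final String ssp;
    final String offset;
    final Object payload;

    Msg(String ssp, String offset, Object payload) {
        this.ssp = ssp;
        this.offset = offset;
        this.payload = payload;
    }

    boolean isEndOfStream() {
        return END_OF_STREAM.equals(offset);
    }
}

/** Hypothetical per-SSP source standing in for a SystemConsumer's poll(). */
interface Source {
    List<Msg> poll();
}

/**
 * Simplified, single-threaded run loop; this is NOT Samza's AsyncRunLoop.
 * An SSP that has delivered its end-of-stream sentinel is never polled again,
 * and shutdown is allowed only once every SSP has reached end of stream.
 */
final class SketchRunLoop {
    private final Map<String, Source> sources;
    private final Set<String> finished = new HashSet<>();
    private final Consumer<Msg> task; // stands in for the user's StreamTask

    SketchRunLoop(Map<String, Source> sources, Consumer<Msg> task) {
        this.sources = new LinkedHashMap<>(sources);
        this.task = task;
    }

    /** One polling round over the SSPs that have not yet reached end of stream. */
    void runOnce() {
        for (Map.Entry<String, Source> entry : sources.entrySet()) {
            String ssp = entry.getKey();
            if (finished.contains(ssp)) {
                continue; // end-of-stream SSPs are not polled any more
            }
            for (Msg m : entry.getValue().poll()) {
                if (m.isEndOfStream()) {
                    finished.add(ssp); // the sentinel is recorded, never delivered to the task
                    break;
                }
                task.accept(m);
            }
        }
    }

    /** True only when every SSP has reached end of stream; an unbounded SSP keeps this false. */
    boolean canShutDown() {
        return finished.size() == sources.size();
    }
}
```

With one bounded and one unbounded SSP, the bounded SSP is polled once and then skipped, while `canShutDown()` stays false because the unbounded SSP never yields a sentinel. This mirrors "the container would not terminate" from the answer to question 2.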