Jiangjie,

Thanks for the explanation, now I understand the scenario. It is one of the CEP (complex event processing) use cases in stream processing, in which I think local state should be used for some sort of pattern matching. More concretely, let's say in this case we have a local state storing what has been observed. Then the sequence would be:
T0: local state {}
T1: message 0, local state {0}
T2: message 1, local state {0, 1}
T3: message 2, local state {1}; messages 0 and 2 match, so output some result and remove 0/2 from the local state
T4: message 3, local state {}; messages 1 and 3 match, so output some result and remove 1/3 from the local state

Let's say the user calls commit at T2: it will commit the offset at message 2 (the position of the next message to consume) as well as the local state {0, 1}; then upon failure recovery, it can restore that state along with the committed offset and continue.
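To make the matching logic a bit more concrete, here is a minimal stand-alone sketch of such a pair matcher; the class and method names (PairMatcher, process(), state()) are just illustrative assumptions, not the actual KIP-28 processor API:

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only, not the KIP-28 processor API: messages k and k+2
// are assumed to complete one unit of business logic together.
public class PairMatcher {

    // Local state: messages observed but not yet matched.
    private final Set<Integer> observed = new HashSet<>();

    // Handle one message; returns true if it completed a pair.
    public boolean process(int message) {
        int partner = message - 2;
        if (observed.remove(partner)) {
            // Matched (partner, message): emit a result and drop both from the state.
            System.out.println("matched " + partner + " and " + message);
            return true;
        }
        observed.add(message);
        return false;
    }

    // Snapshot of the local state that would be committed along with the offsets.
    public Set<Integer> state() {
        return new HashSet<>(observed);
    }

    public static void main(String[] args) {
        PairMatcher matcher = new PairMatcher();
        for (int message = 0; message <= 3; message++) {
            matcher.process(message);
            System.out.println("after message " + message + ": local state " + matcher.state());
        }
        // The local state evolves as in T1 through T4 above: {0}, {0, 1}, {1}, {}.
    }
}

On commit, this observed set (the local state) would be checkpointed together with the consumed offset, which is what makes the recovery described above possible.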
More generally, the current design of the processor lets users specify their subscribed topics before starting the process; users will not change the topic subscription on the fly, and they will not be committing arbitrary offsets. The rationale behind this is to abstract the producer / consumer details away from the processor developers as much as possible, i.e. if users do not want to, they should not be exposed to message offsets / partition ids / topic names, etc. In most cases the subscribed topics can be specified before starting the processing job, so we let users specify them once and then focus on the computational logic when implementing the process function.

Guozhang

On Tue, Aug 11, 2015 at 10:26 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Guozhang,
>
> By interleaved groups of messages, I meant something like this: Say we have
> messages 0, 1, 2, 3, where messages 0 and 2 together complete a piece of
> business logic, and messages 1 and 3 together complete a piece of business
> logic. In that case, after the user has processed message 2, they cannot
> commit offsets, because if they crash before processing message 3, message 1
> will not be reconsumed. That means it is possible that the user is not able
> to find a point where the current state is safe to commit.
>
> This is one example in the use case space table. It is still not clear to
> me which use cases in the use case space table KIP-28 wants to cover. Are
> we only covering the case of a static topic stream with semi-auto commit,
> i.e. users cannot change the topic subscription on the fly and they can
> only commit the current offset?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Aug 10, 2015 at 6:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello folks,
> >
> > I have updated the KIP page with some detailed API / architecture /
> > packaging proposals, along with the long-promised first patch in PR:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
> > https://github.com/apache/kafka/pull/130
> >
> > Any feedback / comments are more than welcome.
> >
> > Guozhang
> >
> > On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > 1. I have removed the streamTime in punctuate() since it is not only
> > > triggered by clock time; a detailed explanation can be found here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime
> > >
> > > 2. Yes, if users do not schedule a task, then punctuate will never fire.
> > >
> > > 3. Yes, I agree. The reason it was implemented this way is that the
> > > state store registration call is triggered by the users. However, I
> > > think it is doable to change that API so that it would be more natural
> > > to have something like:
> > >
> > > context.createStore(store-name, store-type).
> > >
> > > Guozhang
> > >
> > > On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > A few questions/comments.
> > > >
> > > > 1. What's the streamTime passed to punctuate()? Is that just the
> > > > current time?
> > > > 2. Is punctuate() only called if schedule() is called?
> > > > 3. The way the KeyValueStore is created seems a bit weird. Since this
> > > > is part of the internal state managed by KafkaProcessorContext, it
> > > > seems there should be an API to create the KeyValueStore from
> > > > KafkaProcessorContext, instead of passing the context to the
> > > > constructor of KeyValueStore?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I just posted KIP-28: Add a transform client for data processing
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing>.
> > > > >
> > > > > The wiki page does not yet have the full design / implementation
> > > > > details, and this email is to kick off the conversation on whether
> > > > > we should add this new client with the described motivations, and
> > > > > if so, what features / functionalities should be included.
> > > > >
> > > > > Looking forward to your feedback!
> > > > >
> > > > > -- Guozhang
> > > >
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang
>

--
-- Guozhang