Jiangjie,

Thanks for the explanation; now I understand the scenario. It is a CEP
(complex event processing) use case in stream processing, in which I think
local state should be used for some sort of pattern matching. More
concretely, let's say in this case we have a local state storing what has
been observed. Then the sequence would be:

T0:                         local state {}
T1:    message 0,  local state {0}
T2:    message 1,  local state {0, 1}
T3:    message 2,  local state {1}, matching 0 and 2; output some result
and remove 0 from local state.
T4:    message 3,  local state {}, matching 1 and 3; output some result
and remove 1 from local state.
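The matching logic in the sequence above can be sketched as a tiny
stand-alone function. This is a hypothetical illustration only, not the
KIP-28 API; the pairing rule "message m matches message m - 2" is assumed
from the example:

```python
def process(state, msg):
    """Process one message against the local state.

    If the partner message (msg - 2, per the example's pairing rule)
    is already in the state, emit a match and drop the partner;
    otherwise remember this message for later.
    """
    partner = msg - 2
    if partner in state:
        state.discard(partner)
        return (partner, msg)  # matched pair: output some result
    state.add(msg)
    return None

state = set()
outputs = [process(state, m) for m in [0, 1, 2, 3]]
# after messages 0..3: matches (0, 2) and (1, 3), local state back to {}
```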

Let's say the user calls commit at T2: it will commit the offset at message
2 as well as the local state {0, 1}; then upon failure recovery, the
processor can restore that state along with the committed offsets and
continue.
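Commit-with-state can be sketched as checkpointing the offset and a
snapshot of the local state together. Again, this is a hypothetical
illustration of the idea, not the proposed API:

```python
messages = [0, 1, 2, 3]
state = set()
checkpoint = None  # (next offset to consume, snapshot of local state)

def process(state, msg):
    # same pairing rule as before: msg matches msg - 2
    if msg - 2 in state:
        state.discard(msg - 2)
        return (msg - 2, msg)
    state.add(msg)
    return None

for offset, msg in enumerate(messages):
    process(state, msg)
    if offset == 1:  # user calls commit at T2, i.e. after message 1
        # commit offset 2 and the state snapshot {0, 1} together
        checkpoint = (offset + 1, set(state))

# ... crash; on recovery, restore both pieces of the checkpoint ...
resume_offset, state = checkpoint[0], set(checkpoint[1])
for offset in range(resume_offset, len(messages)):
    process(state, messages[offset])  # messages 2 and 3 are re-processed
# state is again consistent with having seen all four messages
```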

More generally, the current design of the processor lets users specify
their subscribed topics before starting the process; users will not change
topic subscriptions on the fly, and they will not be committing arbitrary
offsets. The rationale behind this is to abstract the producer / consumer
details away from processor developers as much as possible, i.e. if users
do not want them, they should not be exposed to message offsets / partition
ids / topic names etc. In most cases the subscribed topics can be specified
before starting the processing job, so we let users specify them once and
then focus on the computational logic when implementing the process
function.

Guozhang


On Tue, Aug 11, 2015 at 10:26 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Guozhang,
>
> By interleaved groups of messages, I meant something like this: say we have
> messages 0, 1, 2, 3; messages 0 and 2 together complete one piece of
> business logic, and messages 1 and 3 together complete another. In that
> case, after a user has processed message 2, they cannot commit offsets,
> because if they crash before processing message 3, message 1 will not be
> reconsumed. That means it is possible that users are not able to find a
> point where the current state is safe to commit.
>
> This is one example in the use case space table. It is still not clear to
> me which use cases in the use case space table KIP-28 wants to cover. Are
> we only covering the case for static topic stream with semi-auto commit?
> i.e. user cannot change topic subscription on the fly and they can only
> commit the current offset.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Aug 10, 2015 at 6:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello folks,
> >
> > I have updated the KIP page with some detailed API / architecture /
> > packaging proposals, along with the long promised first patch in PR:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client
> >
> > https://github.com/apache/kafka/pull/130
> >
> >
> > Any feedbacks / comments are more than welcomed.
> >
> > Guozhang
> >
> >
> > On Mon, Aug 10, 2015 at 6:55 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Jun,
> > >
> > > 1. I have removed the streamTime in punctuate() since it is not only
> > > triggered by clock time, detailed explanation can be found here:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamTime
> > >
> > > 2. Yes, if users do not schedule a task, then punctuate will never
> fire.
> > >
> > > 3. Yes, I agree. The reason it was implemented in this way is that the
> > > state store registration call is triggered by the users. However I
> think
> > it
> > > is doable to change that API so that it will be more natural to have
> sth.
> > > like:
> > >
> > > context.createStore(store-name, store-type).
> > >
> > > Guozhang
> > >
> > > On Tue, Aug 4, 2015 at 9:17 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > >> A few questions/comments.
> > >>
> > >> 1. What's streamTime passed to punctuate()? Is that just the current
> > time?
> > >> 2. Is punctuate() only called if schedule() is called?
> > >> 3. The way the KeyValueStore is created seems a bit weird. Since this
> is
> > >> part of the internal state managed by KafkaProcessorContext, it seems
> > >> there
> > >> should be an api to create the KeyValueStore from
> KafkaProcessorContext,
> > >> instead of passing context to the constructor of KeyValueStore?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I just posted KIP-28: Add a transform client for data processing
> > >> > <
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing
> > >> > >
> > >> > .
> > >> >
> > >> > The wiki page does not yet have the full design / implementation
> > >> details,
> > >> > and this email is to kick-off the conversation on whether we should
> > add
> > >> > this new client with the described motivations, and if yes what
> > >> features /
> > >> > functionalities should be included.
> > >> >
> > >> > Looking forward to your feedback!
> > >> >
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
