Hi Eno,

Thanks for the response, and sorry for not getting back earlier. I think it
makes sense and the example is great! To make it possible to experiment
with the guarantees/semantics, I've created a tool available at:
https://github.com/mkhq/kafka-qs-verify. Basically it can be used to trace
read requests over time, e.g. one read per second using a client that
queries multiple instances.

By doing this I observed a couple of interesting things with the current
implementation. Note that these observations could also be a result of me
using Kafka Streams in the wrong way, e.g. wrong assumptions, setup or tool
implementation. The topology the tool uses counts strings and stores the
counts in an output table.

The experiments are set up with an input topic containing "hello" 5 times.
StoreUnavailable in the traces below means that the call to
streams.store(table) returned null and KeyNotFound happens when
store.get(key) is null. The first run of the tool creates a trace where the
store is unavailable followed by key not found and finally the correct
value of 5.

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 5
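
For reference, the read path that produces these three outcomes can be
sketched roughly as follows. This is a simplified stand-in and not the
tool's actual code: a plain Map plays the role of the handle returned by
streams.store(table), and the ReadTrace class and its read method are
hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadTrace {
    // Hypothetical helper: classifies one read the same way the traces do.
    // `store` stands in for the handle returned by streams.store(table);
    // null models the store not being available yet.
    static String read(Map<String, Long> store, String key) {
        if (store == null) {
            return "Failed to read key " + key + ", StoreUnavailable";
        }
        Long value = store.get(key);
        if (value == null) {
            return "Failed to read key " + key + ", KeyNotFound";
        }
        return key + " -> " + value;
    }

    public static void main(String[] args) {
        Map<String, Long> store = null;
        System.out.println(read(store, "hello")); // store not available yet
        store = new HashMap<>();
        System.out.println(read(store, "hello")); // store is up, key not counted yet
        store.put("hello", 5L);
        System.out.println(read(store, "hello")); // the count has arrived
    }
}
```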

The second run, i.e. a restart of the Java app with the same app-id and
table name as the previous run, creates the following trace:

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 10

I assumed that two things would be different:

a) KeyNotFound should not happen since the store was already initialized
and had associated hello with 5. This could violate the "never read an
older value"-rule if an instance restarts.

b) hello should still have the value 5.
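
The "never read an older value" rule from a) can be checked mechanically
over a recorded trace. The sketch below is only an illustration under
stated assumptions: MonotonicReads, firstViolation and the two sentinel
constants are hypothetical names, reads that hit an unavailable store are
skipped (the rule only applies while the store is available), and a
KeyNotFound after a value has already been observed counts as a violation.

```java
public class MonotonicReads {
    // Sentinels for failed reads; real counts are never negative.
    static final long STORE_UNAVAILABLE = -1L;
    static final long KEY_NOT_FOUND = -2L;

    // Returns the index of the first read that breaks the rule, or -1 if none does.
    static int firstViolation(long[] trace) {
        boolean seen = false; // has any value been read successfully so far?
        long last = 0L;       // most recent successfully read value
        for (int i = 0; i < trace.length; i++) {
            long v = trace[i];
            if (v == STORE_UNAVAILABLE) {
                continue; // the rule does not apply while the store is unavailable
            }
            if (seen && (v == KEY_NOT_FOUND || v < last)) {
                return i; // the key vanished or an older value was returned
            }
            if (v != KEY_NOT_FOUND) {
                seen = true;
                last = v;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] firstRun = {STORE_UNAVAILABLE, KEY_NOT_FOUND, 5};
        long[] bothRuns = {STORE_UNAVAILABLE, KEY_NOT_FOUND, 5,
                           STORE_UNAVAILABLE, KEY_NOT_FOUND, 10};
        System.out.println(firstViolation(firstRun)); // -1
        System.out.println(firstViolation(bothRuns)); // 4
    }
}
```

With the two traces above concatenated, the check flags the KeyNotFound
that follows the restart.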

Does a) have to do with the start-up sequence of the stream instance? E.g.
replay of the changelog for a table can take longer?

For b) it looks like the stream instance re-processes the input topic data.
Maybe it's related to the setup? In this case, the consumer config
"AUTO_OFFSET_RESET_CONFIG" was not defined in the settings. Setting it to
"latest" results in only KeyNotFound.
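
A minimal sketch of how the setting could be made explicit when
experimenting. The string key "auto.offset.reset" is the value behind
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; the VerifySettings class, the
settings method, the app-id and the broker address are hypothetical
placeholders and not the tool's actual configuration. Per the consumer
configuration documentation, this setting is consulted when there is no
committed offset for the group, so it may not be the whole explanation.

```java
import java.util.Properties;

public class VerifySettings {
    // Hypothetical helper that builds the settings for one run.
    static Properties settings(String appId, String offsetReset) {
        Properties props = new Properties();
        props.put("application.id", appId);               // same app-id across restarts
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("auto.offset.reset", offsetReset);      // "earliest" or "latest"
        return props;
    }

    public static void main(String[] args) {
        Properties props = settings("kafka-qs-verify", "earliest");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```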

I've also tried out a couple of other scenarios when querying multiple
instances with "random" restarts, but maybe it's better if we start with the
simplest cases.

Thanks,
Mikael

> One more thing, there is an example of an end-to-end REST service that
> demonstrates one possible way to query at
> https://github.com/confluentinc/examples/tree/master/kafka-streams/src/main/java/io/confluent/examples/streams/queryablestate.
> The instructions on how to run are in QueryableStateExample.java.
>
> Thanks
> Eno
>
>
> > On 26 Aug 2016, at 18:07, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> > Hi Mikael,
> >
> > Very good question. You are correct about the desired semantics.
> >
> > The semantics of case (a) depend on the local store as you mention. For
> > case (b), the final check is always performed again on get(), and if the
> > store has disappeared between the lookup and get, the user will get an
> > exception and will have to retry. The state store in A does become invalid
> > when the state is re-assigned. There isn't any other way to detect the
> > change, since we wanted to hide the system details (e.g., rebalance) from
> > the user.
> >
> > Does this make sense?
> >
> > Thanks
> > Eno
> >
> >> On 26 Aug 2016, at 16:26, Mikael Högqvist <hoegqv...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I've tried to understand the implementation and APIs from KIP-67 and would
> >> like to know the possible semantics for read requests from a client
> >> perspective. As a developer of a queryable state client, the access
> >> semantics I would like to have (I think...) is one where subsequent reads
> >> always return the value from the last read or a newer value (if the state
> >> store is available). This should be independent of the current system
> >> configuration, e.g. re-balancing, failures etc.
> >>
> >> A client-side get(k) can be implemented by starting with a lookup for the
> >> instances that store k followed by a retrieve of the value associated with
> >> k from the instances returned by the lookup. In the worst case we can
> >> always do scatter+gather over all instances.
> >>
> >> We can start by considering a get(k) under two failure-free cases: a)
> >> single instance and b) a system going from one instance to two instances.
> >> In case a) the lookup will always return the same instance and the following
> >> get will read from a local store. The semantics in this case depends on the
> >> local store.
> >>
> >> For case b) the lookup returns instance A, but in between the lookup and
> >> the get, a new instance B is introduced to which k is transferred. Does the
> >> state store on A become invalid when the state is re-assigned? Is there
> >> another way for the client to detect the change?
> >>
> >> Best Regards,
> >> Mikael
> >
>
>
>
