Hi Damian,

Thanks for fixing this so quickly. I re-ran the test and it works fine.

The next test I tried was reading from two service instances that run the
same string-count topology. First, the client is started; it sends two
read requests, one per instance, every second. Next, I start the first
instance and let it finish its store initialization before starting the
second instance.
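For reference, the client side is essentially the following loop (a stripped-down sketch; the `/count/<key>` path is just how my test service exposes the store, and a failed connection is reported as 503):

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal sketch of the test client: each second it reads the count for
// "hello" from both instances and prints one trace line per instance.
public class QueryClient {

    // Returns the HTTP status; a connection failure counts as 503,
    // i.e. "instance unavailable". On 200 the body is appended to value.
    static int fetch(String instance, String key, StringBuilder value) {
        try {
            HttpURLConnection c = (HttpURLConnection)
                new URL("http://" + instance + "/count/" + key).openConnection();
            c.setConnectTimeout(500);
            int code = c.getResponseCode();
            if (code == 200) {
                value.append(new String(c.getInputStream().readAllBytes()));
            }
            return code;
        } catch (IOException e) {
            return 503; // cannot connect -> treat as service unavailable
        }
    }

    // One trace line: request id, instance, response code, value.
    static String formatLine(int id, String instance, int code, String value) {
        return id + "," + instance + "," + code + "," + value;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] instances = {"localhost:2030", "localhost:2031"};
        for (int id = 0; id < 3; id++) { // the real client loops forever
            for (String instance : instances) {
                StringBuilder value = new StringBuilder();
                int code = fetch(instance, "hello", value);
                System.out.println(formatLine(id, instance, code, value.toString()));
            }
            Thread.sleep(1000);
        }
    }
}
```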

Below is the initial part of the trace when going from 0 to 1 instance. The
trace log has the following columns: request id, instance, response code
and value.

3,localhost:2030,503,
3,localhost:2031,503,
4,localhost:2030,503,
4,localhost:2031,503,
5,localhost:2030,200,2
5,localhost:2031,503,
6,localhost:2030,200,2
6,localhost:2031,503,

Before any instance is started, both endpoints return 503, the status the
client reports when it cannot connect to an instance. Once the first
instance has started, it returns the expected value 2 from request pair 5
onwards.
The trace below is from when the second instance starts.

18,localhost:2030,200,2
18,localhost:2031,503,
19,localhost:2030,404,
19,localhost:2031,503,
20,localhost:2030,404,
20,localhost:2031,503,
21,localhost:2030,404,
21,localhost:2031,200,2
22,localhost:2030,404,
22,localhost:2031,200,2

The new instance takes over responsibility for the partition containing the
key "hello". During the takeover, the new instance returns 503 as expected
until its store is ready. The issue is that the first instance, the one
that stored the value, starts returning 404 from request pair 19. A client
issuing requests for this key would then see the following sequence:

18 -> 2
19 -> Not found
20 -> Not found
21 -> 2
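If I read the behavior correctly, the 404s come from the store still being open on the first instance while the partition's data has moved away. A stripped-down simulation of the status mapping (plain Java, no Kafka; all names are mine, not from the actual service):

```java
import java.util.HashMap;
import java.util.Map;

// Simulates how an instance maps local store state to an HTTP status:
// 503 while the store is (re-)initializing, 404 when the store is open but
// the key is absent, 200 otherwise. This reproduces the trace: after the
// partition migrates away, the old instance's store is still open but no
// longer holds "hello", so it answers 404 instead of 503.
public class StatusMapping {
    enum StoreState { INITIALIZING, RUNNING }

    StoreState state = StoreState.INITIALIZING;
    final Map<String, Long> store = new HashMap<>();

    /** The HTTP status this instance would send for a key lookup. */
    int respond(String key) {
        if (state != StoreState.RUNNING) return 503;  // store not ready
        return store.containsKey(key) ? 200 : 404;    // open store, maybe empty
    }

    public static void main(String[] args) {
        StatusMapping first = new StatusMapping();
        StatusMapping second = new StatusMapping();

        // Only the first instance is up and owns the partition with "hello".
        first.state = StoreState.RUNNING;
        first.store.put("hello", 2L);
        System.out.println(first.respond("hello"));   // 200
        System.out.println(second.respond("hello"));  // 503 (still restoring)

        // The partition migrates: the first instance's store loses the key
        // but stays RUNNING, the second is still restoring -> 404/503 window.
        first.store.remove("hello");
        System.out.println(first.respond("hello"));   // 404, the problem
        System.out.println(second.respond("hello"));  // 503

        // Second instance finishes restoring and serves the value.
        second.state = StoreState.RUNNING;
        second.store.put("hello", 2L);
        System.out.println(second.respond("hello"));  // 200
    }
}
```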

From the client's perspective, I think this violates the guarantee of
always reading the latest value.

Am I making the wrong assumptions, or is there some way to detect that the
local store is no longer responsible for the key?
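To make the question concrete: what I am hoping for is something along the lines of KIP-67's metadataForKey(), if I understand it correctly, so that an instance can refuse (or redirect) lookups for partitions it no longer owns instead of answering 404. A toy version of that check, with a made-up assignment table and a stand-in for Kafka's partitioner:

```java
import java.util.HashMap;
import java.util.Map;

// Toy ownership check: before consulting the local store, the instance asks
// "does the current assignment map this key's partition to me?". If not, it
// answers 503 (or could redirect to the owning host) instead of 404.
// The assignment table and partitioner are stand-ins for whatever the
// streams metadata API would provide.
public class OwnershipCheck {
    static final int NUM_PARTITIONS = 4;

    // partition -> host currently owning it (would come from cluster metadata)
    final Map<Integer, String> assignment = new HashMap<>();
    final String localHost;

    OwnershipCheck(String localHost) { this.localHost = localHost; }

    // Stand-in for Kafka's partitioner: hash of the key mod partition count.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    /** 200/404 only when we own the key's partition; 503 otherwise. */
    int respond(String key, Map<String, Long> localStore) {
        String owner = assignment.get(partitionFor(key));
        if (!localHost.equals(owner)) return 503; // not ours (anymore)
        return localStore.containsKey(key) ? 200 : 404;
    }

    public static void main(String[] args) {
        OwnershipCheck first = new OwnershipCheck("localhost:2030");
        int p = partitionFor("hello");

        // While this instance owns the partition, a miss is a real 404.
        first.assignment.put(p, "localhost:2030");
        System.out.println(first.respond("hello", new HashMap<>())); // 404

        // After migration the owner changes, so the same lookup yields 503
        // instead of the misleading 404 from the trace above.
        first.assignment.put(p, "localhost:2031");
        System.out.println(first.respond("hello", new HashMap<>())); // 503
    }
}
```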

Best,
Mikael

On Thu, Sep 8, 2016 at 11:03 AM Damian Guy <damian....@gmail.com> wrote:

> Hi Mikael,
>
> A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123> (the
> issue you found with receiving null values) has now been committed to
> trunk. I've tried it with your github repo and it appears to be working.
> You will have to make a small change to your code as we now throw
> InvalidStateStoreException when the Stores are unavailable (previously we
> returned null).
>
> We added a test here
> <https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
> to
> make sure we only get a value once the store has been (re-)initialized.
> Please give it a go and thanks for your help in finding this issue.
>
> Thanks,
> Damian
>
> On Mon, 5 Sep 2016 at 22:07 Mikael Högqvist <hoegqv...@gmail.com> wrote:
>
> > Hi Damian,
> >
> > > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > > > hello -> 10
> > > >
> > > >
> > > The case where you get KeyNotFound looks like a bug to me. This
> shouldn't
> > > happen. I can see why it might happen and we will create a JIRA and fix
> > it
> > > right away.
> > >
> >
> > Great, thanks for looking into this. I'll try again once the PR is
> merged.
> >
> >
> > >
> > > I'm not sure how you end up with (hello -> 10). It could indicate that
> > the
> > > offsets for the topic you are consuming from weren't committed so the
> > data
> > > gets processed again on the restart.
> > >
> >
> > Yes, it didn't commit the offsets since streams.close() was not called on
> > ctrl-c. Fixed by adding a shutdown hook.
> >
> > Thanks,
> > Mikael
> >
> >
> > > Thanks,
> > > Damian
> > >
> >
>
