[ https://issues.apache.org/jira/browse/SAMZA-623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-623:
----------------------------------
    Attachment: SAMZA-623-0.patch

Attaching a really hacky fix. This fix shouldn't be committed, but I did verify 
that this patch fixes the problem. We should come up with a cleaner fix to 
commit.

> Samza torture test Checker has a race condition
> -----------------------------------------------
>
>                 Key: SAMZA-623
>                 URL: https://issues.apache.org/jira/browse/SAMZA-623
>             Project: Samza
>          Issue Type: Bug
>          Components: test
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>             Fix For: 0.10.0
>
>         Attachments: SAMZA-623-0.patch
>
>
> The SAMZA-394 integration test includes a {{Checker}} class. This class has a 
> race condition that causes the test to fail intermittently. The bug is in 
> this code:
> {code}
>       for(int i = 0; i < numPartitions; i++) {
>           logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
>           collector.send(new OutgoingMessageEnvelope(
>               new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
>       }
>       this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
> {code}
> If the container is killed (or fails) right before the this.store.put() 
> call, the epoch will already have been advanced (i.e. there will be messages 
> in the stream with nextEpoch as their epoch), but the store will still think 
> it is in the prior epoch. 
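Below is a minimal, self-contained Java simulation of this crash window. It is a sketch under stated assumptions. A HashMap stands in for the Checker's key-value store, and a List of "partition:epoch" strings stands in for the "epoch" Kafka stream. The class name, the key name and the crashBeforePut flag are hypothetical and are not Samza APIs. The simulation keeps the send-then-put ordering from the quoted snippet and shows the stream ending up one epoch ahead of the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the send-then-put window in Checker.window().
// HashMap stands in for Samza's KeyValueStore; List stands in for the
// "epoch" Kafka stream. Neither is a Samza API.
class EpochCrashWindow {
    static final String CURRENT_EPOCH = "current-epoch"; // hypothetical key name

    final Map<String, String> store = new HashMap<>();
    final List<String> epochStream = new ArrayList<>(); // "partition:epoch" entries

    // Same ordering as the quoted code: emit to every partition first,
    // persist the new epoch second.
    void advanceEpoch(int numPartitions, int nextEpoch, boolean crashBeforePut) {
        for (int i = 0; i < numPartitions; i++) {
            epochStream.add(i + ":" + nextEpoch);
        }
        if (crashBeforePut) {
            // Simulated container kill between the sends and store.put().
            throw new IllegalStateException("container killed");
        }
        store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
    }

    public static void main(String[] args) {
        EpochCrashWindow c = new EpochCrashWindow();
        c.store.put(CURRENT_EPOCH, "1");
        try {
            c.advanceEpoch(2, 2, true);
        } catch (IllegalStateException e) {
            System.out.println("crash: " + e.getMessage());
        }
        // The stream already carries epoch 2, but the store still says 1.
        System.out.println("stream = " + c.epochStream);
        System.out.println("store = " + c.store.get(CURRENT_EPOCH));
    }
}
```

Running main() prints the simulated crash, then stream = [0:2, 1:2] and store = 1. After a restart, that is exactly the state in which the first epoch-2 message trips the check.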
> When the container restarts, it falls back to its last checkpointed offset 
> and starts reading messages. If it reaches the messages carrying nextEpoch 
> before window() is invoked again, it will see a message with nextEpoch while 
> checkEpoch is still comparing against the older epoch (nextEpoch - 1). This 
> causes the container to fail:
> {code}
>       if (currentEpochInMsg <= currentEpochInStore) {
>         if (currentEpochInMsg < currentEpochInStore)
>           logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
>       } else { // should have curr > epoch
>         throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
>       }
> {code}
> The container then fails repeatedly, as long as it consistently reads the 
> nextEpoch message before window() is called (with task.window.ms=60000, 
> window() runs only once a minute).
> I think this can be fixed simply by running the window() check one more time 
> in checkEpoch before throwing the exception.
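Here is a rough sketch of that proposal, with several assumptions:

- A HashMap stands in for the KeyValueStore.
- maybeAdvanceEpoch() is hypothetical. It stands in for whatever window() does to decide that the current epoch is complete and to move CURRENT_EPOCH forward. The real window() also emits the next epoch to every partition, which is omitted here.
- The currentEpochComplete flag is also hypothetical. The real Checker derives completeness from its store.

The only point illustrated is the ordering: a message that is ahead of the store gets one more window-style check before checkEpoch gives up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "re-run the window() check before throwing".
// HashMap stands in for Samza's KeyValueStore; maybeAdvanceEpoch() and
// currentEpochComplete are illustrative stand-ins, not Samza code.
class EpochChecker {
    static final String CURRENT_EPOCH = "current-epoch"; // hypothetical key name

    final Map<String, String> store = new HashMap<>();
    // Hypothetical: true when window() would consider the stored epoch done.
    boolean currentEpochComplete;

    int storedEpoch() {
        return Integer.parseInt(store.get(CURRENT_EPOCH));
    }

    // Stand-in for the window() logic: advance the stored epoch if the
    // current one is complete (emitting to the epoch stream is omitted).
    void maybeAdvanceEpoch() {
        if (currentEpochComplete) {
            store.put(CURRENT_EPOCH, Integer.toString(storedEpoch() + 1));
            currentEpochComplete = false;
        }
    }

    void checkEpoch(int epochInMsg) {
        if (epochInMsg > storedEpoch()) {
            // About to fail: give the window() check one more chance first.
            maybeAdvanceEpoch();
        }
        int epochInStore = storedEpoch();
        if (epochInMsg < epochInStore) {
            System.out.println("Ignoring received epoch " + epochInMsg
                + " less than what is in store " + epochInStore);
        } else if (epochInMsg > epochInStore) {
            throw new IllegalArgumentException("Got epoch " + epochInMsg
                + " but have not yet completed " + epochInStore);
        }
    }

    public static void main(String[] args) {
        EpochChecker checker = new EpochChecker();
        // State after the crash: the store lags the stream by one epoch,
        // but every record for epoch 1 has already been verified.
        checker.store.put(CURRENT_EPOCH, "1");
        checker.currentEpochComplete = true;
        checker.checkEpoch(2); // no exception: the retried check moves the store to 2
        System.out.println("store = " + checker.storedEpoch());
    }
}
```

If the retried check still finds the stored epoch incomplete, checkEpoch throws as before. The retry only absorbs the lag left by a crash between the sends and the store.put(); it does not hide a genuine epoch skip.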



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
