[ https://issues.apache.org/jira/browse/SAMZA-623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-623:
----------------------------------
Attachment: SAMZA-623-0.patch
Attaching a really hacky fix. This fix shouldn't be committed, but I did verify
that this patch fixes the problem. We should come up with a cleaner fix to
commit.
> Samza torture test Checker has a race condition
> -----------------------------------------------
>
> Key: SAMZA-623
> URL: https://issues.apache.org/jira/browse/SAMZA-623
> Project: Samza
> Issue Type: Bug
> Components: test
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Fix For: 0.10.0
>
> Attachments: SAMZA-623-0.patch
>
>
> The SAMZA-394 integration test includes a {{Checker}} class. This class has a
> race condition that causes the test to fail intermittently. The bug is in
> this code:
> {code}
> for (int i = 0; i < numPartitions; i++) {
>   logger.info("Emitting next epoch - " + Integer.toString(i) + " -> "
>       + Integer.toString(nextEpoch));
>   collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"),
>       Integer.toString(i), Integer.toString(nextEpoch)));
> }
> // A crash at this point leaves nextEpoch messages in the stream while the
> // store still holds the previous epoch.
> this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
> {code}
> If the container is killed (or fails) right before the this.store.put()
> call, the epoch will already have been advanced (i.e. there will be messages
> in the stream carrying nextEpoch as their epoch), but the store will still
> hold the prior epoch.
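To make the crash window concrete, here is a minimal, hypothetical Java model of the two steps in the snippet above. The class name EpochCrashWindow, the list standing in for the "epoch" stream, and the HashMap standing in for the task's key-value store are all assumptions for illustration; none of this is Samza's API. The model stops right before the store update, as a killed container would.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the Checker's epoch advance.
// The "stream" is a plain list and the "store" a HashMap; neither is Samza's real API.
public class EpochCrashWindow {
    static final String CURRENT_EPOCH = "current-epoch";
    final List<String> epochStream = new ArrayList<>();
    final Map<String, String> store = new HashMap<>();

    // Same order as in the issue: one message per partition first, store update last.
    // crashBeforePut = true simulates the container dying right before store.put().
    void advanceEpoch(int numPartitions, int nextEpoch, boolean crashBeforePut) {
        for (int i = 0; i < numPartitions; i++) {
            epochStream.add(i + ":" + nextEpoch);
        }
        if (crashBeforePut) {
            return; // messages are already out, but the store was never updated
        }
        store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
    }

    public static void main(String[] args) {
        EpochCrashWindow checker = new EpochCrashWindow();
        checker.store.put(CURRENT_EPOCH, "1");
        checker.advanceEpoch(2, 2, true);
        System.out.println("stream: " + checker.epochStream);
        System.out.println("store:  " + checker.store.get(CURRENT_EPOCH));
    }
}
```

Running main prints `stream: [0:2, 1:2]` and `store:  1`: epoch-2 messages exist while the store still says epoch 1, which is the inconsistent state a restarted container inherits.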
> When the container starts back up, it falls back to its last checkpointed
> offset and starts reading messages. If it reaches the messages carrying
> nextEpoch before window() is invoked again, it will see a message with
> nextEpoch while checkEpoch is still comparing against the older epoch
> (nextEpoch - 1). This causes the container to fail:
> {code}
> if (currentEpochInMsg <= currentEpochInStore) {
>   if (currentEpochInMsg < currentEpochInStore) {
>     logger.info("#### Ignoring received epoch = " + currentEpochInMsg
>         + " less than what is in store " + currentEpochInStore);
>   }
> } else { // the message's epoch is ahead of the epoch in the store
>   throw new IllegalArgumentException("Got epoch " + currentEpochInMsg
>       + " but have not yet completed " + currentEpochInStore);
> }
> {code}
> The container then fails over and over, as long as it consistently reads the
> nextEpoch message before window() is called (the test runs with
> task.window.ms=60000, so window() fires only once every 60 seconds).
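The restart loop can be modeled the same way. This hypothetical sketch (class EpochReplayFailure, with a plain map as the restored store and a list of integers as the replayed epochs) keeps only the comparison from checkEpoch and assumes, as described above, that window() never fires before the epoch-2 message is replayed. Because every restart begins from the same checkpoint and the same stale store, every attempt fails the same way.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of restarts after the crash: the store says epoch 1,
// but the replayed input already contains an epoch-2 message.
public class EpochReplayFailure {
    static final String CURRENT_EPOCH = "current-epoch";
    final Map<String, String> store = new HashMap<>();

    // Only the failing comparison from checkEpoch is kept.
    void checkEpoch(int currentEpochInMsg) {
        int currentEpochInStore = Integer.parseInt(store.get(CURRENT_EPOCH));
        if (currentEpochInMsg > currentEpochInStore) {
            throw new IllegalArgumentException("Got epoch " + currentEpochInMsg
                    + " but have not yet completed " + currentEpochInStore);
        }
        // Older epochs are ignored and equal epochs are accepted, as in the issue.
    }

    // One container start: restore the store, replay from the checkpoint.
    // Assumes window() does not run before the epoch-2 message is reached.
    static boolean restartAndReplay(Map<String, String> durableStore, List<Integer> replayedEpochs) {
        EpochReplayFailure container = new EpochReplayFailure();
        container.store.putAll(durableStore);
        try {
            for (int epoch : replayedEpochs) {
                container.checkEpoch(epoch);
            }
            return true;
        } catch (IllegalArgumentException e) {
            System.out.println("container failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> durableStore = Collections.singletonMap(CURRENT_EPOCH, "1");
        List<Integer> replayed = Arrays.asList(1, 1, 2);
        for (int attempt = 1; attempt <= 3; attempt++) {
            boolean ok = restartAndReplay(durableStore, replayed);
            System.out.println("attempt " + attempt + " succeeded: " + ok);
        }
    }
}
```

Nothing in the replay path ever updates the store, so the failure is deterministic rather than flaky once the crash has happened.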
> I think this can be fixed simply by running the window() check one more time
> in checkEpoch when we are about to throw the exception.
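A rough sketch of that suggestion, under loud assumptions: windowCheck() below is a hypothetical stand-in for the window() logic, reduced to "if the current epoch is complete, emit nextEpoch for every partition and store it", and the currentEpochComplete flag replaces the real completion test, which is not reproduced here. checkEpoch re-runs that check once before throwing and fails only if the message is still ahead of the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed fix; not the attached patch or Samza's API.
public class EpochRecheckFix {
    static final String CURRENT_EPOCH = "current-epoch";
    final Map<String, String> store = new HashMap<>();
    final List<String> sent = new ArrayList<>();
    final int numPartitions;
    boolean currentEpochComplete; // assumed stand-in for window()'s completion test

    EpochRecheckFix(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int storedEpoch() {
        return Integer.parseInt(store.get(CURRENT_EPOCH));
    }

    // Stand-in for the window() check: advance only if the current epoch is complete.
    void windowCheck() {
        if (!currentEpochComplete) {
            return;
        }
        int nextEpoch = storedEpoch() + 1;
        for (int i = 0; i < numPartitions; i++) {
            sent.add(i + ":" + nextEpoch); // may duplicate messages sent before the crash
        }
        store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
        currentEpochComplete = false;
    }

    void checkEpoch(int currentEpochInMsg) {
        if (currentEpochInMsg > storedEpoch()) {
            windowCheck(); // one more attempt to advance before declaring failure
        }
        if (currentEpochInMsg > storedEpoch()) {
            throw new IllegalArgumentException("Got epoch " + currentEpochInMsg
                    + " but have not yet completed " + storedEpoch());
        }
    }

    public static void main(String[] args) {
        EpochRecheckFix checker = new EpochRecheckFix(2);
        checker.store.put(CURRENT_EPOCH, "1");
        checker.currentEpochComplete = true; // epoch 1 had finished before the crash
        checker.checkEpoch(2);               // advances the store instead of throwing
        System.out.println("store: " + checker.storedEpoch() + ", re-sent: " + checker.sent);
    }
}
```

One consequence to check against the real test: re-running the advance re-emits the nextEpoch messages that were already sent before the crash, so this sketch assumes duplicate epoch messages are harmless. If the epoch is genuinely incomplete, the recheck changes nothing and the exception is still thrown.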
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)