Chris Riccomini created SAMZA-623:
-------------------------------------
Summary: Samza torture test Checker has a race condition
Key: SAMZA-623
URL: https://issues.apache.org/jira/browse/SAMZA-623
Project: Samza
Issue Type: Bug
Components: test
Affects Versions: 0.9.0
Reporter: Chris Riccomini
Fix For: 0.10.0
The SAMZA-394 integration test includes a {{Checker}} class. This class has a
race condition that causes the test to fail intermittently. The bug is in this
code:
{code}
for (int i = 0; i < numPartitions; i++) {
  logger.info("Emitting next epoch - " + Integer.toString(i) + " -> "
      + Integer.toString(nextEpoch));
  collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"),
      Integer.toString(i), Integer.toString(nextEpoch)));
}
this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
{code}
If the container is killed (or fails) right before the {{this.store.put()}}
call, then the epoch will already have been advanced (i.e. there will be
messages in the stream with nextEpoch as their epoch), but the store will still
hold the prior epoch.
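The ordering problem can be reproduced in isolation. The following is only a minimal sketch, not the Checker itself: {{EpochRaceSketch}}, its in-memory list and map, and the {{crashBeforePut}} flag are hypothetical stand-ins for the Kafka "epoch" stream, the task's store, and a container kill.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the send-then-put ordering; not Samza code.
class EpochRaceSketch {
    static final String CURRENT_EPOCH = "current-epoch";

    // Messages already sent to the stream; these survive a container crash.
    final List<String> epochStream = new ArrayList<>();
    // Stand-in for the task's key-value store holding the last persisted epoch.
    final Map<String, String> store = new HashMap<>();

    // Same order as the snippet above: send to every partition, then store.put.
    // crashBeforePut simulates the container dying between the two steps.
    void advanceEpoch(int numPartitions, int nextEpoch, boolean crashBeforePut) {
        for (int i = 0; i < numPartitions; i++) {
            epochStream.add(Integer.toString(nextEpoch));
        }
        if (crashBeforePut) {
            return; // killed here: messages are out, but the store is stale
        }
        store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
    }

    public static void main(String[] args) {
        EpochRaceSketch sketch = new EpochRaceSketch();
        sketch.store.put(CURRENT_EPOCH, "0");
        sketch.advanceEpoch(2, 1, true);
        // The stream has moved on to epoch 1 while the store still says 0.
        System.out.println("stream=" + sketch.epochStream
            + " store=" + sketch.store.get(CURRENT_EPOCH));
    }
}
```

After the simulated crash, the stream and the store disagree about the current epoch, which is the state the restarted container inherits.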
When the container starts back up, the container will fall back to its last
checkpointed offset, and start reading messages. If the container gets all the
way to messages with nextEpoch in them before window() is invoked again, then
it will see a message with nextEpoch, but checkEpoch will still be using the
older epoch (nextEpoch - 1). This causes the container to fail:
{code}
if (currentEpochInMsg <= currentEpochInStore) {
  if (currentEpochInMsg < currentEpochInStore)
    logger.info("#### Ignoring received epoch = " + epoch
        + " less than what is in store " + curr);
} else { // should have curr > epoch
  throw new IllegalArgumentException("Got epoch " + epoch
      + " but have not yet completed " + curr);
}
{code}
The container then fails over and over, for as long as it consistently reads
the nextEpoch message before window() is called (with task.window.ms=60000).
I think this can be fixed simply by doing the window() check one more time in
checkEpoch, if we are about to throw an exception.
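A rough sketch of that idea follows. It is not a patch against the real Checker: {{CheckEpochRetrySketch}}, the in-memory store, and the {{windowCheck}} callback (standing in for whatever logic window() runs to advance the stored epoch) are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration of "re-run the window() check before throwing".
class CheckEpochRetrySketch {
    static final String CURRENT_EPOCH = "current-epoch";

    // Stand-in for the task's key-value store.
    final Map<String, String> store = new HashMap<>();
    // Assumed stand-in for the epoch-advancing logic that window() performs.
    final Consumer<Map<String, String>> windowCheck;

    CheckEpochRetrySketch(Consumer<Map<String, String>> windowCheck) {
        this.windowCheck = windowCheck;
    }

    int currentEpoch() {
        return Integer.parseInt(store.get(CURRENT_EPOCH));
    }

    void checkEpoch(String epoch) {
        int inMsg = Integer.parseInt(epoch);
        if (inMsg > currentEpoch()) {
            // About to fail: give the window logic one more chance, since the
            // store may simply be stale after a crash before store.put().
            windowCheck.accept(store);
        }
        int curr = currentEpoch();
        if (inMsg > curr) {
            throw new IllegalArgumentException(
                "Got epoch " + epoch + " but have not yet completed " + curr);
        }
    }

    public static void main(String[] args) {
        // Here the window check finds the prior epoch complete and advances the store.
        CheckEpochRetrySketch task = new CheckEpochRetrySketch(
            s -> s.put(CURRENT_EPOCH, "1"));
        task.store.put(CURRENT_EPOCH, "0");
        task.checkEpoch("1"); // no exception: the store caught up first
        System.out.println("store epoch = " + task.currentEpoch());
    }
}
```

If the extra check does not advance the store, the exception is still thrown, so a genuinely out-of-order epoch would continue to fail the test.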