Chris Riccomini created SAMZA-623:
-------------------------------------

             Summary: Samza torture test Checker has a race condition
                 Key: SAMZA-623
                 URL: https://issues.apache.org/jira/browse/SAMZA-623
             Project: Samza
          Issue Type: Bug
          Components: test
    Affects Versions: 0.9.0
            Reporter: Chris Riccomini
             Fix For: 0.10.0


The SAMZA-394 integration test includes a {{Checker}} class. This class has a 
race condition in it, which causes the test to fail every so often. The bug is 
in this code:

{code}
      for(int i = 0; i < numPartitions; i++) {
          logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
          collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
      }
      this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
{code}

If the container is killed (or fails) right before the this.store.put() call, 
then the epoch will already have been advanced in the stream (i.e. there will 
be messages in the stream with nextEpoch as their epoch), but the store will 
still hold the prior epoch. 

When the container starts back up, the container will fall back to its last 
checkpointed offset, and start reading messages. If the container gets all the 
way to messages with nextEpoch in them before window() is invoked again, then 
it will see a message with nextEpoch, but checkEpoch will still be using the 
older epoch (nextEpoch - 1). This causes the container to fail:

{code}
      if (currentEpochInMsg <= currentEpochInStore) {
        if (currentEpochInMsg < currentEpochInStore)
          logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
      } else { // should have curr > epoch
        throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
      }
{code}

The container then just fails over and over, as long as it reads the nextEpoch 
message consistently before window() is called (with task.window.ms=60000).
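
To make the failure sequence concrete, here is a minimal in-memory sketch of it. It is not the real Checker code: the class name, the HashMap standing in for the Samza store, the List standing in for the "epoch" stream, and the crashBeforePut flag are all assumptions made for illustration. Only the send-then-put ordering and the epoch comparison are taken from the snippets above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Checker task; not the actual Samza class.
class EpochRaceSketch {
  static final String CURRENT_EPOCH = "current-epoch";

  // Assumed stand-ins for the key-value store and the "epoch" stream.
  final Map<String, String> store = new HashMap<>();
  final List<String> epochStream = new ArrayList<>();

  // Same ordering as in the issue: emit nextEpoch first, then store.put().
  // crashBeforePut simulates the container dying between the two steps.
  void advanceEpoch(int numPartitions, boolean crashBeforePut) {
    int nextEpoch = Integer.parseInt(store.get(CURRENT_EPOCH)) + 1;
    for (int i = 0; i < numPartitions; i++) {
      epochStream.add(Integer.toString(nextEpoch));
    }
    if (crashBeforePut) {
      return; // the store never learns about nextEpoch
    }
    store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
  }

  // Same comparison as the checkEpoch excerpt: a message epoch ahead of the
  // stored epoch is treated as fatal.
  void checkEpoch(String epoch) {
    int inMsg = Integer.parseInt(epoch);
    int inStore = Integer.parseInt(store.get(CURRENT_EPOCH));
    if (inMsg > inStore) {
      throw new IllegalArgumentException(
          "Got epoch " + epoch + " but have not yet completed " + inStore);
    }
  }

  // Replays the stream after a simulated crash; returns true if checkEpoch throws.
  static boolean replayFails() {
    EpochRaceSketch task = new EpochRaceSketch();
    task.store.put(CURRENT_EPOCH, "0");
    task.advanceEpoch(2, true); // messages for epoch 1 exist, store still says 0
    try {
      for (String epoch : task.epochStream) {
        task.checkEpoch(epoch);
      }
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("replay fails: " + replayFails());
  }
}
```

In this sketch the replay fails on every restart, because nothing between the restart and the first nextEpoch message updates the stored epoch.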

I think this can be fixed simply by running the window() check one more time 
in checkEpoch when we are about to throw an exception.
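
A rough sketch of what that could look like is below. It is one possible reading of the idea, not a patch against the real Checker: the class name, the HashMap store, the "partition-N" keys, the emitted counter (standing in for collector.send) and the completeness rule inside window() are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed fix; not the actual Samza Checker.
class CheckEpochRetrySketch {
  static final String CURRENT_EPOCH = "current-epoch";

  final Map<String, String> store = new HashMap<>(); // assumed stand-in for the store
  final int numPartitions;
  int emitted = 0; // counts messages that would go through collector.send(...)

  CheckEpochRetrySketch(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  int currentEpoch() {
    return Integer.parseInt(store.get(CURRENT_EPOCH));
  }

  // Assumed completeness rule: the epoch is done once every partition key
  // holds the current epoch. If so, emit nextEpoch and record it in the store.
  boolean window() {
    String current = store.get(CURRENT_EPOCH);
    for (int i = 0; i < numPartitions; i++) {
      if (!current.equals(store.get("partition-" + i))) {
        return false;
      }
    }
    emitted += numPartitions;
    store.put(CURRENT_EPOCH, Integer.toString(Integer.parseInt(current) + 1));
    return true;
  }

  // Before throwing, re-run the window() check once. If the previous epoch
  // was in fact complete, the store catches up and the message is accepted.
  void checkEpoch(String epoch) {
    int inMsg = Integer.parseInt(epoch);
    if (inMsg > currentEpoch()) {
      window();
    }
    if (inMsg > currentEpoch()) {
      throw new IllegalArgumentException(
          "Got epoch " + epoch + " but have not yet completed " + currentEpoch());
    }
  }

  public static void main(String[] args) {
    CheckEpochRetrySketch task = new CheckEpochRetrySketch(2);
    task.store.put(CURRENT_EPOCH, "0");
    task.store.put("partition-0", "0");
    task.store.put("partition-1", "0");
    task.checkEpoch("1"); // store was stale; the retry advances it instead of throwing
    System.out.println("stored epoch: " + task.store.get(CURRENT_EPOCH));
  }
}
```

Note that in this sketch the retry emits the nextEpoch messages a second time, so the approach only works if downstream handling tolerates duplicate epoch messages. The exception is still thrown when the earlier epoch really is incomplete.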



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
