[ 
https://issues.apache.org/jira/browse/SAMZA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459946#comment-16459946
 ] 

Shanthoosh Venkataraman commented on SAMZA-1695:
------------------------------------------------

*Fix:* Remove events in the debounce queue on session reconnect.
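A minimal sketch of that fix, assuming the debounce queue is built on a 
ScheduledExecutorService. DebounceQueue, schedule and clear are hypothetical 
names chosen for illustration, not Samza's actual classes. The point is that on 
session reconnect every buffered action is cancelled, so nothing the processor 
queued while it still believed it was the leader can fire afterwards.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical debounce queue (NOT Samza's implementation). */
class DebounceQueue {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final List<ScheduledFuture<?>> pending = new ArrayList<>();

    /** Buffer an action to run after delayMs. */
    synchronized void schedule(Runnable action, long delayMs) {
        pending.add(scheduler.schedule(action, delayMs, TimeUnit.MILLISECONDS));
    }

    /** Cancel every buffered action; call this when the ZK session is re-established. */
    synchronized int clear() {
        int cancelled = 0;
        for (ScheduledFuture<?> f : pending) {
            if (f.cancel(false)) {   // false if the action already ran
                cancelled++;
            }
        }
        pending.clear();
        return cancelled;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        DebounceQueue queue = new DebounceQueue();
        // Event buffered while this processor still believed it was the leader.
        queue.schedule(() -> System.out.println("publishing JobModel as leader"), 1_000);
        // Session expired and the client reconnected as a follower: drop stale events.
        System.out.println("cancelled " + queue.clear());
        Thread.sleep(1_500); // the stale leader action never runs
        queue.shutdown();
    }
}
{code}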

A follow-up fix to this patch will use ACLs or an epoch from ZooKeeper (the 
czxid of the ephemeral node) to guard access to shared state in ZooKeeper 
(JobModel/Barrier). The existing implementation guards this only with a local 
check of jobModelVersionNumber, which is insufficient. If there are pending 
ZooKeeper operations that were initiated before the leader's session expired, 
they will be retried until operationRetryTimeOut elapses. If the reconnect 
happens within operationRetryTimeOut, the previous-generation leader will 
proceed to execute those operations.
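To illustrate the epoch idea, here is a hypothetical in-memory model; 
FencedJobModelStore and its methods are invented for this sketch, and the epoch 
values are made up. Each leader carries the czxid of its ephemeral node as an 
epoch, and a write to shared state is accepted only if its epoch matches the 
current leader's. Against a real ensemble the comparison and the write would 
have to be atomic on the ZooKeeper side; the synchronized methods here only 
stand in for that.

{code:java}
/** Hypothetical in-memory model of epoch fencing (not a ZooKeeper API). */
class FencedJobModelStore {
    private long currentLeaderEpoch = -1;   // in ZK: czxid of the leader's ephemeral node
    private String jobModel = null;

    /** A processor wins the election; its ephemeral node's czxid becomes the epoch. */
    synchronized void electLeader(long epoch) {
        currentLeaderEpoch = epoch;
    }

    /** Accept the write only if it comes from the current leader generation. */
    synchronized boolean publishJobModel(long writerEpoch, String model) {
        if (writerEpoch != currentLeaderEpoch) {
            return false;   // stale leader: reject instead of overwriting shared state
        }
        jobModel = model;
        return true;
    }

    synchronized String jobModel() {
        return jobModel;
    }

    public static void main(String[] args) {
        FencedJobModelStore store = new FencedJobModelStore();
        store.electLeader(100);   // P1 elected (illustrative czxid)
        store.electLeader(205);   // P1's session expires, P2 elected
        boolean p1 = store.publishJobModel(100, "model-from-P1");   // retried op from P1
        boolean p2 = store.publishJobModel(205, "model-from-P2");
        System.out.println(p1 + " " + p2 + " " + store.jobModel());
    }
}
{code}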

Relevant code from ZkClient (retryUntilConnected):

{code:java}
public <T> T retryUntilConnected(Callable<T> callable) {
    while (true) {
        // .... (elided in the original excerpt)
        try {
            return callable.call();
        } catch (ConnectionLossException e) {
            // we give the event thread some time to update the status to 'Disconnected'
            Thread.yield();
            waitForRetry();
        } catch (SessionExpiredException e) {
            // we give the event thread some time to update the status to 'Expired'
            Thread.yield();
            waitForRetry();
        }
        // .... (remaining catch clauses and the operationRetryTimeOut check elided)
    }
}
{code}

Note: The vanilla ZooKeeper client has no retry logic of its own 
(https://issues.apache.org/jira/browse/ZOOKEEPER-22). The retry-on-failure 
behaviour above comes from the I0Itec ZkClient library.
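The hazard described above can be reproduced with a simplified, self-contained 
model of such a retry loop. ConnectionLost is a hypothetical stand-in for 
ZooKeeper's ConnectionLossException/SessionExpiredException, and Thread.sleep 
stands in for waitForRetry(); the real library's loop has more exception 
handling. A write issued while P1 was leader fails a few times, P1 reconnects 
within the retry timeout, and the stale write then goes through.

{code:java}
import java.util.concurrent.Callable;

class RetryUntilConnectedDemo {
    /** Hypothetical stand-in for ConnectionLossException / SessionExpiredException. */
    static class ConnectionLost extends Exception {}

    /** Simplified retry-until-connected loop bounded by a retry timeout. */
    static <T> T retryUntilConnected(Callable<T> callable, long operationRetryTimeoutMs)
            throws Exception {
        long start = System.currentTimeMillis();
        while (true) {
            try {
                return callable.call();
            } catch (ConnectionLost e) {
                Thread.yield();
                Thread.sleep(10);   // stand-in for waitForRetry()
            }
            // Give up only once the retry timeout has elapsed.
            if (System.currentTimeMillis() - start >= operationRetryTimeoutMs) {
                throw new IllegalStateException("retry timeout elapsed");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Issued while P1 was leader; the first 3 attempts hit a lost connection,
        // then P1 reconnects (now as a follower) well within the retry timeout.
        String result = retryUntilConnected(() -> {
            if (++attempts[0] <= 3) {
                throw new ConnectionLost();
            }
            return "JobModel written by former leader";
        }, 5_000);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
{code}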

> Clear events in debounce queue on session expiration
> ----------------------------------------------------
>
>                 Key: SAMZA-1695
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1695
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Priority: Major
>
> *Scenario:*
> Assume there are three processors in the group [P1, P2, P3] and P1 is the 
> leader.
> 1. The leader processor (P1) loses connectivity with a ZooKeeper server in 
> the ensemble, and its ephemeral processor node is deleted (due to session 
> expiration).
> 2. The immediate successor (P2) to the leader (P1) finds out that the leader 
> is dead and declares itself leader. Processor P2 schedules onProcessorChange 
> to publish the JobModel.
> 3. ZkClient's connection retry logic lets P1 reconnect to another ZooKeeper 
> server in the ensemble, and it rejoins as a follower.
> 4. Processor P1 acts on the stale event buffered in its debounce queue (which 
> it received while it was the leader) and behaves as leader. At this point, 
> two processors (P1 and P2) are acting as leader. If P1 executes leader 
> actions before P2, P2 will fail (and, in the worst case, this can cause state 
> corruption).
> *Sample exception logs:* 
> https://gist.github.com/shanthoosh/55410fe4ebf3cfb65281b35f16397cad
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
