Shanthoosh Venkataraman updated SAMZA-1655:
    Comment: was deleted

(was: *Fix:*
 A. Route all watcher events from the zookeeper server to the 
ScheduleAfterDebounce worker queue and execute them on the worker thread. 
After this change, all events received before a session expiration are 
buffered in the ScheduleAfterDebounce worker queue. Upon session expiration (in 
IZkStateListener.handleNewSession), stale buffered events in the queue are 
deleted. When a processor's session expires on one zookeeper server and it 
connects to another zookeeper server in the ensemble, we create a new 
ephemeral processor node for it (because the processor's previous ephemeral 
node is gone). This situation is equivalent to a new processor joining the 
processors group. So clearing the stale buffered zookeeper events from the 
previous session in IZkStateListener.handleNewSession is mandatory and will 
not cause corruption issues.
 B. Remove the generationId-based check before zkWatcherEvent handling, along 
with the associated code.)
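
A minimal sketch of the buffering idea in fix A, assuming a simplified in-process queue. WatcherEventQueue and its methods are hypothetical names used for illustration; they are not Samza's ScheduleAfterDebounce API. Watcher events are buffered rather than handled inline, a single worker drains them in order, and the new-session callback discards whatever was buffered under the expired session.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the ScheduleAfterDebounce worker queue (not Samza's class).
class WatcherEventQueue {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    // Called from the zookeeper event thread: buffer the event instead of handling it inline.
    synchronized void enqueue(Runnable watcherEvent) {
        pending.add(watcherEvent);
    }

    // Called when a new session is established (the role IZkStateListener.handleNewSession
    // plays in the description above): drop everything buffered under the expired session.
    // Returns the number of events that were discarded.
    synchronized int clearStaleEvents() {
        int dropped = pending.size();
        pending.clear();
        return dropped;
    }

    // Called by the single worker thread: run buffered events in arrival order.
    void drain() {
        while (true) {
            Runnable next;
            synchronized (this) {
                next = pending.poll();
            }
            if (next == null) {
                return; // queue is empty
            }
            next.run(); // run outside the lock so handlers can enqueue more events
        }
    }
}
```

In this sketch, events enqueued before clearStaleEvents() never run, while events enqueued afterwards are handled normally, so no per-watcher generation check is needed.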

> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -----------------------------------------------------------------------------------------
>                 Key: SAMZA-1655
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1655
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
> *Problem:*
> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor 
> are instantiated with an in-memory generationId (zkUtils.generationId).
> Each zkWatcher caches the generationId passed through its constructor.
> The in-memory generationId is incremented whenever a zkClient session 
> expiration occurs.
> Any event from the zookeeper server is dropped if the in-memory generationId 
> is not equal to the generationId cached in the zkWatcher.
> After a zkClient session expiration in a StreamProcessor, every existing 
> zkWatcher therefore drops all events, which results in zombie 
> StreamProcessors and stalls message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole 
> processors group is stalled.
> *Purpose of generationId:*
> GenerationId was initially added to skip stale events that remain buffered 
> in the worker queue after a session expiration, to guard against the 
> following scenario:
>  1. Session expiration happens to the leader stream processor of the group.
>  2. The leader stream processor rejoins the group as a follower after 
> reconnecting to a different zkServer in the ensemble.
>  3. The former leader might have zkEvents buffered in its worker queue which 
> were delivered to it while it was the leader. If it acts upon these events, 
> it will cause global state corruption.
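
The failure mode described under *Reason* in the quoted report can be reproduced in miniature. The sketch below is a hypothetical model, not Samza code: GenerationGuardedWatcher and its members are illustrative names, and an AtomicInteger stands in for zkUtils.generationId. A watcher caches the generation at construction, so once the shared counter is incremented on session expiration, that watcher rejects every later event.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a watcher guarded by a cached generationId (illustrative only).
class GenerationGuardedWatcher {
    private final AtomicInteger currentGeneration; // shared counter, bumped on session expiration
    private final int cachedGeneration;            // value captured once, at construction
    private int handledCount = 0;

    GenerationGuardedWatcher(AtomicInteger currentGeneration) {
        this.currentGeneration = currentGeneration;
        this.cachedGeneration = currentGeneration.get();
    }

    // Returns true if the event was handled, false if it was dropped by the generation check.
    boolean onEvent(String event) {
        if (currentGeneration.get() != cachedGeneration) {
            return false; // generations differ: the event is silently skipped
        }
        handledCount++;
        return true;
    }

    int handledCount() {
        return handledCount;
    }
}
```

Because the cached value never changes, a watcher created before the expiration cannot handle events again unless it is re-created with the new generation, which is the stall the report describes.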

This message was sent by Atlassian JIRA
