[ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1655:
-------------------------------------------
    Description: 
*Problem:*

StreamProcessor skips all events from the ZooKeeper server after a zkClient 
session expiration.

*Reason:*

All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor 
are instantiated with an in-memory generationId (zkUtils.generationId).

Each zkWatcher caches the generationId passed through its constructor.

The in-memory generationId is incremented whenever a zkClient session 
expiration occurs.

Any event from the ZooKeeper server is dropped if the in-memory generationId 
is not equal to the generationId cached in the zkWatcher.

After a zkClient session expiration in a StreamProcessor, the value cached in 
every existing zkWatcher no longer matches the in-memory generationId, so all 
subsequent events are dropped. This results in zombie StreamProcessors and 
stalls message processing.

In the worst case, if this happens to the leader StreamProcessor, the whole 
processor group is stalled.
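
The failure mode described above can be reproduced with a minimal sketch. 
This is not the Samza code: GenerationCounter, CachingWatcher and 
GenerationIdDemo are hypothetical names used only for illustration, and the 
counter merely stands in for zkUtils.generationId.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the shared in-memory counter
// (zkUtils.generationId in the description); not the real Samza class.
final class GenerationCounter {
    private final AtomicInteger generationId = new AtomicInteger(0);

    int current() {
        return generationId.get();
    }

    // Called when the zkClient session expires.
    void onSessionExpired() {
        generationId.incrementAndGet();
    }
}

// Hypothetical watcher that caches the generation id at construction time,
// mirroring the behaviour described in the "Reason" section.
final class CachingWatcher {
    private final GenerationCounter counter;
    private final int cachedGenerationId;

    CachingWatcher(GenerationCounter counter) {
        this.counter = counter;
        this.cachedGenerationId = counter.current();
    }

    // Returns true if the event was handled, false if it was dropped.
    boolean handleEvent(String path) {
        if (cachedGenerationId != counter.current()) {
            return false; // generation mismatch: the event is skipped
        }
        return true;
    }
}

final class GenerationIdDemo {
    public static void main(String[] args) {
        GenerationCounter counter = new GenerationCounter();
        CachingWatcher watcher = new CachingWatcher(counter);

        System.out.println(watcher.handleEvent("/before-expiry"));
        counter.onSessionExpired();
        // The watcher was never re-created, so its cached id is now stale
        // and every later event is dropped.
        System.out.println(watcher.handleEvent("/after-expiry"));
    }
}
```

In this sketch the watcher handles events until the first session expiration 
and drops everything afterwards, because nothing refreshes the cached value.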

*Purpose of generationId:*

GenerationId was originally added to skip stale events that remain buffered 
in the worker queue after a session expiration, guarding against the 
following scenario:
 1. Session expiration happens to the leader stream processor of the group.
 2. The leader stream processor rejoins the group as a follower after the 
session reconnect.
 3. The former leader might still have zkEvents buffered in its worker queue 
that were delivered while it was the leader. If it acts upon these events, it 
will corrupt global state.
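
The intent of the guard in the scenario above can be sketched as follows. 
This is an illustration under assumptions, not the Samza implementation and 
not a proposed fix: StaleEventFilter and its methods are hypothetical, and 
here each event is stamped with the generation that was current when it was 
delivered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical worker queue that tags each buffered event with the
// generation id that was current at delivery time.
final class StaleEventFilter {
    private static final class QueuedEvent {
        final String name;
        final int generationId;

        QueuedEvent(String name, int generationId) {
            this.name = name;
            this.generationId = generationId;
        }
    }

    private final Queue<QueuedEvent> workerQueue = new ArrayDeque<>();
    private int currentGenerationId = 0;

    // An event arrives from ZooKeeper and is buffered for later processing.
    void deliver(String name) {
        workerQueue.add(new QueuedEvent(name, currentGenerationId));
    }

    // The zkClient session expired; anything buffered so far is now stale.
    void onSessionExpired() {
        currentGenerationId++;
    }

    // Processes the queue and returns the names of the events acted upon.
    List<String> drain() {
        List<String> processed = new ArrayList<>();
        QueuedEvent event;
        while ((event = workerQueue.poll()) != null) {
            if (event.generationId == currentGenerationId) {
                processed.add(event.name);
            }
            // Events from an older generation are skipped silently.
        }
        return processed;
    }
}

final class StaleEventDemo {
    public static void main(String[] args) {
        StaleEventFilter filter = new StaleEventFilter();
        filter.deliver("leader-event-1"); // buffered while still the leader
        filter.onSessionExpired();        // session expires, rejoin as follower
        filter.deliver("follower-event-1");
        System.out.println(filter.drain());
    }
}
```

Only the event delivered after the session expiration is acted upon; the one 
buffered during the earlier leadership is discarded.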

 

  was:
*Problem:* StreamProcessor skips all events from zookeeper server after 
zkClient session expiration.

*Reason:*

All the zkWatchers(dataChange, childChange handler) in a StreamProcessor are 
instantiated with an inmemory generationId(zkUtils.generationId).

All the zkWatchers cache this generationId passed through the constructor.

This inmemory generationId is incremented whenever the zkClient session 
expiration occurs.

Any event from zookeeper server is dropped if the inmemory generationId is not 
equal to the generationId cached in a zkWatcher.

After the zkClient session expiration in a StreamProcessor, this will result in 
zombie StreamProcessors and will stall message processing.

In worst case, if this happens to the leader StreamProcessor then the whole 
processors group will be stalled.

*Purpose of generationId:*

GenerationId was added initially to skip older stale events that are buffered 
in worker queue after a session expiration phase and guard against the 
following scenario:
 1. Session expiration happens to the leader stream processor of the group.
 2. Leader stream processor joins the group as a follower after the session 
reconnect.
 3. Leader might have zkEvents buffered in it's worker queue which were 
delivered to it when it was a leader. If the leader acts upon these events it 
will cause global state corruption.

 


> StreamProcessor skips all events from zookeeper server after zkClient session 
> expiration.
> -----------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1655
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1655
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
