CheckpointNotificationListener extends CheckpointListener, so the "committed" method is still going to be there. I think any other operator that tries to use "committed" in combination with THREAD_LOCAL in the future will also run into this behavior.
I would suggest we fix it in the platform. We should think more about possible solutions at the platform level.

-Priyanka

On Thu, Dec 29, 2016 at 3:04 PM, Francis Fernandes <[email protected]> wrote:

> Hi all,
>
> Need your inputs on the following:
>
> Issue - I have an application with 2 operators. The second operator is
> AbstractFileOutputOperator
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>
> which implements the Operator.CheckpointListener (deprecated) committed
> method. When the locality of the stream connecting the two operators is
> Locality.THREAD_LOCAL, the committed method is not called.
>
> RCA -
> Inside processHeartbeatResponse
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>,
> we check for thread != null. In case of THREAD_LOCAL, we have thread =
> null, and so the committed method isn't called.
>
> Approach 1 - Change in apex-core/engine
> For thread local
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>
> during activate we do not set the thread
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462>
> in the node's context.
> Because the thread is not set, we skip this operator in
> processHeartbeatResponse
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782>
> and committed is not called:
>
>     if (thread == null || !thread.isAlive()) {
>       continue;
>     }
>
> We need this if condition for invalid operators in case of other
> localities. Should we keep the OperatorDeployInfo persistent for each node
> so that it can be used later to identify the locality? This might be an
> intrusive change.
>
> Approach 2 - Change in malhar/AbstractFileOutputOperator
> Refactor the code from the committed
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260>
> method to beforeCheckpoint
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>.
> This will require no change in the core platform, but will work only for
> this particular operator. New operators should not run into this problem
> because the CheckpointListener is now deprecated and the
> CheckpointNotificationListener is to be used.
>
>
> Please let me know your thoughts on which approach would be the right way
> to solve this.
>
> Thanks,
> Francis
>
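
To make the RCA concrete, here is a minimal standalone sketch of the guard quoted under Approach 1. It is not the apex-core code: CommitAware, NodeEntry, notifyCommitted and demo are hypothetical names made up for illustration, and the only part taken from the thread is the "thread == null || !thread.isAlive()" check. A null thread stands in for an operator deployed with Locality.THREAD_LOCAL, on the assumption (from the RCA above) that no thread is recorded in that operator's context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CommittedSkipSketch {

  /** Hypothetical stand-in for an operator that wants commit notifications. */
  interface CommitAware {
    void committed(long windowId);
  }

  /** Hypothetical stand-in for a deployed node plus the thread kept in its context. */
  static final class NodeEntry {
    final String name;
    final Thread thread; // null models a THREAD_LOCAL operator (assumption from the RCA)
    final CommitAware operator;

    NodeEntry(String name, Thread thread, CommitAware operator) {
      this.name = name;
      this.thread = thread;
      this.operator = operator;
    }
  }

  /** Applies the guard from the thread and returns the names of the nodes that were notified. */
  static List<String> notifyCommitted(List<NodeEntry> nodes, long windowId) {
    List<String> notified = new ArrayList<>();
    for (NodeEntry entry : nodes) {
      Thread thread = entry.thread;
      if (thread == null || !thread.isAlive()) {
        continue; // the THREAD_LOCAL operator is skipped here
      }
      entry.operator.committed(windowId);
      notified.add(entry.name);
    }
    return notified;
  }

  /** Builds one node with its own live thread and one node without a thread. */
  static List<String> demo() {
    CountDownLatch stop = new CountDownLatch(1);
    Thread own = new Thread(() -> {
      try {
        stop.await(); // keep the thread alive until the demo is done
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    own.setDaemon(true);
    own.start();
    try {
      List<NodeEntry> nodes = Arrays.asList(
          new NodeEntry("upstream", own, windowId -> { }),
          new NodeEntry("fileOutput", null, windowId -> { }));
      return notifyCommitted(nodes, 42L);
    } finally {
      stop.countDown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [upstream]
  }
}
```

Only the node that owns a live thread is notified. In this toy model, a platform-level fix would need some other way to tell a thread-local operator apart from an invalid one, which is what keeping the OperatorDeployInfo around (Approach 1) would provide.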
