I would suggest including the fix in the platform. Operators should work correctly regardless of the stream locality used for the connection.
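To make the platform-side suggestion concrete, here is a simplified, self-contained model of the skip in processHeartbeatResponse and one possible fix in the spirit of Approach 1. Node, Locality, host, dispatchCurrent, dispatchFixed and runningThread are hypothetical stand-ins, not apex-core classes, and the sketch calls committed() directly, which may differ from how the engine actually delivers it. The assumed fix keeps each node's input-stream locality (as Approach 1 proposes via OperatorDeployInfo) and, for a THREAD_LOCAL node, checks the upstream thread that actually runs it instead of the node's own null thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommittedDispatchSketch {

  // Hypothetical stand-in for the stream locality values.
  enum Locality { THREAD_LOCAL, CONTAINER_LOCAL, NODE_LOCAL, RACK_LOCAL }

  // Hypothetical stand-in for an operator node deployed in a container.
  static class Node {
    final String name;
    final Locality inputLocality; // locality of the input stream; null for an input operator
    final Thread thread;          // null for a THREAD_LOCAL node (it has no thread of its own)
    final Node host;              // upstream node whose thread runs a THREAD_LOCAL node
    final List<Long> committedWindows = new ArrayList<>();

    Node(String name, Locality inputLocality, Thread thread, Node host) {
      this.name = name;
      this.inputLocality = inputLocality;
      this.thread = thread;
      this.host = host;
    }

    void committed(long windowId) {
      committedWindows.add(windowId);
    }
  }

  // Current behavior: any node without a live thread of its own is skipped,
  // so THREAD_LOCAL nodes never receive committed().
  static void dispatchCurrent(List<Node> nodes, long windowId) {
    for (Node n : nodes) {
      Thread thread = n.thread;
      if (thread == null || !thread.isAlive()) {
        continue;
      }
      n.committed(windowId);
    }
  }

  // Resolves the thread that actually executes the node, following THREAD_LOCAL hosts upstream.
  static Thread runningThread(Node n) {
    if (n.inputLocality == Locality.THREAD_LOCAL && n.host != null) {
      return runningThread(n.host);
    }
    return n.thread;
  }

  // Possible fix: keep the liveness guard, but apply it to the executing thread.
  static void dispatchFixed(List<Node> nodes, long windowId) {
    for (Node n : nodes) {
      Thread thread = runningThread(n);
      if (thread == null || !thread.isAlive()) {
        continue; // still skips operators whose executing thread is missing or dead
      }
      n.committed(windowId);
    }
  }

  public static void main(String[] args) {
    for (boolean fixed : new boolean[] {false, true}) {
      Node upstream = new Node("upstream", null, Thread.currentThread(), null);
      Node writer = new Node("writer", Locality.THREAD_LOCAL, null, upstream);
      // Never started, so isAlive() is false: models an invalid operator.
      Node dead = new Node("dead", Locality.CONTAINER_LOCAL, new Thread(() -> { }), null);
      List<Node> nodes = Arrays.asList(upstream, writer, dead);
      if (fixed) {
        dispatchFixed(nodes, 42L);
      } else {
        dispatchCurrent(nodes, 42L);
      }
      System.out.println((fixed ? "fixed:   " : "current: ") + writer.name + " committed " + writer.committedWindows);
    }
  }
}
```

Running main prints an empty list for the writer with the current check and [42] with the fixed one, while the never-started "dead" node is skipped in both cases, so the existing guard against invalid operators is preserved.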
-Tushar

On Thu, Dec 29, 2016, 15:42 Priyanka Gugale <[email protected]> wrote:

> CheckpointNotificationListener extends CheckpointListener, and hence the
> "committed" method is still there. So I think any other operator that
> tries to use "committed" with THREAD_LOCAL locality in future will also
> suffer from this behavior.
>
> I would suggest we fix it in the platform. We should think more about
> possible solutions at the platform level.
>
> -Priyanka
>
> On Thu, Dec 29, 2016 at 3:04 PM, Francis Fernandes <[email protected]>
> wrote:
>
> > Hi all,
> >
> > Need your inputs on the following:
> >
> > Issue - I have an application with two operators. The second operator is
> > AbstractFileOutputOperator
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>,
> > which implements the committed method of Operator.CheckpointListener
> > (deprecated). When the locality of the stream connecting the two
> > operators is Locality.THREAD_LOCAL, the committed method is not called.
> >
> > RCA -
> > Inside processHeartbeatResponse
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>,
> > we skip any node whose thread is null (or not alive). In the THREAD_LOCAL
> > case the thread is null, so the committed method is never called.
> >
> > Approach 1 - Change in apex-core/engine
> > For THREAD_LOCAL streams
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>,
> > during activate we do not set the thread
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462>
> > in the node's context. Because the thread is not set, we skip this
> > operator in processHeartbeatResponse
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782>
> > and committed is not called:
> >
> >     if (thread == null || !thread.isAlive()) {
> >       continue;
> >     }
> >
> > We still need this check to skip invalid operators for the other
> > localities. Should we persist the OperatorDeployInfo for each node so
> > that it can be used later to identify the locality? This might be an
> > intrusive change.
> >
> > Approach 2 - Change in malhar/AbstractFileOutputOperator
> > Refactor the code from the committed
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260>
> > method into beforeCheckpoint
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>.
> > This requires no change in the core platform, but works only for this
> > particular operator. New operators should not run into this problem
> > because CheckpointListener is now deprecated and
> > CheckpointNotificationListener is to be used instead.
> >
> > Please let me know your thoughts on which approach is the right way
> > to solve this.
> >
> > Thanks,
> > Francis
> >
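For comparison, a minimal sketch of Approach 2, assuming beforeCheckpoint is delivered on the operator's own processing path and therefore does not depend on the heartbeat skip. CheckpointCallbacks, WriterSketch, finalizePendingFiles and runCheckpointCycle are hypothetical; CheckpointCallbacks only mirrors the three callback names and is not the real Operator.CheckpointNotificationListener interface.

```java
import java.util.ArrayList;
import java.util.List;

public class BeforeCheckpointSketch {

  // Hypothetical stand-in mirroring the callback names; not the real Apex interface.
  interface CheckpointCallbacks {
    void beforeCheckpoint(long windowId);
    void checkpointed(long windowId);
    void committed(long windowId);
  }

  // Hypothetical writer whose finalization logic lives in one helper,
  // invoked either from committed() (current) or beforeCheckpoint() (Approach 2).
  static class WriterSketch implements CheckpointCallbacks {
    final boolean finalizeBeforeCheckpoint;
    final List<Long> finalizedAt = new ArrayList<>();

    WriterSketch(boolean finalizeBeforeCheckpoint) {
      this.finalizeBeforeCheckpoint = finalizeBeforeCheckpoint;
    }

    private void finalizePendingFiles(long windowId) {
      finalizedAt.add(windowId); // placeholder for the real file-finalization work
    }

    @Override
    public void beforeCheckpoint(long windowId) {
      if (finalizeBeforeCheckpoint) {
        finalizePendingFiles(windowId);
      }
    }

    @Override
    public void checkpointed(long windowId) {
      // nothing to do in this sketch
    }

    @Override
    public void committed(long windowId) {
      if (!finalizeBeforeCheckpoint) {
        finalizePendingFiles(windowId);
      }
    }
  }

  // Drives one checkpoint cycle; deliverCommitted = false models the THREAD_LOCAL skip.
  static void runCheckpointCycle(CheckpointCallbacks op, long windowId, boolean deliverCommitted) {
    op.beforeCheckpoint(windowId);
    op.checkpointed(windowId);
    if (deliverCommitted) {
      op.committed(windowId);
    }
  }

  public static void main(String[] args) {
    WriterSketch inCommitted = new WriterSketch(false);
    WriterSketch inBeforeCheckpoint = new WriterSketch(true);
    runCheckpointCycle(inCommitted, 10L, false);
    runCheckpointCycle(inBeforeCheckpoint, 10L, false);
    System.out.println("committed-based:        " + inCommitted.finalizedAt);
    System.out.println("beforeCheckpoint-based: " + inBeforeCheckpoint.finalizedAt);
  }
}
```

With committed() withheld (deliverCommitted = false, modelling the THREAD_LOCAL case), only the beforeCheckpoint-based writer finalizes anything. The trade-off is timing: the work would run when a window is checkpointed rather than once it is known to be committed, which would need checking against the operator's recovery guarantees.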
