Hi all,

I need your input on the following:
Issue - I have an application with two operators. The second operator is AbstractFileOutputOperator <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>, which implements the committed method of the deprecated Operator.CheckpointListener. When the stream connecting the two operators has locality Locality.THREAD_LOCAL, the committed method is never called.

RCA - Inside processHeartbeatResponse <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>, we check that the node's thread is not null. For THREAD_LOCAL the thread is null, so the committed method isn't called.

Approach 1 - Change in apex-core/engine
For thread local <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>, during activate we do not set the thread <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462> in the node's context. Because the thread is not set, processHeartbeatResponse <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782> skips this operator, and committed is not called:

    if (thread == null || !thread.isAlive()) {
      continue;
    }

We still need this condition to skip invalid operators with the other localities. Should we keep the OperatorDeployInfo of each node persistently, so that it can be used later to identify the locality? This might be an intrusive change.
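To make the RCA and Approach 1 concrete, here is a minimal, self-contained Java model of the heartbeat loop. It is not the StreamingContainer code: NodeRecord, its locality and hostNodeId fields, the Locality enum, and the processCommitted* methods are hypothetical stand-ins, and appending the window id stands in for delivering committed() to the operator. It assumes Approach 1 would persist each node's locality (from OperatorDeployInfo) plus the id of the node whose thread a THREAD_LOCAL node shares, so the existing liveness check can be applied to that thread instead of skipping the node.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class HeartbeatCommitSketch {
  // Hypothetical stand-in for the stream locality of a deployed node.
  enum Locality { THREAD_LOCAL, CONTAINER_LOCAL, NODE_LOCAL, RACK_LOCAL }

  // Hypothetical per-node record. 'locality' and 'hostNodeId' model what
  // Approach 1 would persist from OperatorDeployInfo.
  static final class NodeRecord {
    final int id;
    final Thread thread;       // null for THREAD_LOCAL nodes, as described in the RCA
    final Locality locality;
    final Integer hostNodeId;  // node whose thread a THREAD_LOCAL node runs on
    final List<Long> committed = new ArrayList<>();

    NodeRecord(int id, Thread thread, Locality locality, Integer hostNodeId) {
      this.id = id;
      this.thread = thread;
      this.locality = locality;
      this.hostNodeId = hostNodeId;
    }
  }

  // Current behavior: a node without its own live thread is skipped,
  // so THREAD_LOCAL nodes never get the committed notification.
  static void processCommittedCurrent(Map<Integer, NodeRecord> nodes, long windowId) {
    for (NodeRecord n : nodes.values()) {
      Thread thread = n.thread;
      if (thread == null || !thread.isAlive()) {
        continue;
      }
      n.committed.add(windowId); // stands in for calling committed(windowId)
    }
  }

  // Approach 1: use the persisted locality to find the thread a THREAD_LOCAL
  // node actually runs on, then apply the same liveness check to it.
  static void processCommittedWithLocality(Map<Integer, NodeRecord> nodes, long windowId) {
    for (NodeRecord n : nodes.values()) {
      Thread thread = n.thread;
      if (n.locality == Locality.THREAD_LOCAL && n.hostNodeId != null) {
        NodeRecord host = nodes.get(n.hostNodeId);
        thread = (host == null) ? null : host.thread;
      }
      if (thread == null || !thread.isAlive()) {
        continue; // invalid operators are still skipped
      }
      n.committed.add(windowId);
    }
  }

  // Builds three nodes, delivers window 10, and returns what each node received.
  static Map<Integer, List<Long>> run(boolean withLocality) {
    CountDownLatch stop = new CountDownLatch(1);
    Thread live = new Thread(() -> {
      try {
        stop.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    live.setDaemon(true);
    live.start();
    Thread neverStarted = new Thread(() -> { }); // isAlive() is false

    Map<Integer, NodeRecord> nodes = new LinkedHashMap<>();
    nodes.put(1, new NodeRecord(1, live, Locality.CONTAINER_LOCAL, null));
    nodes.put(2, new NodeRecord(2, null, Locality.THREAD_LOCAL, 1)); // e.g. the file output operator
    nodes.put(3, new NodeRecord(3, neverStarted, Locality.CONTAINER_LOCAL, null)); // invalid node
    if (withLocality) {
      processCommittedWithLocality(nodes, 10L);
    } else {
      processCommittedCurrent(nodes, 10L);
    }
    stop.countDown();
    try {
      live.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    Map<Integer, List<Long>> result = new LinkedHashMap<>();
    for (NodeRecord n : nodes.values()) {
      result.put(n.id, n.committed);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println("current:       " + run(false));
    System.out.println("with locality: " + run(true));
  }
}
```

In this model, the current check yields {1=[10], 2=[], 3=[]}, while the locality lookup yields {1=[10], 2=[10], 3=[]}: the thread-local node now receives the notification, and node 3, whose thread is not alive, is still skipped. The extra state per node is what makes this approach more intrusive than Approach 2.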
Approach 2 - Change in malhar/AbstractFileOutputOperator
Move the code from the committed <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260> method into beforeCheckpoint <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>. This requires no change in the core platform, but it fixes only this particular operator. New operators should not run into this problem, because CheckpointListener is now deprecated and CheckpointNotificationListener should be used instead.

Please let me know your thoughts on which approach is the right way to solve this.

Thanks,
Francis
