CheckpointNotificationListener extends CheckpointListener, and hence the
"committed" method is still there. So I think any other operator that
tries to use "committed" in combination with THREAD_LOCAL in the future
will also suffer from this behavior.

I would suggest that we fix it in the platform. We should think more about
possible solutions at the platform level.

-Priyanka

On Thu, Dec 29, 2016 at 3:04 PM, Francis Fernandes wrote:

> Hi all,
>
> I need your input on the following:
>
> Issue - I have an application with 2 operators. The second operator is
> AbstractFileOutputOperator
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>,
> which implements the committed method of the deprecated
> Operator.CheckpointListener. When the locality of the stream connecting the
> two operators is Locality.THREAD_LOCAL, the committed method is not called.
>
> RCA -
> Inside processHeartbeatResponse
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>,
> we check that thread != null. In the THREAD_LOCAL case, thread is null, so
> the committed method isn't called.
>
> Approach 1 - Change in apex-core/engine
> For thread-local operators
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>,
> we do not set the thread
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462>
> in the node's context during activate. Because the thread is not set, we
> skip this operator in processHeartbeatResponse
> <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782>,
> and committed is not called:
>
>     if (thread == null || !thread.isAlive()) {
>       continue;
>     }
>
> We need this if condition to skip invalid operators in the case of other
> localities. Should we keep the OperatorDeployInfo persistent for each node
> so that it can be used later to identify the locality? This might be an
> intrusive change.
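
A minimal, self-contained sketch of the guard described above and of one hypothetical locality-aware alternative in the spirit of Approach 1. NodeInfo, upstreamId, hostingThread and the two *Targets methods are illustrative names, not Apex classes. The sketch assumes the persisted deploy info can be reduced to an upstream node id for THREAD_LOCAL nodes (whose own thread is never set), and it only computes which nodes would receive the committed notification; it does not model how the platform actually delivers it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommitDispatchSketch {

  // Hypothetical, simplified view of a deployed node (NOT the Apex Node class).
  static final class NodeInfo {
    final Thread thread;               // null for THREAD_LOCAL nodes: activate never sets it
    final Integer upstreamId;          // hypothetical: kept from deploy info for THREAD_LOCAL nodes
    final boolean checkpointListener;  // does the operator implement committed()?

    NodeInfo(Thread thread, Integer upstreamId, boolean checkpointListener) {
      this.thread = thread;
      this.upstreamId = upstreamId;
      this.checkpointListener = checkpointListener;
    }
  }

  // Mirrors the existing guard: a node without a live thread of its own is skipped,
  // so a THREAD_LOCAL node never reaches the committed notification.
  static List<Integer> currentTargets(Map<Integer, NodeInfo> nodes) {
    List<Integer> targets = new ArrayList<>();
    for (Map.Entry<Integer, NodeInfo> e : nodes.entrySet()) {
      Thread thread = e.getValue().thread;
      if (thread == null || !thread.isAlive()) {
        continue;
      }
      if (e.getValue().checkpointListener) {
        targets.add(e.getKey());
      }
    }
    return targets;
  }

  // Follows THREAD_LOCAL links upstream to the node that owns the hosting thread.
  static Thread hostingThread(Map<Integer, NodeInfo> nodes, NodeInfo node) {
    NodeInfo cur = node;
    while (cur != null && cur.thread == null && cur.upstreamId != null) {
      cur = nodes.get(cur.upstreamId);
    }
    return cur == null ? null : cur.thread;
  }

  // Hypothetical locality-aware guard: still skips dead or invalid nodes, but judges
  // a THREAD_LOCAL node by the liveness of the thread that actually runs it.
  static List<Integer> localityAwareTargets(Map<Integer, NodeInfo> nodes) {
    List<Integer> targets = new ArrayList<>();
    for (Map.Entry<Integer, NodeInfo> e : nodes.entrySet()) {
      Thread thread = hostingThread(nodes, e.getValue());
      if (thread == null || !thread.isAlive()) {
        continue;
      }
      if (e.getValue().checkpointListener) {
        targets.add(e.getKey());
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<Integer, NodeInfo> nodes = new LinkedHashMap<>();
    // Operator 1 owns a live thread (the current thread stands in for it here).
    nodes.put(1, new NodeInfo(Thread.currentThread(), null, false));
    // Operator 2 is THREAD_LOCAL to operator 1 and implements committed().
    nodes.put(2, new NodeInfo(null, 1, true));
    System.out.println("current guard notifies: " + currentTargets(nodes));               // []
    System.out.println("locality-aware guard notifies: " + localityAwareTargets(nodes)); // [2]
  }
}
```

The trade-off raised above still applies: the locality-aware guard only works if something like the upstream link survives deployment, which is the persistent state Approach 1 would have to add.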
>
> Approach 2 - Change in malhar/AbstractFileOutputOperator
> Move the code from the committed
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260>
> method into beforeCheckpoint
> <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>.
> This requires no change in the core platform, but it fixes only this
> particular operator. New operators should not run into this problem
> because CheckpointListener is now deprecated and
> CheckpointNotificationListener should be used instead.
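
A minimal sketch of the Approach 2 refactoring. It assumes simplified local stand-ins for the two listener interfaces, reduced to the methods mentioned in this thread (they are not the real com.datatorrent.api.Operator types), and a hypothetical pendingFinalization map in place of the operator's real bookkeeping. The former committed body moves into a helper that beforeCheckpoint calls, while committed still has to be implemented because the notification interface extends the older one, which is the point raised in the reply above. Note that beforeCheckpoint is handed the window about to be checkpointed, so in this sketch the work runs before that window is known to be committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BeforeCheckpointSketch {

  // Simplified local stand-ins modeled on the listener interfaces discussed here;
  // they are NOT the real com.datatorrent.api.Operator interfaces.
  interface CheckpointListener {
    void checkpointed(long windowId);

    void committed(long windowId);
  }

  interface CheckpointNotificationListener extends CheckpointListener {
    void beforeCheckpoint(long windowId);
  }

  // Hypothetical operator: rolled files are queued per window and finalized later.
  static class FileOutputSketch implements CheckpointNotificationListener {
    final TreeMap<Long, List<String>> pendingFinalization = new TreeMap<>();
    final List<String> finalized = new ArrayList<>();

    void fileRolled(long windowId, String file) {
      pendingFinalization.computeIfAbsent(windowId, w -> new ArrayList<>()).add(file);
    }

    // Logic that previously lived in committed(), extracted into a helper.
    private void finalizeUpTo(long windowId) {
      Map<Long, List<String>> done = pendingFinalization.headMap(windowId, true);
      for (List<String> files : done.values()) {
        finalized.addAll(files);
      }
      done.clear(); // headMap is a view, so this also removes the entries from the backing map
    }

    @Override
    public void beforeCheckpoint(long windowId) {
      // Approach 2: do the work here, assumed (as the proposal implies) to be
      // reached for THREAD_LOCAL operators as well.
      finalizeUpTo(windowId);
    }

    @Override
    public void checkpointed(long windowId) {
    }

    @Override
    public void committed(long windowId) {
      // Still required by the interface hierarchy; intentionally empty now.
    }
  }

  public static void main(String[] args) {
    FileOutputSketch op = new FileOutputSketch();
    op.fileRolled(1L, "part-0");
    op.fileRolled(2L, "part-1");
    op.fileRolled(5L, "part-2");
    op.beforeCheckpoint(2L);
    System.out.println(op.finalized); // [part-0, part-1]
  }
}
```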
>
>
> Please let me know your thoughts on which approach would be the right way
> to solve this.
>
> Thanks,
> Francis
>
