[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445966#comment-15445966
 ] 

ASF GitHub Bot commented on FLINK-4496:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2434#discussion_r76612209
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 ---
    @@ -57,4 +67,49 @@ public void shutdownService() throws Exception {
                }
                timerService.shutdownNow();
        }
    +
    +   /**
    +    * Internal task that is invoked by the timer service and triggers the 
target.
    +    */
    +   private static final class TriggerTask implements Runnable {
    +
    +           private final Object lock;
    +           private final Triggerable target;
    +           private final long timestamp;
    +           private final StreamTask<?, ?> task;
    +
    +           TriggerTask(StreamTask<?, ?> task, final Object lock, 
Triggerable target, long timestamp) {
    +                   this.task = task;
    +                   this.lock = lock;
    +                   this.target = target;
    +                   this.timestamp = timestamp;
    +           }
    +
    +           @Override
    +           public void run() {
    +                   synchronized (lock) {
    +                           try {
    +                                   target.trigger(timestamp);
    +                           } catch (Throwable t) {
    +
    +                                   if (task != null) {
    +                                           // registers the exception with 
the calling task
    +                                           // so that it can be logged and 
(later) detected
    +                                           TimerException asyncException = 
new TimerException(t);
    +                                           
task.registerAsyncException("Caught exception while processing timer.", 
asyncException);
    +                                   } else {
    +                                           // this is for when we are in 
testing mode and we
    +                                           // want to have real processing 
time.
    +                                           System.err.println("!!! Caught 
exception while processing timer. !!!");
    --- End diff --
    
    Why are you not using the regular logging for this?


> Refactor the TimeServiceProvider to take a Trigerable instead of a Runnable.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-4496
>                 URL: https://issues.apache.org/jira/browse/FLINK-4496
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to