Repository: flink
Updated Branches:
  refs/heads/master babee2772 -> b7f0f5f4d


[FLINK-7666] Close TimeService after closing operators.

This was revealed by the continuous file reader. Previously,
the StreamTask called quiesceAndAwaitPending() on the
TimerService before calling close() on the operator. As a result,
with a periodic watermark emitter and a small file (e.g. a single
split), the timer service was shut down before the reader even
started reading (as soon as it received the first split), so no
timers could be registered to emit watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f0f5f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f0f5f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f0f5f4

Branch: refs/heads/master
Commit: b7f0f5f4d1dc49cafd50c7c72148580fe6947ef5
Parents: babee27
Author: kkloudas <[email protected]>
Authored: Thu Oct 19 10:23:39 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Thu Oct 26 00:06:08 2017 +0200

----------------------------------------------------------------------
 .../runtime/tasks/ProcessingTimeService.java    | 13 +++-
 .../streaming/runtime/tasks/StreamTask.java     | 11 ++-
 .../tasks/SystemProcessingTimeService.java      | 77 ++++++++++++++------
 .../tasks/TestProcessingTimeService.java        |  7 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 47 ++++++++++++
 .../tasks/SystemProcessingTimeServiceTest.java  | 12 +--
 6 files changed, 132 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 11074a2..b238252 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ScheduledFuture;
  * <p>The registration of timers follows a life cycle of three phases:
  * <ol>
  *     <li>In the initial state, it accepts timer registrations and triggers 
when the time is reached.</li>
- *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ *     <li>After calling {@link #quiesce()}, further calls to
  *         {@link #registerTimer(long, ProcessingTimeCallback)} will not 
register any further timers, and will
  *         return a "dummy" future as a result. This is used for clean 
shutdown, where currently firing
  *         timers are waited for and no future timers can be scheduled, 
without causing hard exceptions.</li>
@@ -73,14 +73,19 @@ public abstract class ProcessingTimeService {
        /**
         * This method puts the service into a state where it does not register 
new timers, but
         * returns for each call to {@link #registerTimer(long, 
ProcessingTimeCallback)} only a "mock" future.
-        * Furthermore, the method clears all not yet started timers, and 
awaits the completion
-        * of currently executing timers.
+        * Furthermore, the method clears all not yet started timers.
         *
         * <p>This method can be used to cleanly shut down the timer service. 
The using components
         * will not notice that the service is shut down (as for example via 
exceptions when registering
         * a new timer), but the service will simply not fire any timer any 
more.
         */
-       public abstract void quiesceAndAwaitPending() throws 
InterruptedException;
+       public abstract void quiesce() throws InterruptedException;
+
+       /**
+        * Awaits the completion of currently executing timers. This method is meant
+        * to be called after {@link #quiesce()}.
+        */
+       public abstract void awaitPendingAfterQuiesce() throws 
InterruptedException;
 
        /**
         * Shuts down and clean up the timer service provider hard and 
immediately. This does not wait

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 631cdfc..68f590e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -194,6 +195,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner 
WindowAssigners}
         * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger 
Triggers}.
         * */
+       @VisibleForTesting
        public void setProcessingTimeService(ProcessingTimeService 
timeProvider) {
                if (timeProvider == null) {
                        throw new RuntimeException("The timeProvider cannot be 
set to null.");
@@ -266,9 +268,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                throw new CancelTaskException();
                        }
 
-                       // make sure all timers finish and no new timers can 
come
-                       timerService.quiesceAndAwaitPending();
-
                        LOG.debug("Finished task {}", getName());
 
                        // make sure no further checkpoint and notification 
actions happen.
@@ -280,11 +279,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                // this is part of the main logic, so if this 
fails, the task is considered failed
                                closeAllOperators();
 
+                               // make sure no new timers can come
+                               timerService.quiesce();
+
                                // only set the StreamTask to not running after 
all operators have been closed!
                                // See FLINK-7430
                                isRunning = false;
                        }
 
+                       // make sure all timers finish
+                       timerService.awaitPendingAfterQuiesce();
+
                        LOG.debug("Closed operators for task {}", getName());
 
                        // make sure all buffered data is flushed

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index d00c1b9..71bfdf6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -108,7 +108,7 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                // that way we save unnecessary volatile accesses for each timer
                try {
                        return timerService.schedule(
-                                       new TriggerTask(task, checkpointLock, 
target, timestamp), delay, TimeUnit.MILLISECONDS);
+                                       new TriggerTask(status, task, 
checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
                }
                catch (RejectedExecutionException e) {
                        final int status = this.status.get();
@@ -133,7 +133,7 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                // that way we save unnecessary volatile accesses for each timer
                try {
                        return timerService.scheduleAtFixedRate(
-                               new RepeatedTriggerTask(task, checkpointLock, 
callback, nextTimestamp, period),
+                               new RepeatedTriggerTask(status, task, 
checkpointLock, callback, nextTimestamp, period),
                                initialDelay,
                                period,
                                TimeUnit.MILLISECONDS);
@@ -152,18 +152,34 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                }
        }
 
+       /**
+        * @return {@code true} if the status
+        * is {@link #STATUS_ALIVE}, {@code false} otherwise.
+        */
+       @VisibleForTesting
+       boolean isAlive() {
+               return status.get() == STATUS_ALIVE;
+       }
+
        @Override
        public boolean isTerminated() {
                return status.get() == STATUS_SHUTDOWN;
        }
 
        @Override
-       public void quiesceAndAwaitPending() throws InterruptedException {
+       public void quiesce() throws InterruptedException {
                if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
                        timerService.shutdown();
+               }
+       }
+
+       @Override
+       public void awaitPendingAfterQuiesce() throws InterruptedException {
+               if (!timerService.isTerminated()) {
+                       Preconditions.checkState(timerService.isTerminating() 
|| timerService.isShutdown());
 
                        // await forever (almost)
-                       timerService.awaitTermination(365, TimeUnit.DAYS);
+                       timerService.awaitTermination(365L, TimeUnit.DAYS);
                }
        }
 
@@ -199,15 +215,23 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
         */
        private static final class TriggerTask implements Runnable {
 
+               private final AtomicInteger serviceStatus;
                private final Object lock;
                private final ProcessingTimeCallback target;
                private final long timestamp;
                private final AsyncExceptionHandler exceptionHandler;
 
-               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, ProcessingTimeCallback target, long timestamp) {
-                       this.exceptionHandler = exceptionHandler;
-                       this.lock = lock;
-                       this.target = target;
+               private TriggerTask(
+                               final AtomicInteger serviceStatus,
+                               final AsyncExceptionHandler exceptionHandler,
+                               final Object lock,
+                               final ProcessingTimeCallback target,
+                               final long timestamp) {
+
+                       this.serviceStatus = 
Preconditions.checkNotNull(serviceStatus);
+                       this.exceptionHandler = 
Preconditions.checkNotNull(exceptionHandler);
+                       this.lock = Preconditions.checkNotNull(lock);
+                       this.target = Preconditions.checkNotNull(target);
                        this.timestamp = timestamp;
                }
 
@@ -215,7 +239,9 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                public void run() {
                        synchronized (lock) {
                                try {
-                                       target.onProcessingTime(timestamp);
+                                       if (serviceStatus.get() == 
STATUS_ALIVE) {
+                                               
target.onProcessingTime(timestamp);
+                                       }
                                } catch (Throwable t) {
                                        TimerException asyncException = new 
TimerException(t);
                                        
exceptionHandler.handleAsyncException("Caught exception while processing 
timer.", asyncException);
@@ -228,6 +254,8 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
         * Internal task which is repeatedly called by the processing time 
service.
         */
        private static final class RepeatedTriggerTask implements Runnable {
+
+               private final AtomicInteger serviceStatus;
                private final Object lock;
                private final ProcessingTimeCallback target;
                private final long period;
@@ -236,11 +264,14 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                private long nextTimestamp;
 
                private RepeatedTriggerTask(
-                               AsyncExceptionHandler exceptionHandler,
-                               Object lock,
-                               ProcessingTimeCallback target,
-                               long nextTimestamp,
-                               long period) {
+                               final AtomicInteger serviceStatus,
+                               final AsyncExceptionHandler exceptionHandler,
+                               final Object lock,
+                               final ProcessingTimeCallback target,
+                               final long nextTimestamp,
+                               final long period) {
+
+                       this.serviceStatus = 
Preconditions.checkNotNull(serviceStatus);
                        this.lock = Preconditions.checkNotNull(lock);
                        this.target = Preconditions.checkNotNull(target);
                        this.period = period;
@@ -251,15 +282,17 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
 
                @Override
                public void run() {
-                       try {
-                               synchronized (lock) {
-                                       target.onProcessingTime(nextTimestamp);
-                               }
+                       synchronized (lock) {
+                               try {
+                                       if (serviceStatus.get() == 
STATUS_ALIVE) {
+                                               
target.onProcessingTime(nextTimestamp);
+                                       }
 
-                               nextTimestamp += period;
-                       } catch (Throwable t) {
-                               TimerException asyncException = new 
TimerException(t);
-                               exceptionHandler.handleAsyncException("Caught 
exception while processing repeated timer task.", asyncException);
+                                       nextTimestamp += period;
+                               } catch (Throwable t) {
+                                       TimerException asyncException = new 
TimerException(t);
+                                       
exceptionHandler.handleAsyncException("Caught exception while processing 
repeated timer task.", asyncException);
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index a9d5205..080eeb5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -117,7 +117,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
        }
 
        @Override
-       public void quiesceAndAwaitPending() {
+       public void quiesce() {
                if (!isTerminated) {
                        isQuiesced = true;
                        priorityQueue.clear();
@@ -125,6 +125,11 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
        }
 
        @Override
+       public void awaitPendingAfterQuiesce() throws InterruptedException {
+               // do nothing.
+       }
+
+       @Override
        public void shutdownService() {
                this.isTerminated = true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 394bc35..d60966a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -541,6 +541,53 @@ public class OneInputStreamTaskTest extends TestLogger {
                TestingStreamOperator.numberRestoreCalls = 0;
        }
 
+       @Test
+       public void testQuiesceTimerServiceAfterOpClose() throws Exception {
+
+               final OneInputStreamTask<String, String> task = new 
OneInputStreamTask<>();
+
+               SystemProcessingTimeService timeService = new 
SystemProcessingTimeService(task, task.getCheckpointLock());
+               task.setProcessingTimeService(timeService);
+
+               // verify that the timer service is running
+               Assert.assertTrue(((SystemProcessingTimeService) 
task.getProcessingTimeService()).isAlive());
+
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(
+                               task, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               streamConfig.setStreamOperator(new TestOperator());
+               streamConfig.setOperatorID(new OperatorID());
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+               timeService.shutdownService();
+       }
+
+       private static class TestOperator
+                       extends AbstractStreamOperator<String>
+                       implements OneInputStreamOperator<String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+                       output.collect(element);
+               }
+
+               @Override
+               public void close() throws Exception {
+
+                       // verify that the timer service is still running
+                       Assert.assertTrue(
+                                       ((SystemProcessingTimeService) 
getContainingTask().getProcessingTimeService())
+                                       .isAlive());
+                       super.close();
+               }
+       }
 
        
//==============================================================================================
        // Utility functions and classes

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 890fc23..4c105d3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -180,7 +180,8 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
                        assertFalse(scheduledFuture.isDone());
 
                        // this should cancel our future
-                       timer.quiesceAndAwaitPending();
+                       timer.quiesce();
+                       timer.awaitPendingAfterQuiesce();
 
                        // it may be that the cancelled status is not 
immediately visible after the
                        // termination (not necessary a volatile update), so we 
need to "get()" the cancellation
@@ -297,7 +298,7 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
 
                        final ReentrantLock scopeLock = new ReentrantLock();
 
-                       timer.registerTimer(System.currentTimeMillis() + 20, 
new ProcessingTimeCallback() {
+                       timer.registerTimer(timer.getCurrentProcessingTime() + 
20L, new ProcessingTimeCallback() {
                                @Override
                                public void onProcessingTime(long timestamp) 
throws Exception {
                                        scopeLock.lock();
@@ -313,13 +314,14 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
 
                        // after the task triggered, shut the timer down 
cleanly, waiting for the task to finish
                        latch.await();
-                       timer.quiesceAndAwaitPending();
+                       timer.quiesce();
+                       timer.awaitPendingAfterQuiesce();
 
                        // should be able to immediately acquire the lock, 
since the task must have exited by now
                        assertTrue(scopeLock.tryLock());
 
                        // should be able to schedule more tasks (that never 
get executed)
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new 
ProcessingTimeCallback() {
+                       ScheduledFuture<?> future = 
timer.registerTimer(timer.getCurrentProcessingTime() - 5L, new 
ProcessingTimeCallback() {
                                @Override
                                public void onProcessingTime(long timestamp) 
throws Exception {
                                        throw new Exception("test");
@@ -328,7 +330,7 @@ public class SystemProcessingTimeServiceTest extends 
TestLogger {
                        assertNotNull(future);
 
                        // nothing should be scheduled right now
-                       assertEquals(0, timer.getNumTasksScheduled());
+                       assertEquals(0L, timer.getNumTasksScheduled());
 
                        // check that no asynchronous error was reported - that 
ensures that the newly scheduled
                        // triggerable did, in fact, not trigger

Reply via email to