[FLINK-4496] Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4779c7ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4779c7ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4779c7ec Branch: refs/heads/master Commit: 4779c7eca0f7e91dd5ee38122baa3fe99c8b7bea Parents: 568845a Author: kl0u <kklou...@gmail.com> Authored: Thu Aug 25 17:38:49 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Sep 23 15:01:06 2016 +0200 ---------------------------------------------------------------------- .../kafka/testutils/MockRuntimeContext.java | 41 +--- .../runtime/tasks/AsyncExceptionHandler.java | 31 +++ .../tasks/DefaultTimeServiceProvider.java | 76 ++++++- .../streaming/runtime/tasks/StreamTask.java | 81 +++---- .../runtime/tasks/TestTimeServiceProvider.java | 44 ++-- .../runtime/tasks/TimeServiceProvider.java | 6 +- .../operators/StreamSourceOperatorTest.java | 14 +- .../runtime/operators/StreamTaskTimerTest.java | 53 ----- .../runtime/operators/TimeProviderTest.java | 214 +++++++++++++++++++ ...AlignedProcessingTimeWindowOperatorTest.java | 12 +- ...AlignedProcessingTimeWindowOperatorTest.java | 2 + .../runtime/tasks/StreamTaskTestHarness.java | 2 +- .../util/OneInputStreamOperatorTestHarness.java | 85 ++++---- .../streaming/util/WindowingTestHarness.java | 2 - 14 files changed, 436 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 2d5e2d8..7a50569 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -58,38 +58,28 @@ public class MockRuntimeContext extends StreamingRuntimeContext { private final int indexOfThisSubtask; private final ExecutionConfig execConfig; - private final Object checkpointLock; private final TimeServiceProvider timerService; public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { - this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null); + this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object()); } public MockRuntimeContext( - int numberOfParallelSubtasks, int indexOfThisSubtask, + int numberOfParallelSubtasks, + int indexOfThisSubtask, ExecutionConfig execConfig, Object checkpointLock) { - this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock, - DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor())); - } - - public MockRuntimeContext( - int numberOfParallelSubtasks, int indexOfThisSubtask, - ExecutionConfig execConfig, - Object checkpointLock, - TimeServiceProvider timerService) { - super(new MockStreamOperator(), - new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), - Collections.<String, Accumulator<?, ?>>emptyMap()); - + new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), + Collections.<String, Accumulator<?, ?>>emptyMap()); + this.numberOfParallelSubtasks = numberOfParallelSubtasks; this.indexOfThisSubtask = indexOfThisSubtask; this.execConfig = execConfig; - this.checkpointLock = 
checkpointLock; - this.timerService = timerService; + this.timerService = DefaultTimeServiceProvider. + createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock); } @Override @@ -216,20 +206,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) { Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized."); - - return timerService.registerTimer(time, new Runnable() { - @Override - public void run() { - synchronized (checkpointLock) { - try { - target.trigger(time); - } catch (Throwable t) { - System.err.println("!!! Caught exception while processing timer. !!!"); - t.printStackTrace(); - } - } - } - }); + return timerService.registerTimer(time, target); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java new file mode 100644 index 0000000..85a4115 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.tasks; + +/** + * An interface marking a task as capable to register exceptions thrown by different + * threads, other than the one executing the task itself. + */ +public interface AsyncExceptionHandler { + + /** + * Registers to the main thread an exception that was thrown by another thread + * (e.g. a TriggerTask), other than the one executing the main task. + */ + void registerAsyncException(String message, AsynchronousException exception); +} http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java index b803b82..c7339b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java @@ -17,6 +17,9 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.runtime.operators.Triggerable; + import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ 
-28,15 +31,26 @@ import java.util.concurrent.TimeUnit; */ public class DefaultTimeServiceProvider extends TimeServiceProvider { + /** The containing task that owns this time service provider. */ + private final AsyncExceptionHandler task; + + private final Object checkpointLock; + /** The executor service that schedules and calls the triggers of this task*/ private final ScheduledExecutorService timerService; - public static DefaultTimeServiceProvider create (ScheduledExecutorService executor) { - return new DefaultTimeServiceProvider(executor); + public static DefaultTimeServiceProvider create(AsyncExceptionHandler task, + ScheduledExecutorService executor, + Object checkpointLock) { + return new DefaultTimeServiceProvider(task, executor, checkpointLock); } - private DefaultTimeServiceProvider(ScheduledExecutorService threadPoolExecutor) { + private DefaultTimeServiceProvider(AsyncExceptionHandler task, + ScheduledExecutorService threadPoolExecutor, + Object checkpointLock) { + this.task = task; this.timerService = threadPoolExecutor; + this.checkpointLock = checkpointLock; } @Override @@ -45,16 +59,62 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { } @Override - public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) { + public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); - return timerService.schedule(target, delay, TimeUnit.MILLISECONDS); + return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isTerminated() { + return timerService.isTerminated(); } @Override public void shutdownService() throws Exception { - if (!timerService.isTerminated()) { - StreamTask.LOG.info("Timer service is shutting down."); - } timerService.shutdownNow(); } + + /** + * Internal task that is invoked by the timer service and triggers the target. 
+ */ + private static final class TriggerTask implements Runnable { + + private final Object lock; + private final Triggerable target; + private final long timestamp; + private final AsyncExceptionHandler task; + + TriggerTask(AsyncExceptionHandler task, final Object lock, Triggerable target, long timestamp) { + this.task = task; + this.lock = lock; + this.target = target; + this.timestamp = timestamp; + } + + @Override + public void run() { + synchronized (lock) { + try { + target.trigger(timestamp); + } catch (Throwable t) { + + if (task != null) { + // registers the exception with the calling task + // so that it can be logged and (later) detected + TimerException asyncException = new TimerException(t); + task.registerAsyncException("Caught exception while processing timer.", asyncException); + } else { + // this is for when we are in testing mode and we + // want to have real processing time. + t.printStackTrace(); + } + } + } + } + } + + @VisibleForTesting + public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) { + return new DefaultTimeServiceProvider(null, executor, checkpointLock); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 49bbee7..80d51a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -110,13 +110,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; @Internal public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> extends 
AbstractInvokable - implements StatefulTask { + implements StatefulTask, AsyncExceptionHandler { /** The thread group that holds all trigger timer threads */ public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers"); /** The logger used by the StreamTask and its subclasses */ - protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); + private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); // ------------------------------------------------------------------------ @@ -207,7 +207,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> timerService = timeProvider; } + /** + * Returns the current processing time. + */ public long getCurrentProcessingTime() { + if (timerService == null) { + throw new IllegalStateException("The timer service has not been initialized."); + } return timerService.getCurrentProcessingTime(); } @@ -237,7 +243,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // that timestamp are removed by user executor.setRemoveOnCancelPolicy(true); - timerService = DefaultTimeServiceProvider.create(executor); + timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock()); } headOperator = configuration.getStreamOperator(getUserCodeClassLoader()); @@ -319,7 +325,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // stop all timers and threads if (timerService != null) { try { - timerService.shutdownService(); + if (!timerService.isTerminated()) { + LOG.info("Timer service is shutting down."); + timerService.shutdownService(); + } } catch (Throwable t) { // catch and log the exception to not replace the original exception @@ -475,7 +484,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> protected void finalize() throws Throwable { super.finalize(); if (timerService != null) { - timerService.shutdownService(); + if (!timerService.isTerminated()) { + 
LOG.info("Timer service is shutting down."); + timerService.shutdownService(); + } } closeAllClosables(); @@ -819,7 +831,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> if (timerService == null) { throw new IllegalStateException("The timer service has not been initialized."); } - return timerService.registerTimer(timestamp, new TriggerTask(this, lock, target, timestamp)); + return timerService.registerTimer(timestamp, target); } /** @@ -836,6 +848,17 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } + @Override + public void registerAsyncException(String message, AsynchronousException exception) { + if (isRunning) { + LOG.error(message, exception); + } + + if (this.asyncException == null) { + this.asyncException = exception; + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -863,42 +886,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } // ------------------------------------------------------------------------ - - /** - * Internal task that is invoked by the timer service and triggers the target. 
- */ - private static final class TriggerTask implements Runnable { - - private final Object lock; - private final Triggerable target; - private final long timestamp; - private final StreamTask<?, ?> task; - - TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp) { - this.task = task; - this.lock = lock; - this.target = target; - this.timestamp = timestamp; - } - - @Override - public void run() { - synchronized (lock) { - try { - target.trigger(timestamp); - } catch (Throwable t) { - if (task.isRunning) { - LOG.error("Caught exception while processing timer.", t); - } - if (task.asyncException == null) { - task.asyncException = new TimerException(t); - } - } - } - } - } - - // ------------------------------------------------------------------------ private static class AsyncCheckpointRunnable implements Runnable, Closeable { @@ -961,12 +948,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } catch (Exception e) { - if (owner.isRunning()) { - LOG.error("Caught exception while materializing asynchronous checkpoints.", e); - } - if (owner.asyncException == null) { - owner.asyncException = new AsynchronousException(e); - } + + // registers the exception and tries to fail the whole task + AsynchronousException asyncException = new AsynchronousException(e); + owner.registerAsyncException("Caught exception while materializing asynchronous checkpoints.", asyncException); } finally { synchronized (cancelables) { http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java index 2314deb..a21a2e1 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java @@ -17,11 +17,13 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.streaming.runtime.operators.Triggerable; + import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ScheduledFuture; /** @@ -32,30 +34,34 @@ public class TestTimeServiceProvider extends TimeServiceProvider { private long currentTime = 0; - private Map<Long, List<Runnable>> registeredTasks = new HashMap<>(); + private boolean isTerminated = false; + + // sorts the timers by timestamp so that they are processed in the correct order. + private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); - public void setCurrentTime(long timestamp) { + public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; // decide which timers to fire and put them in a list // we do not fire them here to be able to accommodate timers - // that register other timers. The latter would through an exception. + // that register other timers. - Iterator<Map.Entry<Long, List<Runnable>>> it = registeredTasks.entrySet().iterator(); - List<Runnable> toRun = new ArrayList<>(); + Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); + List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); while (it.hasNext()) { - Map.Entry<Long, List<Runnable>> t = it.next(); + Map.Entry<Long, List<Triggerable>> t = it.next(); if (t.getKey() <= this.currentTime) { - for (Runnable r: t.getValue()) { - toRun.add(r); - } + toRun.add(t); it.remove(); } } // now do the actual firing. 
- for (Runnable r: toRun) { - r.run(); + for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { + long now = tasks.getKey(); + for (Triggerable task: tasks.getValue()) { + task.trigger(now); + } } } @@ -65,8 +71,8 @@ public class TestTimeServiceProvider extends TimeServiceProvider { } @Override - public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) { - List<Runnable> tasks = registeredTasks.get(timestamp); + public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + List<Triggerable> tasks = registeredTasks.get(timestamp); if (tasks == null) { tasks = new ArrayList<>(); registeredTasks.put(timestamp, tasks); @@ -75,9 +81,14 @@ public class TestTimeServiceProvider extends TimeServiceProvider { return null; } + @Override + public boolean isTerminated() { + return isTerminated; + } + public int getNoOfRegisteredTimers() { int count = 0; - for (List<Runnable> tasks: registeredTasks.values()) { + for (List<Triggerable> tasks: registeredTasks.values()) { count += tasks.size(); } return count; @@ -85,7 +96,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider { @Override public void shutdownService() throws Exception { - this.registeredTasks.clear(); - this.registeredTasks = null; + this.isTerminated = true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java index f3e4f78..42a4fa4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java @@ -16,6 +16,7 @@ */ 
package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.streaming.runtime.operators.Triggerable; import java.util.concurrent.ScheduledFuture; /** @@ -34,7 +35,10 @@ public abstract class TimeServiceProvider { * the task to be executed * @return the result to be returned. */ - public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Runnable target); + public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target); + + /** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. */ + public abstract boolean isTerminated(); /** Shuts down and clean up the timer service provider. */ public abstract void shutdownService() throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index 9c06b49..d61fec9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -184,6 +184,8 @@ public class StreamSourceOperatorTest { long watermarkInterval = 10; TestTimeServiceProvider timeProvider = new TestTimeServiceProvider(); + timeProvider.setCurrentTime(0); + setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider); final List<StreamElement> output = new ArrayList<>(); @@ -249,17 +251,7 @@ public class StreamSourceOperatorTest { throw new RuntimeException("The time provider is null"); } - timeProvider.registerTimer(execTime, new 
Runnable() { - - @Override - public void run() { - try { - target.trigger(execTime); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); + timeProvider.registerTimer(execTime, target); return null; } }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index c9f204d..b9435f5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,58 +48,6 @@ import static org.junit.Assert.*; public class StreamTaskTimerTest { @Test - public void testCustomTimeServiceProvider() throws Throwable { - TestTimeServiceProvider tp = new TestTimeServiceProvider(); - - final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); - mapTask.setTimeService(tp); - - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( - mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - - StreamConfig streamConfig = testHarness.getStreamConfig(); - - StreamMap<String, String> mapOperator = new StreamMap<>(new 
DummyMapFunction<String>()); - streamConfig.setStreamOperator(mapOperator); - - testHarness.invoke(); - - assertTrue(testHarness.getCurrentProcessingTime() == 0); - - tp.setCurrentTime(11); - assertTrue(testHarness.getCurrentProcessingTime() == 11); - - tp.setCurrentTime(15); - tp.setCurrentTime(16); - assertTrue(testHarness.getCurrentProcessingTime() == 16); - - // register 2 tasks - mapTask.registerTimer(30, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - mapTask.registerTimer(40, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - assertEquals(2, tp.getNoOfRegisteredTimers()); - - tp.setCurrentTime(35); - assertEquals(1, tp.getNoOfRegisteredTimers()); - - tp.setCurrentTime(40); - assertEquals(0, tp.getNoOfRegisteredTimers()); - - tp.shutdownService(); - } - - @Test public void testOpenCloseAndTimestamps() throws Exception { final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java new file mode 100644 index 0000000..4d4f07b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ResultPartitionWriter.class) 
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) +public class TimeProviderTest { + + @Test + public void testDefaultTimeProvider() throws InterruptedException { + final OneShotLatch latch = new OneShotLatch(); + + final Object lock = new Object(); + TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider + .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock); + + final List<Long> timestamps = new ArrayList<>(); + + long start = System.currentTimeMillis(); + long interval = 50L; + + final long noOfTimers = 20; + + // we add 2 timers per iteration minus the first that would have a negative timestamp + final long expectedNoOfTimers = 2 * noOfTimers - 1; + + for (int i = 0; i < noOfTimers; i++) { + double nextTimer = start + i * interval; + + timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + timestamps.add(timestamp); + if (timestamps.size() == expectedNoOfTimers) { + latch.trigger(); + } + } + }); + + // add also out-of-order tasks to verify that eventually + // they will be executed in the correct order. + + if (i > 0) { + timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + timestamps.add(timestamp); + if (timestamps.size() == expectedNoOfTimers) { + latch.trigger(); + } + } + }); + } + } + + if (!latch.isTriggered()) { + latch.await(); + } + + Assert.assertEquals(timestamps.size(), expectedNoOfTimers); + + // verify that the tasks are executed + // in ascending timestamp order + + int counter = 0; + long lastTs = Long.MIN_VALUE; + for (long timestamp: timestamps) { + Assert.assertTrue(timestamp >= lastTs); + lastTs = timestamp; + + long expectedTs = start + (counter/2) * interval; + Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 
0 : 40))); + counter++; + } + } + + @Test + public void testTimerSorting() throws Exception { + + final List<Long> result = new ArrayList<>(); + + TestTimeServiceProvider provider = new TestTimeServiceProvider(); + + provider.registerTimer(45, new Triggerable() { + @Override + public void trigger(long timestamp) { + result.add(timestamp); + } + }); + + provider.registerTimer(50, new Triggerable() { + @Override + public void trigger(long timestamp) { + result.add(timestamp); + } + }); + + provider.registerTimer(30, new Triggerable() { + @Override + public void trigger(long timestamp) { + result.add(timestamp); + } + }); + + provider.registerTimer(50, new Triggerable() { + @Override + public void trigger(long timestamp) { + result.add(timestamp); + } + }); + + Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4); + + provider.setCurrentTime(100); + long seen = 0; + for (Long l: result) { + Assert.assertTrue(l >= seen); + seen = l; + } + } + + @Test + public void testCustomTimeServiceProvider() throws Throwable { + TestTimeServiceProvider tp = new TestTimeServiceProvider(); + + final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); + mapTask.setTimeService(tp); + + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + + StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>()); + streamConfig.setStreamOperator(mapOperator); + + testHarness.invoke(); + + assertTrue(testHarness.getCurrentProcessingTime() == 0); + + tp.setCurrentTime(11); + assertTrue(testHarness.getCurrentProcessingTime() == 11); + + tp.setCurrentTime(15); + tp.setCurrentTime(16); + assertTrue(testHarness.getCurrentProcessingTime() == 16); + + // register 2 tasks + mapTask.registerTimer(30, new Triggerable() { + @Override + public void 
trigger(long timestamp) { + + } + }); + + mapTask.registerTimer(40, new Triggerable() { + @Override + public void trigger(long timestamp) { + + } + }); + + assertEquals(2, tp.getNoOfRegisteredTimers()); + + tp.setCurrentTime(35); + assertEquals(1, tp.getNoOfRegisteredTimers()); + + tp.setCurrentTime(40); + assertEquals(0, tp.getNoOfRegisteredTimers()); + + tp.shutdownService(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 40a6c79..9849bd7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -493,7 +493,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { for (int i = 0; i < 300; i++) { testHarness.processElement(new StreamRecord<>(i + numElementsFirst)); } - + op.dispose(); // re-create the operator and restore the state @@ -502,9 +502,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); - + timerService = new TestTimeServiceProvider(); + testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); 
testHarness.restore(state); @@ -580,15 +579,16 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { for (int i = numElementsFirst; i < numElements; i++) { testHarness.processElement(new StreamRecord<>(i)); } - + op.dispose(); - + // re-create the operator and restore the state op = new AccumulatingProcessingTimeWindowOperator<>( validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide); + timerService = new TestTimeServiceProvider(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 59bfe6f..3dfa395 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -601,6 +601,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); + timerService = new TestTimeServiceProvider(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); @@ -692,6 +693,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, 
windowSize, windowSlide); + timerService = new TestTimeServiceProvider(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index cb10c5c..ce634f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -115,7 +115,7 @@ public class StreamTaskTestHarness<OUT> { public long getCurrentProcessingTime() { if (!(task instanceof StreamTask)) { - System.currentTimeMillis(); + throw new UnsupportedOperationException("getCurrentProcessingTime() only supported on StreamTasks."); } return ((StreamTask) task).getCurrentProcessingTime(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 15074a7..6c637bf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache 
Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -37,8 +37,10 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -87,7 +89,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { */ private boolean setupCalled = false; - public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) { this(operator, new ExecutionConfig()); } @@ -95,27 +96,35 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) { - this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor())); + this(operator, executionConfig, null); + } + + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator<IN, OUT> operator, + ExecutionConfig executionConfig, + TimeServiceProvider testTimeProvider) { + this(operator, executionConfig, new Object(), testTimeProvider); } public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, + Object checkpointLock, TimeServiceProvider testTimeProvider) { + this.operator = operator; this.outputList = new ConcurrentLinkedQueue<Object>(); Configuration underlyingConfig 
= new Configuration(); this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); this.executionConfig = executionConfig; - this.checkpointLock = new Object(); + this.checkpointLock = checkpointLock; final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0); mockTask = mock(StreamTask.class); - timeServiceProvider = testTimeProvider; when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); + when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); when(mockTask.getEnvironment()).thenReturn(env); @@ -125,21 +134,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - final long execTime = (Long) invocation.getArguments()[0]; - final Triggerable target = (Triggerable) invocation.getArguments()[1]; - - timeServiceProvider.registerTimer( - execTime, new TriggerTask(checkpointLock, target, execTime)); + // do nothing return null; } - }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); - - doAnswer(new Answer<Long>() { - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - return timeServiceProvider.getCurrentProcessingTime(); - } - }).when(mockTask).getCurrentProcessingTime(); + }).when(mockTask).registerAsyncException(any(String.class), any(AsynchronousException.class)); try { doAnswer(new Answer<CheckpointStreamFactory>() { @@ -154,6 +152,26 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { throw new RuntimeException(e.getMessage(), e); } + timeServiceProvider = testTimeProvider != null ? 
testTimeProvider : + DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final long execTime = (Long) invocation.getArguments()[0]; + final Triggerable target = (Triggerable) invocation.getArguments()[1]; + + timeServiceProvider.registerTimer(execTime, target); + return null; + } + }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); + + doAnswer(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + return timeServiceProvider.getCurrentProcessingTime(); + } + }).when(mockTask).getCurrentProcessingTime(); } public void setStateBackend(AbstractStateBackend stateBackend) { @@ -216,7 +234,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { operator.notifyOfCompletedCheckpoint(checkpointId); } - /** * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()} */ @@ -275,32 +292,4 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { // ignore } } - - private static final class TriggerTask implements Runnable { - - private final Object lock; - private final Triggerable target; - private final long timestamp; - - TriggerTask(final Object lock, Triggerable target, long timestamp) { - this.lock = lock; - this.target = target; - this.timestamp = timestamp; - } - - @Override - public void run() { - synchronized (lock) { - try { - target.trigger(timestamp); - } catch (Throwable t) { - try { - throw t; - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java index af1f3fa..d47136c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark;