[hotfix] Various code cleanups around time service and asynchronous exceptions
- DefaultTimeServiceProvider now owns scheduled executor - Enforce that an asynchronous exception handler is always set Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954ef08f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954ef08f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954ef08f Branch: refs/heads/master Commit: 954ef08f374d7e7c1f2b469201b1ea05c6376b33 Parents: 8ff451b Author: Stephan Ewen <[email protected]> Authored: Tue Oct 4 16:15:05 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 19:36:13 2016 +0200 ---------------------------------------------------------------------- .../AbstractFetcherTimestampsTest.java | 122 +++++++++++-------- .../kafka/testutils/MockRuntimeContext.java | 40 +++--- .../api/operators/StreamSourceContexts.java | 6 +- .../runtime/io/StreamInputProcessor.java | 4 +- .../runtime/tasks/AsyncExceptionHandler.java | 1 + .../runtime/tasks/AsynchronousException.java | 11 +- .../tasks/DefaultTimeServiceProvider.java | 57 +++++---- .../runtime/tasks/OneInputStreamTask.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 18 +-- .../runtime/tasks/TestTimeServiceProvider.java | 2 +- .../runtime/tasks/TwoInputStreamTask.java | 2 +- .../runtime/operators/TimeProviderTest.java | 45 +++++-- ...AlignedProcessingTimeWindowOperatorTest.java | 84 ++++++++----- ...AlignedProcessingTimeWindowOperatorTest.java | 101 +++++++++------ .../operators/windowing/NoOpTimerService.java | 49 ++++++++ .../util/OneInputStreamOperatorTestHarness.java | 6 +- 16 files changed, 347 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 8c68fbe..c3ba7b7 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -25,7 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.util.SerializedValue; import org.junit.Test; @@ -34,6 +37,7 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -110,6 +114,7 @@ public class AbstractFetcherTimestampsTest { @Test public void testPeriodicWatermarks() throws Exception { + ExecutionConfig config = new ExecutionConfig(); config.setAutoWatermarkInterval(10); @@ -120,61 +125,70 @@ public class AbstractFetcherTimestampsTest { TestSourceContext<Long> sourceContext = new TestSourceContext<>(); - TestFetcher<Long> fetcher = new TestFetcher<>( - sourceContext, originalPartitions, - new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), - null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock())); - - final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; - final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; - final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L); - fetcher.emitRecord(2L, part1, 2L); - fetcher.emitRecord(3L, part1, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock()); - // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L); - fetcher.emitRecord(102L, part3, 2L); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L); - fetcher.emitRecord(1004L, part3, 4L); - fetcher.emitRecord(1005L, part3, 5L); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - - // this blocks until the periodic thread emitted the watermark - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L); - fetcher.emitRecord(14L, part2, 3L); - fetcher.emitRecord(15L, part2, 3L); - - // this blocks until the periodic thread emitted the watermark - long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); - assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); + try { + TestFetcher<Long> fetcher = new TestFetcher<>( + sourceContext, originalPartitions, + new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), + null, new MockRuntimeContext(17, 3, config, timerService)); + + final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; + final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; + final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; + + // elements generate a watermark if the timestamp is a multiple of three + + // elements for partition 1 + fetcher.emitRecord(1L, part1, 1L); + fetcher.emitRecord(2L, part1, 2L); + fetcher.emitRecord(3L, part1, 3L); + assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 2 + fetcher.emitRecord(12L, part2, 1L); + assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); + + // elements for partition 3 + fetcher.emitRecord(101L, part3, 1L); + fetcher.emitRecord(102L, part3, 2L); + assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); + + // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) + assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 3 + fetcher.emitRecord(1003L, part3, 3L); + fetcher.emitRecord(1004L, part3, 4L); + fetcher.emitRecord(1005L, part3, 5L); + assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); + + // advance partition 1 beyond partition 2 - this bumps the watermark + fetcher.emitRecord(30L, part1, 4L); + assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); + assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); + + // this blocks until the periodic thread emitted the watermark + assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); + + // advance partition 2 again - this bumps the watermark + fetcher.emitRecord(13L, part2, 2L); + fetcher.emitRecord(14L, part2, 3L); + fetcher.emitRecord(15L, part2, 3L); + + // this blocks until the periodic thread emitted the watermark + long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); + assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); + } + finally { + timerService.shutdownService(); + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 5be4195..e1ec4cb 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -32,45 +32,46 @@ import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; -import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; @SuppressWarnings("deprecation") public class MockRuntimeContext extends StreamingRuntimeContext { private final int numberOfParallelSubtasks; private final int indexOfThisSubtask; - - private final ExecutionConfig execConfig; - private final TimeServiceProvider timerService; + private final ExecutionConfig execConfig; + private final TimeServiceProvider timeServiceProvider; + public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { - this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object()); + this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig()); } public MockRuntimeContext( - int numberOfParallelSubtasks, - int indexOfThisSubtask, - ExecutionConfig execConfig, - Object checkpointLock) { - + int numberOfParallelSubtasks, + int indexOfThisSubtask, + ExecutionConfig execConfig) { + this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, null); + } + + public MockRuntimeContext( + int numberOfParallelSubtasks, + int indexOfThisSubtask, + ExecutionConfig execConfig, + TimeServiceProvider timeServiceProvider) { + super(new MockStreamOperator(), new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), Collections.<String, Accumulator<?, ?>>emptyMap()); @@ -78,8 +79,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { this.numberOfParallelSubtasks = numberOfParallelSubtasks; this.indexOfThisSubtask = indexOfThisSubtask; this.execConfig = execConfig; - this.timerService = DefaultTimeServiceProvider. - createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock); + this.timeServiceProvider = timeServiceProvider; } @Override @@ -189,7 +189,11 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public TimeServiceProvider getTimeServiceProvider() { - return timerService; + if (timeServiceProvider == null) { + throw new UnsupportedOperationException(); + } else { + return timeServiceProvider; + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index abaf4e7..a290deb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -35,9 +35,9 @@ public class StreamSourceContexts { * Depending on the {@link TimeCharacteristic}, this method will return the adequate * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is: * <ul> - * <li> {@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext} - * <li> {@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext} - * <li> {@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext} + * <li>{@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}</li> + * <li>{@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}</li> + * <li>{@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}</li> * </ul> * */ public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 85e9297..2dbc6d4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -83,7 +83,9 @@ public class StreamInputProcessor<IN> { private Counter numRecordsIn; @SuppressWarnings("unchecked") - public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, + public StreamInputProcessor( + InputGate[] inputGates, + TypeSerializer<IN> inputSerializer, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, IOManager ioManager, http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java index 4c55055..a8125c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.tasks; /** http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java index 311e0cd..cda0511 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java @@ -15,22 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; /** - * {@code RuntimeException} for wrapping exceptions that are thrown in Threads that are not the - * main compute Thread. + * An exception for wrapping exceptions that are thrown by an operator in threads other than the + * main compute thread of that operator. */ @Internal -public class AsynchronousException extends RuntimeException { +public class AsynchronousException extends Exception { private static final long serialVersionUID = 1L; public AsynchronousException(Throwable cause) { super(cause); } + public AsynchronousException(String message, Throwable cause) { + super(message, cause); + } + @Override public String toString() { return "AsynchronousException{" + getCause() + "}"; http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java index 9534b3c..5664eac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java @@ -17,15 +17,15 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.util.Preconditions; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A {@link TimeServiceProvider} which assigns as current processing time the result of calling * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. @@ -35,24 +35,34 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { /** The containing task that owns this time service provider. */ private final AsyncExceptionHandler task; + /** The lock that timers acquire upon triggering */ private final Object checkpointLock; /** The executor service that schedules and calls the triggers of this task*/ - private final ScheduledExecutorService timerService; + private final ScheduledThreadPoolExecutor timerService; + - public static DefaultTimeServiceProvider create( - AsyncExceptionHandler exceptionHandler, - ScheduledExecutorService executor, - Object checkpointLock) { - return new DefaultTimeServiceProvider(exceptionHandler, executor, checkpointLock); + public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) { + this(failureHandler, checkpointLock, null); } - private DefaultTimeServiceProvider(AsyncExceptionHandler task, - ScheduledExecutorService threadPoolExecutor, - Object checkpointLock) { - this.task = Preconditions.checkNotNull(task); - this.timerService = Preconditions.checkNotNull(threadPoolExecutor); - this.checkpointLock = Preconditions.checkNotNull(checkpointLock); + public DefaultTimeServiceProvider( + AsyncExceptionHandler task, + Object checkpointLock, + ThreadFactory threadFactory) { + + this.task = checkNotNull(task); + this.checkpointLock = checkNotNull(checkpointLock); + + if (threadFactory == null) { + this.timerService = new ScheduledThreadPoolExecutor(1); + } else { + this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); + } + + // allow trigger tasks to be removed if all timers for + // that timestamp are removed by user + this.timerService.setRemoveOnCancelPolicy(true); } @Override @@ -76,6 +86,13 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { timerService.shutdownNow(); } + // safety net to destroy the thread pool + @Override + protected void finalize() throws Throwable { + super.finalize(); + timerService.shutdownNow(); + } + /** * Internal task that is invoked by the timer service and triggers the target. */ @@ -105,14 +122,4 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { } } } - - @VisibleForTesting - public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) { - return new DefaultTimeServiceProvider(new AsyncExceptionHandler() { - @Override - public void handleAsyncException(String message, Throwable exception) { - exception.printStackTrace(); - } - }, executor, checkpointLock); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index cf8853e..0a6534b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO final Object lock = getCheckpointLock(); while (running && inputProcessor.processInput(operator, lock)) { - + // all the work happens in the "processInput" method } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 33317fa..040ec66 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -67,7 +67,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; /** * Base class for all streaming tasks. A task is the unit of local processing that is deployed @@ -223,15 +223,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // if the clock is not already set, then assign a default TimeServiceProvider if (timerService == null) { + ThreadFactory timerThreadFactory = + new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, - new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); - - // allow trigger tasks to be removed if all timers for - // that timestamp are removed by user - executor.setRemoveOnCancelPolicy(true); - - timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock()); + timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory); } operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter()); @@ -305,10 +300,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // stop all timers and threads if (timerService != null) { try { - if (!timerService.isTerminated()) { - LOG.info("Timer service is shutting down."); - timerService.shutdownService(); - } + timerService.shutdownService(); } catch (Throwable t) { // catch and log the exception to not replace the original exception http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java index a21a2e1..81faec9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java @@ -86,7 +86,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider { return isTerminated; } - public int getNoOfRegisteredTimers() { + public int getNumRegisteredTimers() { int count = 0; for (List<Triggerable> tasks: registeredTasks.values()) { count += tasks.size(); http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 0197c53..fb08959 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS final Object lock = getCheckpointLock(); while (running && inputProcessor.processInput(operator, lock)) { - + // all the work happens in the "processInput" method } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java index 0351978..8d3e621 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -28,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,13 +39,14 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; @RunWith(PowerMockRunner.class) -@PrepareForTest(ResultPartitionWriter.class) +@PrepareForTest({ResultPartitionWriter.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class TimeProviderTest { @@ -52,8 +55,10 @@ public class TimeProviderTest { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); - TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider - .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); final List<Long> timestamps = new ArrayList<>(); @@ -114,6 +119,8 @@ public class TimeProviderTest { lastTs = timestamp; counter++; } + + assertNull(error.get()); } @Test @@ -124,14 +131,14 @@ public class TimeProviderTest { final Object lock = new Object(); - TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider - .create(new AsyncExceptionHandler() { + TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( + new AsyncExceptionHandler() { @Override public void handleAsyncException(String message, Throwable exception) { exceptionWasThrown.compareAndSet(false, true); latch.trigger(); } - }, Executors.newSingleThreadScheduledExecutor(), lock); + }, lock); long now = System.currentTimeMillis(); timeServiceProvider.registerTimer(now, new Triggerable() { @@ -182,7 +189,7 @@ public class TimeProviderTest { } }); - Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4); + Assert.assertEquals(provider.getNumRegisteredTimers(), 4); provider.setCurrentTime(100); long seen = 0; @@ -233,14 +240,30 @@ public class TimeProviderTest { } }); - assertEquals(2, tp.getNoOfRegisteredTimers()); + assertEquals(2, tp.getNumRegisteredTimers()); tp.setCurrentTime(35); - assertEquals(1, tp.getNoOfRegisteredTimers()); + assertEquals(1, tp.getNumRegisteredTimers()); tp.setCurrentTime(40); - assertEquals(0, tp.getNoOfRegisteredTimers()); + assertEquals(0, tp.getNumRegisteredTimers()); tp.shutdownService(); } + + // ------------------------------------------------------------------------ + + public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { + + private final AtomicReference<Throwable> errorReference; + + public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) { + this.errorReference = errorReference; + } + + @Override + public void handleAsyncException(String message, Throwable exception) { + errorReference.compareAndSet(null, exception); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 30f38e3..4c6d391 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -58,7 +59,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -182,14 +183,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { - final Object lock = new Object(); - TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final TimeServiceProvider timerService = new NoOpTimerService(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object()); AccumulatingProcessingTimeWindowOperator<String, String, String> op; @@ -201,11 +199,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -214,11 +207,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -227,11 +215,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -244,16 +227,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testTumblingWindow() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; @@ -285,6 +267,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); List<Integer> result = out.getElements(); @@ -294,6 +278,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { for (int i = 0; i < numElements; i++) { assertEquals(i, result.get(i).intValue()); } + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -307,8 +295,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testSlidingWindow() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); @@ -335,6 +325,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); // get and verify the result @@ -361,6 +353,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { lastCount = 1; } } + + if (error.get() != null) { + throw new Exception(error.get()); + } } finally { timerService.shutdownService(); } @@ -369,8 +365,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testTumblingWindowSingleElements() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); @@ -412,7 +410,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -426,8 +430,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testSlidingWindowSingleElements() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); @@ -460,7 +466,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -798,4 +810,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } return result; } + + private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception { + timers.shutdownService(); + + while (!timers.isTerminated()) { + Thread.sleep(2); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 7539c2d..88e28bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -40,14 +40,15 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; - import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + import org.junit.After; import org.junit.Test; @@ -60,7 +61,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -190,15 +191,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { - final Object lock = new Object(); - TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); - + final TimeServiceProvider timerService = new NoOpTimerService(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object()); + AggregatingProcessingTimeWindowOperator<String, String> op; op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, @@ -209,11 +207,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -222,11 +215,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -235,11 +223,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(op.getNextEvaluationTime() % 1000 == 0); op.dispose(); - timerService.shutdownService(); - timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); - mockTask = createMockTaskWithTimer(timerService, lock); - op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); @@ -251,16 +234,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - } finally { - timerService.shutdownService(); } } @Test public void testTumblingWindowUniqueElements() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; @@ -297,6 +280,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); @@ -305,6 +290,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertEquals(i, result.get(i).f0.intValue()); assertEquals(i, result.get(i).f1.intValue()); } + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -318,8 +307,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testTumblingWindowDuplicateElements() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); @@ -364,6 +355,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); // we have ideally one element per window. we may have more, when we emitted a value into the @@ -373,6 +366,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // deduplicate for more accurate checks HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result); assertTrue(set.size() == 10); + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -386,8 +383,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testSlidingWindow() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); @@ -418,6 +417,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); // get and verify the result @@ -445,6 +446,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { lastCount = 1; } } + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -458,8 +463,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testSlidingWindowSingleElements() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); @@ -504,7 +511,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { synchronized (lock) { op.close(); } + + shutdownTimerServiceAndWait(timerService); op.dispose(); + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -518,8 +531,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testPropagateExceptionsFromProcessElement() throws Exception { final Object lock = new Object(); - final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting( - Executors.newSingleThreadScheduledExecutor(), lock); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); @@ -556,7 +571,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue(e.getMessage().contains("Artificial Test Exception")); } + shutdownTimerServiceAndWait(timerService); op.dispose(); + + if (error.get() != null) { + throw new Exception(error.get()); + } } catch (Exception e) { e.printStackTrace(); @@ -971,8 +991,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) { - StreamConfig cfg = new StreamConfig(new Configuration()); - return cfg; + return new StreamConfig(new Configuration()); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -985,4 +1004,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } return result; } + + private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception { + timers.shutdownService(); + + while (!timers.isTerminated()) { + Thread.sleep(2); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java new file mode 100644 index 0000000..16e658e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; + +import java.util.concurrent.ScheduledFuture; + +class NoOpTimerService extends TimeServiceProvider { + + private volatile boolean terminated; + + @Override + public long getCurrentProcessingTime() { + return System.currentTimeMillis(); + } + + @Override + public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + return null; + } + + @Override + public boolean isTerminated() { + return terminated; + } + + @Override + public void shutdownService() throws Exception { + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index d8a0ee2..9d8e6a5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -42,12 +42,12 @@ import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -145,7 +145,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { @Override public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable { - final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; + final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0]; return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName()); } }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class)); @@ -154,7 +154,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } timeServiceProvider = testTimeProvider != null ? testTimeProvider : - DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock); + new DefaultTimeServiceProvider(mockTask, this.checkpointLock); doAnswer(new Answer<TimeServiceProvider>() { @Override
