Repository: flink Updated Branches: refs/heads/release-1.0 8949ccf66 -> 43e5975d5
[FLINK-3554] [streaming] Emit a MAX Watermark after finite sources finished This closes #1750 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e5975d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e5975d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e5975d Branch: refs/heads/release-1.0 Commit: 43e5975d5426e22eb4ef90e0f468bd7f6cd35736 Parents: 8949ccf Author: Stephan Ewen <[email protected]> Authored: Tue Mar 1 14:31:26 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Mar 1 17:04:34 2016 +0100 ---------------------------------------------------------------------- .../util/IncrementalLearningSkeletonData.java | 4 +- .../java/org/apache/flink/cep/CEPITCase.java | 4 +- .../minicluster/LocalFlinkMiniCluster.scala | 37 +++ .../operators/testutils/DummyEnvironment.java | 3 +- .../api/operators/StoppableStreamSource.java | 7 + .../streaming/api/operators/StreamSource.java | 52 +++- .../streaming/api/watermark/Watermark.java | 15 +- .../flink/streaming/api/SourceFunctionTest.java | 8 +- .../operators/StreamSourceOperatorTest.java | 251 +++++++++++++++++++ .../operators/windowing/CoGroupJoinITCase.java | 15 +- .../operators/windowing/WindowFoldITCase.java | 10 +- .../streaming/timestamp/TimestampITCase.java | 141 ++++++++++- .../util/OneInputStreamOperatorTestHarness.java | 2 +- .../util/TwoInputStreamOperatorTestHarness.java | 3 - .../streaming/api/scala/CoGroupJoinITCase.scala | 15 +- .../streaming/api/scala/WindowFoldITCase.scala | 6 +- 16 files changed, 511 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java index 1f4bfd8..144af99 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java @@ -20,12 +20,12 @@ package org.apache.flink.streaming.examples.ml.util; public class IncrementalLearningSkeletonData { public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + - "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + - "0\n" + "0\n" + "0\n" + "0\n"; + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"; private IncrementalLearningSkeletonData() { } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 4129e34..40c3c86 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -296,9 +296,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { Tuple2.of(new Event(2, "end", 2.0), 8L), Tuple2.of(new Event(1, "middle", 5.0), 7L), Tuple2.of(new Event(3, "middle", 6.0), 9L), - Tuple2.of(new Event(3, "end", 7.0), 7L), - // last element for high final watermark - Tuple2.of(new Event(3, "end", 7.0), 100L) + Tuple2.of(new Event(3, "end", 7.0), 7L) ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index c803429..a4c10e7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,16 +18,23 @@ package org.apache.flink.runtime.minicluster +import java.util + import akka.actor.{ActorRef, ActorSystem} +import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} +import org.apache.flink.runtime.messages.JobManagerMessages +import org.apache.flink.runtime.messages.JobManagerMessages.{StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs} import org.apache.flink.runtime.taskmanager.TaskManager import 
org.apache.flink.runtime.util.EnvironmentInformation +import scala.concurrent.Await + /** * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's @@ -211,4 +218,34 @@ class LocalFlinkMiniCluster( JobManager.ARCHIVE_NAME } } + + // -------------------------------------------------------------------------- + // Actions on running jobs + // -------------------------------------------------------------------------- + + def currentlyRunningJobs: Iterable[JobID] = { + val leader = getLeaderGateway(timeout) + val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout) + .mapTo[RunningJobsStatus] + Await.result(future, timeout).runningJobs.map(_.getJobId) + } + + def getCurrentlyRunningJobsJava(): java.util.List[JobID] = { + val list = new java.util.ArrayList[JobID]() + currentlyRunningJobs.foreach(list.add) + list + } + + def stopJob(id: JobID) : Unit = { + val leader = getLeaderGateway(timeout) + val response = leader.ask(new JobManagerMessages.StopJob(id), timeout) + .mapTo[StoppingResponse] + val rc = Await.result(response, timeout) + + rc match { + case failure: StoppingFailure => + throw new Exception(s"Stopping the job with ID $id failed.", failure.cause) + case _ => + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index ff6593f..3fcc425 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.testutils; +import java.util.Collections; import java.util.Map; import java.util.concurrent.Future; @@ -105,7 +106,7 @@ public class DummyEnvironment implements Environment { @Override public Map<String, Future<Path>> getDistributedCacheEntries() { - return null; + return Collections.emptyMap(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java index 927f61f..ce8f6cd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java @@ -41,7 +41,14 @@ public class StoppableStreamSource<OUT, SRC extends SourceFunction<OUT> & Stoppa super(sourceFunction); } + /** + * Marks the source as stopped and calls {@link StoppableFunction#stop()} on the user function. + */ public void stop() { + // important: marking the source as stopped has to happen before the function is stopped.
+ // the flag that tracks this status is volatile, so the memory model also guarantees + // the happens-before relationship + markCanceledOrStopped(); userFunction.stop(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 2470417..84f59ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -42,14 +42,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> private transient SourceFunction.SourceContext<OUT> ctx; + private transient volatile boolean canceledOrStopped = false; + + public StreamSource(SRC sourceFunction) { super(sourceFunction); this.chainingStrategy = ChainingStrategy.HEAD; } + public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); + final SourceFunction.SourceContext<OUT> ctx; switch (timeCharacteristic) { case EventTime: @@ -66,19 +71,60 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> throw new Exception(String.valueOf(timeCharacteristic)); } - userFunction.run(ctx); - - ctx.close(); + // copy to a field to give the 'cancel()' method access + this.ctx = ctx; + + try { + userFunction.run(ctx); + + // if we get here, then the user function either exited after being done (finite source) + // or the function was canceled or stopped. 
For the finite source case, we should emit + // a final watermark that indicates that we reached the end of event-time + if (!isCanceledOrStopped()) { + ctx.emitWatermark(Watermark.MAX_WATERMARK); + } + } finally { + // make sure that the context is closed in any case + ctx.close(); + } } public void cancel() { + // important: marking the source as stopped has to happen before the function is stopped. + // the flag that tracks this status is volatile, so the memory model also guarantees + // the happens-before relationship + markCanceledOrStopped(); userFunction.cancel(); + // the context may not be initialized if the source was never running. if (ctx != null) { ctx.close(); } } + + /** + * Marks this source as canceled or stopped. + * + * <p>This indicates that any exit of the {@link #run(Object, Output)} method + * cannot be interpreted as the result of a finite source. + */ + protected void markCanceledOrStopped() { + this.canceledOrStopped = true; + } + /** + * Checks whether the source has been canceled or stopped. + * @return True, if the source is canceled or stopped, false if not. + */ + protected boolean isCanceledOrStopped() { + return canceledOrStopped; + } + + /** + * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...) + * has caused an exception. If one of these threads caused an exception, this method will + * throw that exception.
+ */ void checkAsyncException() { getContainingTask().checkTimerException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java index 1375c0d..cb9eb99 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.watermark; import org.apache.flink.annotation.PublicEvolving; @@ -38,11 +39,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; * <p> * When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}. When * an operator receives this it will know that no more input will be arriving in the future. 
- * */ @PublicEvolving -public class Watermark extends StreamElement { +public final class Watermark extends StreamElement { + /** The watermark that signifies end-of-event-time */ + public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE); + + // ------------------------------------------------------------------------ + /** The timestamp of the watermark */ private final long timestamp; @@ -60,6 +65,8 @@ public class Watermark extends StreamElement { return timestamp; } + // ------------------------------------------------------------------------ + @Override public boolean equals(Object o) { return this == o || http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java index b53649a..4b99202 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java @@ -61,9 +61,9 @@ public class SourceFunctionTest { assertEquals(expectedList, actualList); } - @Test - public void socketTextStreamTest() throws Exception { - // TODO: does not work because we cannot set the internal socket anymore +// TODO: does not work because we cannot set the internal socket anymore +// @Test +// public void socketTextStreamTest() throws Exception { // List<String> expectedList = Arrays.asList("a", "b", "c"); // List<String> actualList = new ArrayList<String>(); // @@ -80,5 +80,5 @@ public class SourceFunctionTest { // actualList.add(source.next()); // } // assertEquals(expectedList, actualList); - } +// } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java new file mode 100644 index 0000000..b368019 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@SuppressWarnings("serial") +public class StreamSourceOperatorTest { + + @Test + public void testEmitMaxWatermarkForFiniteSource() throws Exception { + + // regular stream source operator + StreamSource<String, FiniteSource<String>> operator = + new StreamSource<>(new FiniteSource<String>()); + + final List<StreamElement> output = new ArrayList<>(); + + setupSourceOperator(operator); + operator.run(new Object(), new CollectorOutput<String>(output)); + + assertEquals(1, output.size()); + assertEquals(Watermark.MAX_WATERMARK, output.get(0)); + } + + @Test + public void testNoMaxWatermarkOnImmediateCancel() throws Exception { + + final List<StreamElement> output = new ArrayList<>(); + + 
// regular stream source operator + final StreamSource<String, InfiniteSource<String>> operator = + new StreamSource<>(new InfiniteSource<String>()); + + + setupSourceOperator(operator); + operator.cancel(); + + // run and exit + operator.run(new Object(), new CollectorOutput<String>(output)); + + assertTrue(output.isEmpty()); + } + + @Test + public void testNoMaxWatermarkOnAsyncCancel() throws Exception { + + final List<StreamElement> output = new ArrayList<>(); + final Thread runner = Thread.currentThread(); + + // regular stream source operator + final StreamSource<String, InfiniteSource<String>> operator = + new StreamSource<>(new InfiniteSource<String>()); + + + setupSourceOperator(operator); + + // trigger an async cancel in a bit + new Thread("canceler") { + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + operator.cancel(); + runner.interrupt(); + } + }.start(); + + // run and wait to be canceled + try { + operator.run(new Object(), new CollectorOutput<String>(output)); + } + catch (InterruptedException ignored) {} + + assertTrue(output.isEmpty()); + } + + @Test + public void testNoMaxWatermarkOnImmediateStop() throws Exception { + + final List<StreamElement> output = new ArrayList<>(); + + // regular stream source operator + final StoppableStreamSource<String, InfiniteSource<String>> operator = + new StoppableStreamSource<>(new InfiniteSource<String>()); + + + setupSourceOperator(operator); + operator.stop(); + + // run and stop + operator.run(new Object(), new CollectorOutput<String>(output)); + + assertTrue(output.isEmpty()); + } + + @Test + public void testNoMaxWatermarkOnAsyncStop() throws Exception { + + final List<StreamElement> output = new ArrayList<>(); + + // regular stream source operator + final StoppableStreamSource<String, InfiniteSource<String>> operator = + new StoppableStreamSource<>(new InfiniteSource<String>()); + + + setupSourceOperator(operator); + + // trigger an async cancel 
in a bit + new Thread("canceler") { + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + operator.stop(); + } + }.start(); + + // run and wait to be stopped + operator.run(new Object(), new CollectorOutput<String>(output)); + + assertTrue(output.isEmpty()); + } + + + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private static <T> void setupSourceOperator(StreamSource<T, ?> operator) { + ExecutionConfig executionConfig = new ExecutionConfig(); + StreamConfig cfg = new StreamConfig(new Configuration()); + + cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); + + Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0); + + StreamTask<?, ?> mockTask = mock(StreamTask.class); + when(mockTask.getName()).thenReturn("Mock Task"); + when(mockTask.getCheckpointLock()).thenReturn(new Object()); + when(mockTask.getConfiguration()).thenReturn(cfg); + when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getExecutionConfig()).thenReturn(executionConfig); + when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); + + operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class)); + } + + // ------------------------------------------------------------------------ + + private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction { + + @Override + public void run(SourceContext<T> ctx) {} + + @Override + public void cancel() {} + + @Override + public void stop() {} + } + + private static final class InfiniteSource<T> implements SourceFunction<T>, StoppableFunction { + + private volatile boolean running = true; + + @Override + public void run(SourceContext<T> ctx) throws Exception { + while (running) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void stop() { + running = false; + } + } + 
+ // ------------------------------------------------------------------------ + + private static class CollectorOutput<T> implements Output<StreamRecord<T>> { + + private final List<StreamElement> list; + + private CollectorOutput(List<StreamElement> list) { + this.list = list; + } + + @Override + public void emitWatermark(Watermark mark) { + list.add(mark); + } + + @Override + public void collect(StreamRecord<T> record) { + list.add(record); + } + + @Override + public void close() {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java index c40874c..5e67c72 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -74,8 +74,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple2.of("a", 7)); ctx.collect(Tuple2.of("a", 8)); - // so we get a final big watermark - ctx.collect(Tuple2.of("a", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override @@ -96,8 +95,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple2.of("c", 7)); ctx.collect(Tuple2.of("c", 8)); - // so we get a final big watermark - ctx.collect(Tuple2.of("a", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override @@ -172,8 +170,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { 
ctx.collect(Tuple3.of("a", "j", 7)); ctx.collect(Tuple3.of("a", "k", 8)); - // so we get a final big watermark - ctx.collect(Tuple3.of("a", "k", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override @@ -194,8 +191,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple3.of("a", "x", 6)); ctx.collect(Tuple3.of("a", "z", 8)); - // so we get a final high watermark - ctx.collect(Tuple3.of("a", "z", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override @@ -272,8 +268,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple3.of("a", "j", 7)); ctx.collect(Tuple3.of("a", "k", 8)); - // so we get a final high watermark - ctx.collect(Tuple3.of("a", "k", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java index 3b859f0..fbe03c5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java @@ -74,13 +74,12 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple2.of("a", 7)); ctx.collect(Tuple2.of("a", 8)); - // so that we get a high final watermark to process the previously sent elements - ctx.collect(Tuple2.of("a", 20)); + // source is finite, so it will have an 
implicit MAX watermark when it finishes } @Override - public void cancel() { - } + public void cancel() {} + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); source1 @@ -139,8 +138,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ctx.collect(Tuple2.of("b", 5)); ctx.collect(Tuple2.of("a", 5)); - // so that we get a high final watermark to process the previously sent elements - ctx.collect(Tuple2.of("a", 20)); + // source is finite, so it will have an implicit MAX watermark when it finishes } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index d918ba8..1a59ab3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -18,8 +18,10 @@ package org.apache.flink.streaming.timestamp; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -52,7 +54,8 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -156,14 +159,88 @@ public 
class TimestampITCase { System.err.println(CustomOperator.finalWatermarks[i].get(k)); } - Assert.fail("Wrong watermark."); + fail("Wrong watermark."); } } - assertFalse(CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1).equals(new Watermark(Long.MAX_VALUE))); + + assertEquals(Watermark.MAX_WATERMARK, + CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1)); } } + @Test + public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception { + + // for this test to work, we need to be sure that no other jobs are being executed + while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { + Thread.sleep(100); + } + + final int NUM_WATERMARKS = 10; + + long initialTime = 0L; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS)); + DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2)); + + source1.union(source2) + .map(new IdentityMap()) + .connect(source2).map(new IdentityCoMap()) + .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) + .addSink(new NoOpSink<Integer>()); + new Thread("stopper") { + @Override + public void run() { + try { + // try until we get the running jobs + List<JobID> running; + while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) { + Thread.sleep(100); + } + + JobID id = running.get(0); + + // send stop until the job is stopped + do { + cluster.stopJob(id); + Thread.sleep(50); + } while (!cluster.getCurrentlyRunningJobsJava().isEmpty()); + } + catch (Throwable t) { + t.printStackTrace(); + } + } + }.start(); + + env.execute(); + + // verify that all the 
watermarks arrived at the final custom operator + for (int i = 0; i < PARALLELISM; i++) { + // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the + // other source stops emitting after that + for (int j = 0; j < NUM_WATERMARKS / 2; j++) { + if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) { + System.err.println("All Watermarks: "); + for (int k = 0; k <= NUM_WATERMARKS / 2; k++) { + System.err.println(CustomOperator.finalWatermarks[i].get(k)); + } + + fail("Wrong watermark."); + } + } + + assertNotEquals(Watermark.MAX_WATERMARK, + CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1)); + } + } /** * These check whether timestamps are properly assigned at the sources and handled in @@ -200,8 +277,7 @@ public class TimestampITCase { @Test public void testDisabledTimestamps() throws Exception { final int NUM_ELEMENTS = 10; - - + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", cluster.getLeaderRPCPort()); @@ -222,7 +298,7 @@ public class TimestampITCase { } /** - * This thests whether timestamps are properly extracted in the timestamp + * This tests whether timestamps are properly extracted in the timestamp * extractor and whether watermarks are also correctly forwared from this with the auto watermark * interval. */ @@ -279,7 +355,10 @@ public class TimestampITCase { Assert.fail("Wrong watermark. 
Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]); } } - assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE))); + + // the input is finite, so it should have a MAX Watermark + assertEquals(Watermark.MAX_WATERMARK, + CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); } /** @@ -339,7 +418,10 @@ public class TimestampITCase { Assert.fail("Wrong watermark."); } } - assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE))); + + // the input is finite, so it should have a MAX Watermark + assertEquals(Watermark.MAX_WATERMARK, + CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); } /** @@ -400,7 +482,9 @@ public class TimestampITCase { Assert.fail("Wrong watermark."); } } - assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE))); + // the input is finite, so it should have a MAX Watermark + assertEquals(Watermark.MAX_WATERMARK, + CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); } /** @@ -710,8 +794,8 @@ public class TimestampITCase { public static class MyTimestampSource implements SourceFunction<Integer> { - private long initialTime; - private int numWatermarks; + private final long initialTime; + private final int numWatermarks; public MyTimestampSource(long initialTime, int numWatermarks) { this.initialTime = initialTime; @@ -730,6 +814,41 @@ public class TimestampITCase { public void cancel() {} } + public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction { + + private final long initialTime; + private final int numWatermarks; + + private volatile boolean running = true; + + public MyTimestampSourceInfinite(long initialTime, int numWatermarks) { + this.initialTime 
= initialTime; + this.numWatermarks = numWatermarks; + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < numWatermarks; i++) { + ctx.collectWithTimestamp(i, initialTime + i); + ctx.emitWatermark(new Watermark(initialTime + i)); + } + + while (running) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void stop() { + running = false; + } + } + public static class MyNonWatermarkingSource implements SourceFunction<Integer> { int numWatermarks; http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index c484eae..46e74e7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -102,7 +102,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } }).when(mockTask).createStateBackend(any(String.class), any(TypeSerializer.class)); } catch (Exception e) { - e.printStackTrace(); + throw new RuntimeException(e.getMessage(), e); } doAnswer(new Answer<Void>() { http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index e23673a..d848d2a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -28,12 +28,9 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.mockito.stubbing.OngoingStubbing; import java.util.concurrent.ConcurrentLinkedQueue; http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala index f700fa3..fddbe00 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -56,8 +56,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", 7)) ctx.collect(("a", 8)) - // so that we get a high final watermark to process the previously sent elements - ctx.collect(("a", 20)) + // source is finite, so it 
will have an implicit MAX watermark when it finishes } def cancel() {} @@ -73,8 +72,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("c", 7)) ctx.collect(("c", 8)) - // so that we get a high final watermark to process the previously sent elements - ctx.collect(("c", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() { @@ -126,8 +124,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "j", 7)) ctx.collect(("a", "k", 8)) - // so that we get a high final watermark to process the previously sent elements - ctx.collect(("a", "k", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() {} @@ -145,8 +142,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "x", 6)) ctx.collect(("a", "z", 8)) - // so that we get a high final watermark to process the previously sent elements - ctx.collect(("a", "z", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() {} @@ -208,8 +204,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", "j", 7)) ctx.collect(("a", "k", 8)) - // so that we get a high final watermark to process the previously sent elements - ctx.collect(("a", "k", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() {} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index b1d6367..7833651 100644 --- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -59,8 +59,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("a", 7)) ctx.collect(("a", 8)) - // so we get a big watermark to trigger processing of the previous elements - ctx.collect(("a", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() { @@ -107,8 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ctx.collect(("b", 5)) ctx.collect(("a", 5)) - // so we get a big watermark to trigger processing of the previous elements - ctx.collect(("a", 20)) + // source is finite, so it will have an implicit MAX watermark when it finishes } def cancel() {
