Repository: flink
Updated Branches:
  refs/heads/release-1.0 8949ccf66 -> 43e5975d5


[FLINK-3554] [streaming] Emit a MAX Watermark after finite sources finished

This closes #1750


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e5975d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e5975d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e5975d

Branch: refs/heads/release-1.0
Commit: 43e5975d5426e22eb4ef90e0f468bd7f6cd35736
Parents: 8949ccf
Author: Stephan Ewen <[email protected]>
Authored: Tue Mar 1 14:31:26 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Mar 1 17:04:34 2016 +0100

----------------------------------------------------------------------
 .../util/IncrementalLearningSkeletonData.java   |   4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |   4 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  37 +++
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../api/operators/StoppableStreamSource.java    |   7 +
 .../streaming/api/operators/StreamSource.java   |  52 +++-
 .../streaming/api/watermark/Watermark.java      |  15 +-
 .../flink/streaming/api/SourceFunctionTest.java |   8 +-
 .../operators/StreamSourceOperatorTest.java     | 251 +++++++++++++++++++
 .../operators/windowing/CoGroupJoinITCase.java  |  15 +-
 .../operators/windowing/WindowFoldITCase.java   |  10 +-
 .../streaming/timestamp/TimestampITCase.java    | 141 ++++++++++-
 .../util/OneInputStreamOperatorTestHarness.java |   2 +-
 .../util/TwoInputStreamOperatorTestHarness.java |   3 -
 .../streaming/api/scala/CoGroupJoinITCase.scala |  15 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |   6 +-
 16 files changed, 511 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
index 1f4bfd8..144af99 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -20,12 +20,12 @@ package org.apache.flink.streaming.examples.ml.util;
 public class IncrementalLearningSkeletonData {
 
        public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" +
-                       "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "0\n" + "0\n" +
+                       "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "0\n" +
                        "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
                        "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
                        "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
                        "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n";
+                       "0\n" + "0\n" + "0\n" + "0\n" + "0\n";
 
        private IncrementalLearningSkeletonData() {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 4129e34..40c3c86 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -296,9 +296,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                        Tuple2.of(new Event(2, "end", 2.0), 8L),
                        Tuple2.of(new Event(1, "middle", 5.0), 7L),
                        Tuple2.of(new Event(3, "middle", 6.0), 9L),
-                       Tuple2.of(new Event(3, "end", 7.0), 7L),
-                       // last element for high final watermark
-                       Tuple2.of(new Event(3, "end", 7.0), 100L)
+                       Tuple2.of(new Event(3, "end", 7.0), 7L)
                ).assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c803429..a4c10e7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,16 +18,23 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.util
+
 import akka.actor.{ActorRef, ActorSystem}
+import org.apache.flink.api.common.JobID
 
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.messages.JobManagerMessages
+import org.apache.flink.runtime.messages.JobManagerMessages.{StoppingFailure, 
StoppingResponse, RunningJobsStatus, RunningJobs}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 
+import scala.concurrent.Await
+
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the 
[[JobManager]] in the same
  * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to 
setup Flink's
@@ -211,4 +218,34 @@ class LocalFlinkMiniCluster(
       JobManager.ARCHIVE_NAME
     }
   }
+  
+  // --------------------------------------------------------------------------
+  //  Actions on running jobs
+  // --------------------------------------------------------------------------
+  
+  def currentlyRunningJobs: Iterable[JobID] = {
+    val leader = getLeaderGateway(timeout)
+    val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, 
timeout)
+                       .mapTo[RunningJobsStatus]
+    Await.result(future, timeout).runningJobs.map(_.getJobId)
+  }
+
+  def getCurrentlyRunningJobsJava(): java.util.List[JobID] = {
+    val list = new java.util.ArrayList[JobID]()
+    currentlyRunningJobs.foreach(list.add)
+    list
+  }
+  
+  def stopJob(id: JobID) : Unit = {
+    val leader = getLeaderGateway(timeout)
+    val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)
+                         .mapTo[StoppingResponse]
+    val rc = Await.result(response, timeout)
+
+    rc match {
+      case failure: StoppingFailure =>
+        throw new Exception(s"Stopping the job with ID $id failed.", 
failure.cause)
+      case _ =>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index ff6593f..3fcc425 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -105,7 +106,7 @@ public class DummyEnvironment implements Environment {
 
        @Override
        public Map<String, Future<Path>> getDistributedCacheEntries() {
-               return null;
+               return Collections.emptyMap();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
index 927f61f..ce8f6cd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
@@ -41,7 +41,14 @@ public class StoppableStreamSource<OUT, SRC extends 
SourceFunction<OUT> & Stoppa
                super(sourceFunction);
        }
 
+       /**
+        * Marks the source as stopped and calls {@link 
StoppableFunction#stop()} on the user function.
+        */
        public void stop() {
+               // important: marking the source as stopped has to happen 
before the function is stopped.
+               // the flag that tracks this status is volatile, so the memory 
model also guarantees
+               // the happens-before relationship
+               markCanceledOrStopped();
                userFunction.stop();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 2470417..84f59ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -42,14 +42,19 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
        
        private transient SourceFunction.SourceContext<OUT> ctx;
 
+       private transient volatile boolean canceledOrStopped = false;
+       
+       
        public StreamSource(SRC sourceFunction) {
                super(sourceFunction);
 
                this.chainingStrategy = ChainingStrategy.HEAD;
        }
 
+       
        public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
+               final SourceFunction.SourceContext<OUT> ctx;
                
                switch (timeCharacteristic) {
                        case EventTime:
@@ -66,19 +71,60 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                                throw new 
Exception(String.valueOf(timeCharacteristic));
                }
 
-               userFunction.run(ctx);
-
-               ctx.close();
+               // copy to a field to give the 'cancel()' method access
+               this.ctx = ctx;
+               
+               try {
+                       userFunction.run(ctx);
+
+                       // if we get here, then the user function either exited 
after being done (finite source)
+                       // or the function was canceled or stopped. For the 
finite source case, we should emit
+                       // a final watermark that indicates that we reached the 
end of event-time
+                       if (!isCanceledOrStopped()) {
+                               ctx.emitWatermark(Watermark.MAX_WATERMARK);
+                       }
+               } finally {
+                       // make sure that the context is closed in any case
+                       ctx.close();
+               }
        }
 
        public void cancel() {
+               // important: marking the source as canceled has to happen 
before the function is canceled.
+               // the flag that tracks this status is volatile, so the memory 
model also guarantees
+               // the happens-before relationship
+               markCanceledOrStopped();
                userFunction.cancel();
+               
                // the context may not be initialized if the source was never 
running.
                if (ctx != null) {
                        ctx.close();
                }
        }
+
+       /**
+        * Marks this source as canceled or stopped.
+        * 
+        * <p>This indicates that any exit of the {@link #run(Object, Output)} 
method
+        * cannot be interpreted as the result of a finite source.  
+        */
+       protected void markCanceledOrStopped() {
+               this.canceledOrStopped = true;
+       }
        
+       /**
+        * Checks whether the source has been canceled or stopped. 
+        * @return True, if the source is canceled or stopped, false if not.
+        */
+       protected boolean isCanceledOrStopped() {
+               return canceledOrStopped;
+       }
+
+       /**
+        * Checks whether any asynchronous thread (checkpoint trigger, timer, 
watermark generator, ...)
+        * has caused an exception. If one of these threads caused an 
exception, this method will
+        * throw that exception.
+        */
        void checkAsyncException() {
                getContainingTask().checkTimerException();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index 1375c0d..cb9eb99 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.watermark;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -38,11 +39,15 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
  * <p>
  * When a source closes it will emit a final watermark with timestamp {@code 
Long.MAX_VALUE}. When
  * an operator receives this it will know that no more input will be arriving 
in the future.
- *
  */
 @PublicEvolving
-public class Watermark extends StreamElement {
+public final class Watermark extends StreamElement {
 
+       /** The watermark that signifies end-of-event-time */
+       public static final Watermark MAX_WATERMARK = new 
Watermark(Long.MAX_VALUE);
+       
+       // 
------------------------------------------------------------------------
+       
        /** The timestamp of the watermark */
        private final long timestamp;
 
@@ -60,6 +65,8 @@ public class Watermark extends StreamElement {
                return timestamp;
        }
 
+       // 
------------------------------------------------------------------------
+       
        @Override
        public boolean equals(Object o) {
                return this == o ||

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index b53649a..4b99202 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -61,9 +61,9 @@ public class SourceFunctionTest {
                assertEquals(expectedList, actualList);
        }
 
-       @Test
-       public void socketTextStreamTest() throws Exception {
-               // TODO: does not work because we cannot set the internal 
socket anymore
+// TODO: does not work because we cannot set the internal socket anymore
+//     @Test
+//     public void socketTextStreamTest() throws Exception {
 //             List<String> expectedList = Arrays.asList("a", "b", "c");
 //             List<String> actualList = new ArrayList<String>();
 //
@@ -80,5 +80,5 @@ public class SourceFunctionTest {
 //                     actualList.add(source.next());
 //             }
 //             assertEquals(expectedList, actualList);
-       }
+//     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
new file mode 100644
index 0000000..b368019
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("serial")
+public class StreamSourceOperatorTest {
+       
+       @Test
+       public void testEmitMaxWatermarkForFiniteSource() throws Exception {
+
+               // regular stream source operator
+               StreamSource<String, FiniteSource<String>> operator = 
+                               new StreamSource<>(new FiniteSource<String>());
+               
+               final List<StreamElement> output = new ArrayList<>();
+               
+               setupSourceOperator(operator);
+               operator.run(new Object(), new CollectorOutput<String>(output));
+               
+               assertEquals(1, output.size());
+               assertEquals(Watermark.MAX_WATERMARK, output.get(0));
+       }
+
+       @Test
+       public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
+
+               final List<StreamElement> output = new ArrayList<>();
+
+               // regular stream source operator
+               final StreamSource<String, InfiniteSource<String>> operator =
+                               new StreamSource<>(new 
InfiniteSource<String>());
+
+
+               setupSourceOperator(operator);
+               operator.cancel();
+
+               // run and exit
+               operator.run(new Object(), new CollectorOutput<String>(output));
+               
+               assertTrue(output.isEmpty());
+       }
+       
+       @Test
+       public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
+
+               final List<StreamElement> output = new ArrayList<>();
+               final Thread runner = Thread.currentThread();
+               
+               // regular stream source operator
+               final StreamSource<String, InfiniteSource<String>> operator =
+                               new StreamSource<>(new 
InfiniteSource<String>());
+
+               
+               setupSourceOperator(operator);
+               
+               // trigger an async cancel in a bit
+               new Thread("canceler") {
+                       @Override
+                       public void run() {
+                               try {
+                                       Thread.sleep(200);
+                               } catch (InterruptedException ignored) {}
+                               operator.cancel();
+                               runner.interrupt();
+                       }
+               }.start();
+               
+               // run and wait to be canceled
+               try {
+                       operator.run(new Object(), new 
CollectorOutput<String>(output));
+               }
+               catch (InterruptedException ignored) {}
+
+               assertTrue(output.isEmpty());
+       }
+
+       @Test
+       public void testNoMaxWatermarkOnImmediateStop() throws Exception {
+
+               final List<StreamElement> output = new ArrayList<>();
+
+               // stoppable stream source operator
+               final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
+                               new StoppableStreamSource<>(new 
InfiniteSource<String>());
+
+
+               setupSourceOperator(operator);
+               operator.stop();
+
+               // run and stop
+               operator.run(new Object(), new CollectorOutput<String>(output));
+
+               assertTrue(output.isEmpty());
+       }
+
+       @Test
+       public void testNoMaxWatermarkOnAsyncStop() throws Exception {
+
+               final List<StreamElement> output = new ArrayList<>();
+
+               // stoppable stream source operator
+               final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
+                               new StoppableStreamSource<>(new 
InfiniteSource<String>());
+
+
+               setupSourceOperator(operator);
+
+               // trigger an async stop in a bit
+               new Thread("canceler") {
+                       @Override
+                       public void run() {
+                               try {
+                                       Thread.sleep(200);
+                               } catch (InterruptedException ignored) {}
+                               operator.stop();
+                       }
+               }.start();
+
+               // run and wait to be stopped
+               operator.run(new Object(), new CollectorOutput<String>(output));
+
+               assertTrue(output.isEmpty());
+       }
+       
+       
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private static <T> void setupSourceOperator(StreamSource<T, ?> 
operator) {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               
+               cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               Environment env = new DummyEnvironment("MockTwoInputTask", 1, 
0);
+               
+               StreamTask<?, ?> mockTask = mock(StreamTask.class);
+               when(mockTask.getName()).thenReturn("Mock Task");
+               when(mockTask.getCheckpointLock()).thenReturn(new Object());
+               when(mockTask.getConfiguration()).thenReturn(cfg);
+               when(mockTask.getEnvironment()).thenReturn(env);
+               when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+               
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, 
Accumulator<?, ?>>emptyMap());
+
+               operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) 
mock(Output.class));
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       private static final class FiniteSource<T> implements 
SourceFunction<T>, StoppableFunction {
+
+               @Override
+               public void run(SourceContext<T> ctx) {}
+
+               @Override
+               public void cancel() {}
+
+               @Override
+               public void stop() {}
+       }
+
+       private static final class InfiniteSource<T> implements 
SourceFunction<T>, StoppableFunction {
+
+               private volatile boolean running = true;
+               
+               @Override
+               public void run(SourceContext<T> ctx) throws Exception {
+                       while (running) {
+                               Thread.sleep(20);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               @Override
+               public void stop() {
+                       running = false;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class CollectorOutput<T> implements 
Output<StreamRecord<T>> {
+
+               private final List<StreamElement> list;
+
+               private CollectorOutput(List<StreamElement> list) {
+                       this.list = list;
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       list.add(mark);
+               }
+
+               @Override
+               public void collect(StreamRecord<T> record) {
+                       list.add(record);
+               }
+
+               @Override
+               public void close() {}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index c40874c..5e67c72 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -74,8 +74,7 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple2.of("a", 7));
                                ctx.collect(Tuple2.of("a", 8));
 
-                               // so we get a final big watermark
-                               ctx.collect(Tuple2.of("a", 20));
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
                        }
 
                        @Override
@@ -96,8 +95,7 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple2.of("c", 7));
                                ctx.collect(Tuple2.of("c", 8));
 
-                               // so we get a final big watermark
-                               ctx.collect(Tuple2.of("a", 20));
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
                        }
 
                        @Override
@@ -172,8 +170,7 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple3.of("a", "j", 7));
                                ctx.collect(Tuple3.of("a", "k", 8));
 
-                               // so we get a final big watermark
-                               ctx.collect(Tuple3.of("a", "k", 20));
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
                        }
 
                        @Override
@@ -194,8 +191,7 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple3.of("a", "x", 6));
                                ctx.collect(Tuple3.of("a", "z", 8));
 
-                               // so we get a final high watermark
-                               ctx.collect(Tuple3.of("a", "z", 20));
+                               // source is finite, so it will have an 
implicit MAX watermark when it finishes
                        }
 
                        @Override
@@ -272,8 +268,7 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple3.of("a", "j", 7));
                                ctx.collect(Tuple3.of("a", "k", 8));
 
-                               // so we get a final high watermark
-                               ctx.collect(Tuple3.of("a", "k", 20));
+                               // source is finite, so it will have an implicit MAX watermark when it finishes
                        }
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index 3b859f0..fbe03c5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -74,13 +74,12 @@ public class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple2.of("a", 7));
                                ctx.collect(Tuple2.of("a", 8));
 
-                               // so that we get a high final watermark to 
process the previously sent elements
-                               ctx.collect(Tuple2.of("a", 20));
+                               // source is finite, so it will have an implicit MAX watermark when it finishes
                        }
 
                        @Override
-                       public void cancel() {
-                       }
+                       public void cancel() {}
+                       
                }).assignTimestampsAndWatermarks(new 
Tuple2TimestampExtractor());
 
                source1
@@ -139,8 +138,7 @@ public class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
                                ctx.collect(Tuple2.of("b", 5));
                                ctx.collect(Tuple2.of("a", 5));
 
-                               // so that we get a high final watermark to 
process the previously sent elements
-                               ctx.collect(Tuple2.of("a", 20));
+                               // source is finite, so it will have an implicit MAX watermark when it finishes
                        }
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d918ba8..1a59ab3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.streaming.timestamp;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -52,7 +54,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -156,14 +159,88 @@ public class TimestampITCase {
                                                
System.err.println(CustomOperator.finalWatermarks[i].get(k));
                                        }
 
-                                       Assert.fail("Wrong watermark.");
+                                       fail("Wrong watermark.");
                                }
                        }
-                       
assertFalse(CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1).equals(new
 Watermark(Long.MAX_VALUE)));
+                       
+                       assertEquals(Watermark.MAX_WATERMARK,
+                                       
CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
                }
        }
 
+       @Test
+       public void testWatermarkPropagationNoFinalWatermarkOnStop() throws 
Exception {
+               
+               // for this test to work, we need to be sure that no other jobs are being executed
+               while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+                       Thread.sleep(100);
+               }
+               
+               final int NUM_WATERMARKS = 10;
+
+               long initialTime = 0L;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                               "localhost", cluster.getLeaderRPCPort());
+
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+
+               DataStream<Integer> source1 = env.addSource(new 
MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
+               DataStream<Integer> source2 = env.addSource(new 
MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
+
+               source1.union(source2)
+                               .map(new IdentityMap())
+                               .connect(source2).map(new IdentityCoMap())
+                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+                               .addSink(new NoOpSink<Integer>());
 
+               new Thread("stopper") {
+                       @Override
+                       public void run() {
+                               try {
+                                       // try until we get the running jobs
+                                       List<JobID> running;
+                                       while ((running = 
cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
+                                               Thread.sleep(100);
+                                       }
+
+                                       JobID id = running.get(0);
+                                       
+                                       // send stop until the job is stopped
+                                       do {
+                                               cluster.stopJob(id);
+                                               Thread.sleep(50);
+                                       } while 
(!cluster.getCurrentlyRunningJobsJava().isEmpty());
+                               }
+                               catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+               }.start();
+               
+               env.execute();
+
+               // verify that all the watermarks arrived at the final custom operator
+               for (int i = 0; i < PARALLELISM; i++) {
+                       // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
+                       // other source stops emitting after that
+                       for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
+                               if 
(!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + 
j))) {
+                                       System.err.println("All Watermarks: ");
+                                       for (int k = 0; k <= NUM_WATERMARKS / 
2; k++) {
+                                               
System.err.println(CustomOperator.finalWatermarks[i].get(k));
+                                       }
+
+                                       fail("Wrong watermark.");
+                               }
+                       }
+                       
+                       assertNotEquals(Watermark.MAX_WATERMARK,
+                                       
CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+               }
+       }
 
        /**
         * These check whether timestamps are properly assigned at the sources 
and handled in
@@ -200,8 +277,7 @@ public class TimestampITCase {
        @Test
        public void testDisabledTimestamps() throws Exception {
                final int NUM_ELEMENTS = 10;
-
-
+               
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                                "localhost", cluster.getLeaderRPCPort());
                
@@ -222,7 +298,7 @@ public class TimestampITCase {
        }
 
        /**
-        * This thests whether timestamps are properly extracted in the 
timestamp
+        * This tests whether timestamps are properly extracted in the timestamp
         * extractor and whether watermarks are also correctly forwarded from 
this with the auto watermark
         * interval.
         */
@@ -279,7 +355,10 @@ public class TimestampITCase {
                                Assert.fail("Wrong watermark. Expected: " + j + 
" Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
                        }
                }
-               
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
 Watermark(Long.MAX_VALUE)));
+               
+               // the input is finite, so it should have a MAX Watermark
+               assertEquals(Watermark.MAX_WATERMARK, 
+                               
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() 
- 1));
        }
 
        /**
@@ -339,7 +418,10 @@ public class TimestampITCase {
                                Assert.fail("Wrong watermark.");
                        }
                }
-               
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
 Watermark(Long.MAX_VALUE)));
+
+               // the input is finite, so it should have a MAX Watermark
+               assertEquals(Watermark.MAX_WATERMARK,
+                               
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() 
- 1));
        }
 
        /**
@@ -400,7 +482,9 @@ public class TimestampITCase {
                                Assert.fail("Wrong watermark.");
                        }
                }
-               
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new
 Watermark(Long.MAX_VALUE)));
+               // the input is finite, so it should have a MAX Watermark
+               assertEquals(Watermark.MAX_WATERMARK,
+                               
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() 
- 1));
        }
 
        /**
@@ -710,8 +794,8 @@ public class TimestampITCase {
 
        public static class MyTimestampSource implements 
SourceFunction<Integer> {
 
-               private long initialTime;
-               private int numWatermarks;
+               private final long initialTime;
+               private final int numWatermarks;
 
                public MyTimestampSource(long initialTime, int numWatermarks) {
                        this.initialTime = initialTime;
@@ -730,6 +814,41 @@ public class TimestampITCase {
                public void cancel() {}
        }
 
+       public static class MyTimestampSourceInfinite implements 
SourceFunction<Integer>, StoppableFunction {
+
+               private final long initialTime;
+               private final int numWatermarks;
+
+               private volatile boolean running = true;
+               
+               public MyTimestampSourceInfinite(long initialTime, int 
numWatermarks) {
+                       this.initialTime = initialTime;
+                       this.numWatermarks = numWatermarks;
+               }
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+                       for (int i = 0; i < numWatermarks; i++) {
+                               ctx.collectWithTimestamp(i, initialTime + i);
+                               ctx.emitWatermark(new Watermark(initialTime + 
i));
+                       }
+                       
+                       while (running) {
+                               Thread.sleep(20);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               @Override
+               public void stop() {
+                       running = false;
+               }
+       }
+
        public static class MyNonWatermarkingSource implements 
SourceFunction<Integer> {
 
                int numWatermarks;

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c484eae..46e74e7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -102,7 +102,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                                }
                        }).when(mockTask).createStateBackend(any(String.class), 
any(TypeSerializer.class));
                } catch (Exception e) {
-                       e.printStackTrace();
+                       throw new RuntimeException(e.getMessage(), e);
                }
                
                doAnswer(new Answer<Void>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index e23673a..d848d2a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -28,12 +28,9 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index f700fa3..fddbe00 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -56,8 +56,7 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 7))
         ctx.collect(("a", 8))
 
-        // so that we get a high final watermark to process the previously 
sent elements
-        ctx.collect(("a", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {}
@@ -73,8 +72,7 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("c", 7))
         ctx.collect(("c", 8))
 
-        // so that we get a high final watermark to process the previously 
sent elements
-        ctx.collect(("c", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {
@@ -126,8 +124,7 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "j", 7))
         ctx.collect(("a", "k", 8))
 
-        // so that we get a high final watermark to process the previously 
sent elements
-        ctx.collect(("a", "k", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {}
@@ -145,8 +142,7 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "x", 6))
         ctx.collect(("a", "z", 8))
 
-        // so that we get a high final watermark to process the previously 
sent elements
-        ctx.collect(("a", "z", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {}
@@ -208,8 +204,7 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("a", "j", 7))
         ctx.collect(("a", "k", 8))
 
-        // so that we get a high final watermark to process the previously 
sent elements
-        ctx.collect(("a", "k", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5975d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index b1d6367..7833651 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -59,8 +59,7 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("a", 7))
         ctx.collect(("a", 8))
 
-        // so we get a big watermark to trigger processing of the previous 
elements
-        ctx.collect(("a", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {
@@ -107,8 +106,7 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
         ctx.collect(("b", 5))
         ctx.collect(("a", 5))
 
-        // so we get a big watermark to trigger processing of the previous 
elements
-        ctx.collect(("a", 20))
+        // source is finite, so it will have an implicit MAX watermark when it finishes
       }
 
       def cancel() {

Reply via email to