http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
index a7a5d9d..c1f15b4 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
@@ -30,7 +30,7 @@ limitations under the License.
     </tr>
     <tr>
       <td>Max. number of execution retries</td>
-      <td>{{ job['execution-config']['max-execution-retries'] === -1 ? 
'deactivated' : job['execution-config']['max-execution-retries'] }}</td>
+      <td>{{ job['execution-config']['restart-strategy'] }}</td>
     </tr>
     <tr>
       <td>Job parallelism</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
index c7dc0bc..17614e2 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -33,7 +33,7 @@ limitations under the License.
       {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
   <div ng-if="job.duration &gt; -1" title="{{job.duration | 
humanizeDuration:false}}" class="navbar-info last first">{{job.duration | 
humanizeDuration:true}}</div>
   <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || 
job.state=='RESTARTING'" class="navbar-info last first"><span 
ng-click="cancelJob($event)" class="navbar-info-button btn 
btn-default">Cancel</span></div>
-  <div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' 
|| job.state=='RESTARTING')" class="navbar-info last first"><span 
ng-click="stopJob($event)" class="navbar-info-button btn 
btn-default">Stop</span></div>
+  <div ng-if="job.isStoppable && job.state=='RUNNING'" class="navbar-info last 
first"><span ng-click="stopJob($event)" class="navbar-info-button btn 
btn-default">Stop</span></div>
 </nav>
 <nav ng-if="job" class="navbar navbar-default navbar-fixed-top 
navbar-main-additional">
   <ul class="nav nav-tabs">

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
index 383a0d2..1b2d2a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
@@ -21,5 +21,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
  */
 public interface StoppableTask {
        /** Called on STOP signal. */
-       public void stop();
-}
+       void stop();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6a22949..7430115 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -473,24 +473,33 @@ class JobManager(
         case Some((executionGraph, _)) =>
           try {
             if (!executionGraph.isStoppable()) {
-              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job 
with ID $jobID" +
-                " is not stoppable."))
-            } else if(executionGraph.getState() != JobStatus.CREATED
-                && executionGraph.getState() != JobStatus.RUNNING
-                && executionGraph.getState() != JobStatus.RESTARTING) {
-              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job 
with ID $jobID" +
-                "is not in state CREATED, RUNNING, or RESTARTING."))
+              sender ! decorateMessage(
+                StoppingFailure(
+                  jobID,
+                  new IllegalStateException(s"Job with ID $jobID is not 
stoppable."))
+              )
+            } else if (executionGraph.getState() != JobStatus.RUNNING) {
+              sender ! decorateMessage(
+                StoppingFailure(
+                  jobID,
+                  new IllegalStateException(s"Job with ID $jobID is in state " 
+
+                    executionGraph.getState().name() + " but stopping is only 
allowed in state " +
+                    "RUNNING."))
+              )
             } else {
               executionGraph.stop()
-              sender ! StoppingSuccess(jobID)
+              sender ! decorateMessage(StoppingSuccess(jobID))
             }
           } catch {
-            case t: Throwable =>  sender ! StoppingFailure(jobID, t)
+            case t: Throwable =>  sender ! 
decorateMessage(StoppingFailure(jobID, t))
           }
         case None =>
           log.info(s"No job found with ID $jobID.")
-          sender ! StoppingFailure(jobID, new IllegalArgumentException("No job 
found with " +
-            s"ID $jobID."))
+          sender ! decorateMessage(
+            StoppingFailure(
+              jobID,
+              new IllegalArgumentException(s"No job found with ID $jobID."))
+          )
       }
 
     case UpdateTaskExecutionState(taskExecutionState) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 12bc426..2f46c83 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -428,8 +428,12 @@ class TaskManager(
               sender ! decorateMessage(new TaskOperationResult(executionID, 
true))
             } catch {
               case t: Throwable =>
-                        sender ! new TaskOperationResult(executionID, false,
+                        sender ! decorateMessage(
+                          new TaskOperationResult(
+                            executionID,
+                            false,
                             t.getClass().getSimpleName() + ": " + 
t.getLocalizedMessage())
+                        )
             }
           } else {
             log.debug(s"Cannot find task to stop for execution 
${executionID})")

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 3712861..7cc91c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -121,8 +122,13 @@ public class ExecutionGraphSignalsTest {
 
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-               eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), 
jobId, jobName,
-                               cfg, AkkaUtils.getDefaultTimeout());
+               eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutionContext(),
+                       jobId,
+                       jobName,
+                       cfg,
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                eg.attachJobGraph(ordered);
 
                f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 5f9717f..a2e2482 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -270,7 +270,7 @@ public class LocalInputSplitsTest {
                                TestingUtils.defaultExecutionContext(), 
                                jobGraph.getJobID(),
                                jobGraph.getName(),  
-                               jobGraph.getJobConfiguration()
+                               jobGraph.getJobConfiguration(),
                                TIMEOUT,
                                new NoRestartStrategy());
                        
@@ -334,7 +334,7 @@ public class LocalInputSplitsTest {
                        TestingUtils.defaultExecutionContext(),
                        jobGraph.getJobID(),
                        jobGraph.getName(),  
-                       jobGraph.getJobConfiguration()
+                       jobGraph.getJobConfiguration(),
                        TIMEOUT,
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index a11f65b..9f8c6a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -34,7 +34,7 @@ public class DataStreamSource<T> extends 
SingleOutputStreamOperator<T, DataStrea
        boolean isParallel;
 
        public DataStreamSource(StreamExecutionEnvironment environment,
-                       TypeInformation<T> outTypeInfo, StreamSource<T> 
operator,
+                       TypeInformation<T> outTypeInfo, StreamSource<T, ?> 
operator,
                        boolean isParallel, String sourceName) {
                super(environment, new SourceTransformation<T>(sourceName, 
operator, outTypeInfo, environment.getParallelism()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2008061..9b1c034 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1227,17 +1227,33 @@ public abstract class StreamExecutionEnvironment {
                boolean isParallel = function instanceof ParallelSourceFunction;
 
                clean(function);
-               StreamSource<OUT> sourceOperator;
+               StreamSource<OUT, ?> sourceOperator;
                if (function instanceof StoppableFunction) {
-                       sourceOperator = new 
StoppableStreamSource<OUT>(function);
+                       sourceOperator = new 
StoppableStreamSource<>(cast2StoppableSourceFunction(function));
                } else {
-                       sourceOperator = new StreamSource<OUT>(function);
+                       sourceOperator = new StreamSource<>(function);
                }
 
                return new DataStreamSource<OUT>(this, typeInfo, 
sourceOperator, isParallel, sourceName);
        }
 
        /**
+        * Casts the source function into a SourceFunction implementing the 
StoppableFunction.
+        *
+        * This method should only be used if the source function was checked 
to implement the
+        * {@link StoppableFunction} interface.
+        *
+        * @param sourceFunction Source function to cast
+        * @param <OUT> Output type of source function
+        * @param <T> Union type of SourceFunction and StoppableFunction
+        * @return The casted source function so that its type implements the 
StoppableFunction
+        */
+       @SuppressWarnings("unchecked")
+       private <OUT, T extends SourceFunction<OUT> & StoppableFunction> T 
cast2StoppableSourceFunction(SourceFunction<OUT> sourceFunction) {
+               return (T) sourceFunction;
+       }
+
+       /**
         * Triggers the program execution. The environment will execute all 
parts of
         * the program that have resulted in a "sink" operation. Sink 
operations are
         * for example printing results or forwarding them to a message queue.

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
index 3d8190f..927f61f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
@@ -22,30 +22,26 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 /**
  * {@link StoppableStreamSource} takes a {@link SourceFunction} that 
implements {@link StoppableFunction}.
+ *
+ * @param <OUT> Type of the output elements
+ * @param <SRC> Type of the source function which has to be stoppable
  */
-public class StoppableStreamSource<T> extends StreamSource<T> {
+public class StoppableStreamSource<OUT, SRC extends SourceFunction<OUT> & 
StoppableFunction>
+       extends StreamSource<OUT, SRC> {
 
        private static final long serialVersionUID = -4365670858793587337L;
 
        /**
         * Takes a {@link SourceFunction} that implements {@link 
StoppableFunction}.
-        * 
+        *
         * @param sourceFunction
         *            A {@link SourceFunction} that implements {@link 
StoppableFunction}.
-        * 
-        * @throw IllegalArgumentException if {@code sourceFunction} does not 
implement {@link StoppableFunction}
         */
-       public StoppableStreamSource(SourceFunction<T> sourceFunction) {
+       public StoppableStreamSource(SRC sourceFunction) {
                super(sourceFunction);
-
-               if (!(sourceFunction instanceof StoppableFunction)) {
-                       throw new IllegalArgumentException(
-                                       "The given SourceFunction must 
implement StoppableFunction.");
-               }
        }
 
        public void stop() {
-               ((StoppableFunction) userFunction).stop();
+               userFunction.stop();
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 2834912..b0f933a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -31,30 +31,34 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * {@link StreamOperator} for streaming sources.
+ *
+ * @param <OUT> Type of the output elements
+ * @param <SRC> Type of the source function of this stream source operator
  */
 @Internal
-public class StreamSource<T> extends AbstractUdfStreamOperator<T, 
SourceFunction<T>> implements StreamOperator<T> {
+public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends 
AbstractUdfStreamOperator<OUT, SRC>
+       implements StreamOperator<OUT> {
 
        private static final long serialVersionUID = 1L;
-       private transient SourceFunction.SourceContext<T> ctx;
+       private transient SourceFunction.SourceContext<OUT> ctx;
 
-       public StreamSource(SourceFunction<T> sourceFunction) {
+       public StreamSource(SRC sourceFunction) {
                super(sourceFunction);
 
                this.chainingStrategy = ChainingStrategy.HEAD;
        }
 
-       public void run(final Object lockingObject, final 
Output<StreamRecord<T>> collector) throws Exception {
+       public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
                final ExecutionConfig executionConfig = getExecutionConfig();
                
                if (userFunction instanceof EventTimeSourceFunction) {
-                       ctx = new ManualWatermarkContext<T>(lockingObject, 
collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled());
+                       ctx = new ManualWatermarkContext<OUT>(lockingObject, 
collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled());
                } else if (executionConfig.getAutoWatermarkInterval() > 0) {
-                       ctx = new AutomaticWatermarkContext<T>(lockingObject, 
collector, executionConfig);
+                       ctx = new AutomaticWatermarkContext<OUT>(lockingObject, 
collector, executionConfig);
                } else if (executionConfig.areTimestampsEnabled()) {
-                       ctx = new NonWatermarkContext<T>(lockingObject, 
collector);
+                       ctx = new NonWatermarkContext<OUT>(lockingObject, 
collector);
                } else {
-                       ctx = new NonTimestampContext<T>(lockingObject, 
collector);
+                       ctx = new NonTimestampContext<OUT>(lockingObject, 
collector);
                }
 
                userFunction.run(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index 529399c..772744e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -34,7 +34,7 @@ import java.util.Collections;
 @Internal
 public class SourceTransformation<T> extends StreamTransformation<T> {
 
-       private final StreamSource<T> operator;
+       private final StreamSource<T, ?> operator;
 
        /**
         * Creates a new {@code SourceTransformation} from the given operator.
@@ -46,7 +46,7 @@ public class SourceTransformation<T> extends 
StreamTransformation<T> {
         */
        public SourceTransformation(
                        String name,
-                       StreamSource<T> operator,
+                       StreamSource<T, ?> operator,
                        TypeInformation<T> outputType,
                        int parallelism) {
                super(name, outputType, parallelism);
@@ -56,7 +56,7 @@ public class SourceTransformation<T> extends 
StreamTransformation<T> {
        /**
         * Returns the {@code StreamSource}, the operator of this {@code 
SourceTransformation}.
         */
-       public StreamSource<T> getOperator() {
+       public StreamSource<T, ?> getOperator() {
                return operator;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 61dcf72..44ff957 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -35,9 +36,12 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  * synchronized block.
  *
  * @param <OUT> Type of the output elements of this source.
+ * @param <SRC> Type of the source function for the stream source operator
+ * @param <OP> Type of the stream source operator
  */
 @Internal
-public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
+public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends 
StreamSource<OUT, SRC>>
+       extends StreamTask<OUT, OP> {
 
        @Override
        protected void init() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
index 7359cb3..5173796 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -17,17 +17,23 @@
  */
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 
 /**
  * Stoppable task for executing stoppable streaming sources.
+ *
+ * @param <OUT> Type of the produced elements
+ * @param <SRC> Stoppable source function
  */
-public class StoppableSourceStreamTask<OUT> extends SourceStreamTask<OUT> 
implements StoppableTask {
+public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & 
StoppableFunction>
+       extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> 
implements StoppableTask {
 
        @Override
        public void stop() {
-               ((StoppableStreamSource<?>) this.headOperator).stop();
+               this.headOperator.stop();
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index bb91f2a..b8d57a6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -108,7 +108,7 @@ public class FoldApplyWindowFunctionTest {
                        }
                };
 
-               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
+               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
 
                transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 703d9d8..bfb2d34 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -61,11 +61,11 @@ public class SourceStreamTaskTest {
         */
        @Test
        public void testOpenClose() throws Exception {
-               final SourceStreamTask<String> sourceTask = new 
SourceStreamTask<String>();
+               final SourceStreamTask<String, SourceFunction<String>, 
StreamSource<String, SourceFunction<String>>> sourceTask = new 
SourceStreamTask<>();
                final StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               StreamSource<String> sourceOperator = new 
StreamSource<String>(new OpenCloseTestSource());
+               StreamSource<String, ?> sourceOperator = new StreamSource<>(new 
OpenCloseTestSource());
                streamConfig.setStreamOperator(sourceOperator);
 
                testHarness.invoke();
@@ -82,8 +82,8 @@ public class SourceStreamTaskTest {
 
        @Test
        public void testStop() {
-               final StoppableSourceStreamTask<Object> sourceTask = new 
StoppableSourceStreamTask<Object>();
-               sourceTask.headOperator = new StoppableStreamSource<Object>(new 
StoppableSource());
+               final StoppableSourceStreamTask<Object, StoppableSource> 
sourceTask = new StoppableSourceStreamTask<>();
+               sourceTask.headOperator = new StoppableStreamSource<>(new 
StoppableSource());
 
                sourceTask.stop();
 
@@ -115,11 +115,12 @@ public class SourceStreamTaskTest {
                ExecutorService executor = Executors.newFixedThreadPool(10);
                try {
                        final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = 
new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
-                       final SourceStreamTask<Tuple2<Long, Integer>> 
sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+                       final SourceStreamTask<Tuple2<Long, Integer>, 
SourceFunction<Tuple2<Long, Integer>>,
+                               StreamSource<Tuple2<Long, Integer>, 
SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>();
                        final StreamTaskTestHarness<Tuple2<Long, Integer>> 
testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, 
typeInfo);
 
                        StreamConfig streamConfig = 
testHarness.getStreamConfig();
-                       StreamSource<Tuple2<Long, Integer>> sourceOperator = 
new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, 
SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
+                       StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = 
new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, 
SOURCE_READ_DELAY));
                        streamConfig.setStreamOperator(sourceOperator);
 
                        // prepare the

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index c18d150..58a5113 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -163,7 +163,7 @@ public class StreamTaskTest {
        //  Test operators
        // 
------------------------------------------------------------------------
        
-       public static class SlowlyDeserializingOperator extends 
StreamSource<Long> {
+       public static class SlowlyDeserializingOperator extends 
StreamSource<Long, SourceFunction<Long>> {
                private static final long serialVersionUID = 1L;
 
                private volatile boolean canceled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 43f3795..c7b1dc6 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -149,14 +149,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime-web</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-curator-test</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

Reply via email to