http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html index a7a5d9d..c1f15b4 100644 --- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html @@ -30,7 +30,7 @@ limitations under the License. </tr> <tr> <td>Max. number of execution retries</td> - <td>{{ job['execution-config']['max-execution-retries'] === -1 ? 'deactivated' : job['execution-config']['max-execution-retries'] }}</td> + <td>{{ job['execution-config']['restart-strategy'] }}</td> </tr> <tr> <td>Job parallelism</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html index c7dc0bc..17614e2 100644 --- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html @@ -33,7 +33,7 @@ limitations under the License. {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div> <div ng-if="job.duration > -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div> <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div> - <div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')" class="navbar-info last first"><span ng-click="stopJob($event)" class="navbar-info-button btn btn-default">Stop</span></div> + <div ng-if="job.isStoppable && job.state=='RUNNING'" class="navbar-info last first"><span ng-click="stopJob($event)" class="navbar-info-button btn btn-default">Stop</span></div> </nav> <nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional"> <ul class="nav nav-tabs"> http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java index 383a0d2..1b2d2a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java @@ -21,5 +21,5 @@ package org.apache.flink.runtime.jobgraph.tasks; */ public interface StoppableTask { /** Called on STOP signal. */ - public void stop(); -} + void stop(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 6a22949..7430115 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -473,24 +473,33 @@ class JobManager( case Some((executionGraph, _)) => try { if (!executionGraph.isStoppable()) { - sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" + - " is not stoppable.")) - } else if(executionGraph.getState() != JobStatus.CREATED - && executionGraph.getState() != JobStatus.RUNNING - && executionGraph.getState() != JobStatus.RESTARTING) { - sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" + - "is not in state CREATED, RUNNING, or RESTARTING.")) + sender ! decorateMessage( + StoppingFailure( + jobID, + new IllegalStateException(s"Job with ID $jobID is not stoppable.")) + ) + } else if (executionGraph.getState() != JobStatus.RUNNING) { + sender ! decorateMessage( + StoppingFailure( + jobID, + new IllegalStateException(s"Job with ID $jobID is in state " + + executionGraph.getState().name() + " but stopping is only allowed in state " + + "RUNNING.")) + ) } else { executionGraph.stop() - sender ! StoppingSuccess(jobID) + sender ! decorateMessage(StoppingSuccess(jobID)) } } catch { - case t: Throwable => sender ! StoppingFailure(jobID, t) + case t: Throwable => sender ! decorateMessage(StoppingFailure(jobID, t)) } case None => log.info(s"No job found with ID $jobID.") - sender ! StoppingFailure(jobID, new IllegalArgumentException("No job found with " + - s"ID $jobID.")) + sender ! decorateMessage( + StoppingFailure( + jobID, + new IllegalArgumentException(s"No job found with ID $jobID.")) + ) } case UpdateTaskExecutionState(taskExecutionState) => http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 12bc426..2f46c83 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -428,8 +428,12 @@ class TaskManager( sender ! decorateMessage(new TaskOperationResult(executionID, true)) } catch { case t: Throwable => - sender ! new TaskOperationResult(executionID, false, + sender ! decorateMessage( + new TaskOperationResult( + executionID, + false, t.getClass().getSimpleName() + ": " + t.getLocalizedMessage()) + ) } } else { log.debug(s"Cannot find task to stop for execution ${executionID})") http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index 3712861..7cc91c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -121,8 +122,13 @@ public class ExecutionGraphSignalsTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, AkkaUtils.getDefaultTimeout()); + eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); eg.attachJobGraph(ordered); f = eg.getClass().getDeclaredField("state"); http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index 5f9717f..a2e2482 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -270,7 +270,7 @@ public class LocalInputSplitsTest { TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), - jobGraph.getJobConfiguration() + jobGraph.getJobConfiguration(), TIMEOUT, new NoRestartStrategy()); @@ -334,7 +334,7 @@ public class LocalInputSplitsTest { TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), - jobGraph.getJobConfiguration() + jobGraph.getJobConfiguration(), TIMEOUT, new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index a11f65b..9f8c6a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -34,7 +34,7 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T, DataStrea boolean isParallel; public DataStreamSource(StreamExecutionEnvironment environment, - TypeInformation<T> outTypeInfo, StreamSource<T> operator, + TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator, boolean isParallel, String sourceName) { super(environment, new SourceTransformation<T>(sourceName, operator, outTypeInfo, environment.getParallelism())); http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 2008061..9b1c034 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1227,17 +1227,33 @@ public abstract class StreamExecutionEnvironment { boolean isParallel = function instanceof ParallelSourceFunction; clean(function); - StreamSource<OUT> sourceOperator; + StreamSource<OUT, ?> sourceOperator; if (function instanceof StoppableFunction) { - sourceOperator = new StoppableStreamSource<OUT>(function); + sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function)); } else { - sourceOperator = new StreamSource<OUT>(function); + sourceOperator = new StreamSource<>(function); } return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName); } /** + * Casts the source function into a SourceFunction implementing the StoppableFunction. + * + * This method should only be used if the source function was checked to implement the + * {@link StoppableFunction} interface. + * + * @param sourceFunction Source function to cast + * @param <OUT> Output type of source function + * @param <T> Union type of SourceFunction and StoppableFunction + * @return The casted source function so that it's type implements the StoppableFunction + */ + @SuppressWarnings("unchecked") + private <OUT, T extends SourceFunction<OUT> & StoppableFunction> T cast2StoppableSourceFunction(SourceFunction<OUT> sourceFunction) { + return (T) sourceFunction; + } + + /** * Triggers the program execution. The environment will execute all parts of * the program that have resulted in a "sink" operation. Sink operations are * for example printing results or forwarding them to a message queue. http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java index 3d8190f..927f61f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java @@ -22,30 +22,26 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * {@link StoppableStreamSource} takes a {@link SourceFunction} that implements {@link StoppableFunction}. + * + * @param <OUT> Type of the output elements + * @param <SRC> Type of the source function which has to be stoppable */ -public class StoppableStreamSource<T> extends StreamSource<T> { +public class StoppableStreamSource<OUT, SRC extends SourceFunction<OUT> & StoppableFunction> + extends StreamSource<OUT, SRC> { private static final long serialVersionUID = -4365670858793587337L; /** * Takes a {@link SourceFunction} that implements {@link StoppableFunction}. - * + * * @param sourceFunction * A {@link SourceFunction} that implements {@link StoppableFunction}. - * - * @throw IllegalArgumentException if {@code sourceFunction} does not implement {@link StoppableFunction} */ - public StoppableStreamSource(SourceFunction<T> sourceFunction) { + public StoppableStreamSource(SRC sourceFunction) { super(sourceFunction); - - if (!(sourceFunction instanceof StoppableFunction)) { - throw new IllegalArgumentException( - "The given SourceFunction must implement StoppableFunction."); - } } public void stop() { - ((StoppableFunction) userFunction).stop(); + userFunction.stop(); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 2834912..b0f933a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -31,30 +31,34 @@ import java.util.concurrent.TimeUnit; /** * {@link StreamOperator} for streaming sources. + * + * @param <OUT> Type of the output elements + * @param <SRC> Type of the source function of this stream source operator */ @Internal -public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction<T>> implements StreamOperator<T> { +public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> + implements StreamOperator<OUT> { private static final long serialVersionUID = 1L; - private transient SourceFunction.SourceContext<T> ctx; + private transient SourceFunction.SourceContext<OUT> ctx; - public StreamSource(SourceFunction<T> sourceFunction) { + public StreamSource(SRC sourceFunction) { super(sourceFunction); this.chainingStrategy = ChainingStrategy.HEAD; } - public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception { + public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception { final ExecutionConfig executionConfig = getExecutionConfig(); if (userFunction instanceof EventTimeSourceFunction) { - ctx = new ManualWatermarkContext<T>(lockingObject, collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled()); + ctx = new ManualWatermarkContext<OUT>(lockingObject, collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled()); } else if (executionConfig.getAutoWatermarkInterval() > 0) { - ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig); + ctx = new AutomaticWatermarkContext<OUT>(lockingObject, collector, executionConfig); } else if (executionConfig.areTimestampsEnabled()) { - ctx = new NonWatermarkContext<T>(lockingObject, collector); + ctx = new NonWatermarkContext<OUT>(lockingObject, collector); } else { - ctx = new NonTimestampContext<T>(lockingObject, collector); + ctx = new NonTimestampContext<OUT>(lockingObject, collector); } userFunction.run(ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java index 529399c..772744e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java @@ -34,7 +34,7 @@ import java.util.Collections; @Internal public class SourceTransformation<T> extends StreamTransformation<T> { - private final StreamSource<T> operator; + private final StreamSource<T, ?> operator; /** * Creates a new {@code SourceTransformation} from the given operator. @@ -46,7 +46,7 @@ public class SourceTransformation<T> extends StreamTransformation<T> { */ public SourceTransformation( String name, - StreamSource<T> operator, + StreamSource<T, ?> operator, TypeInformation<T> outputType, int parallelism) { super(name, outputType, parallelism); @@ -56,7 +56,7 @@ public class SourceTransformation<T> extends StreamTransformation<T> { /** * Returns the {@code StreamSource}, the operator of this {@code SourceTransformation}. */ - public StreamSource<T> getOperator() { + public StreamSource<T, ?> getOperator() { return operator; } http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 61dcf72..44ff957 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; @@ -35,9 +36,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; * synchronized block. * * @param <OUT> Type of the output elements of this source. + * @param <SRC> Type of the source function for the stream source operator + * @param <OP> Type of the stream source operator */ @Internal -public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> { +public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> + extends StreamTask<OUT, OP> { @Override protected void init() { http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java index 7359cb3..5173796 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java @@ -17,17 +17,23 @@ */ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StoppableStreamSource; /** * Stoppable task for executing stoppable streaming sources. + * + * @param <OUT> Type of the produced elements + * @param <SRC> Stoppable source function */ -public class StoppableSourceStreamTask<OUT> extends SourceStreamTask<OUT> implements StoppableTask { +public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction> + extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask { @Override public void stop() { - ((StoppableStreamSource<?>) this.headOperator).stop(); + this.headOperator.stop(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index bb91f2a..b8d57a6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -108,7 +108,7 @@ public class FoldApplyWindowFunctionTest { } }; - SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); + SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 703d9d8..bfb2d34 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -61,11 +61,11 @@ public class SourceStreamTaskTest { */ @Test public void testOpenClose() throws Exception { - final SourceStreamTask<String> sourceTask = new SourceStreamTask<String>(); + final SourceStreamTask<String, SourceFunction<String>, StreamSource<String, SourceFunction<String>>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamSource<String> sourceOperator = new StreamSource<String>(new OpenCloseTestSource()); + StreamSource<String, ?> sourceOperator = new StreamSource<>(new OpenCloseTestSource()); streamConfig.setStreamOperator(sourceOperator); testHarness.invoke(); @@ -82,8 +82,8 @@ public class SourceStreamTaskTest { @Test public void testStop() { - final StoppableSourceStreamTask<Object> sourceTask = new StoppableSourceStreamTask<Object>(); - sourceTask.headOperator = new StoppableStreamSource<Object>(new StoppableSource()); + final StoppableSourceStreamTask<Object, StoppableSource> sourceTask = new StoppableSourceStreamTask<>(); + sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource()); sourceTask.stop(); @@ -115,11 +115,12 @@ public class SourceStreamTaskTest { ExecutorService executor = Executors.newFixedThreadPool(10); try { final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>(); + final SourceStreamTask<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>, + StreamSource<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); + StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); streamConfig.setStreamOperator(sourceOperator); // prepare the http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index c18d150..58a5113 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -163,7 +163,7 @@ public class StreamTaskTest { // Test operators // ------------------------------------------------------------------------ - public static class SlowlyDeserializingOperator extends StreamSource<Long> { + public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> { private static final long serialVersionUID = 1L; private volatile boolean canceled = false; http://git-wip-us.apache.org/repos/asf/flink/blob/f60f8fbc/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 43f3795..c7b1dc6 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -149,14 +149,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime-web</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-curator-test</artifactId> <version>${project.version}</version> <scope>test</scope>
