[ https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033738#comment-15033738 ]
ASF GitHub Bot commented on FLINK-2111:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/750#discussion_r46282619
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
+public class TaskStopTest {
+ private AbstractInvokable taskMock;
+ private Task task;
+
+ public void doMocking() throws Exception {
+ TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
+ when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+ when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+ when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+ when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+ when(tddMock.getTaskName()).thenReturn("taskName");
+ when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+ when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+ when(tddMock.getInvokableClassName()).thenReturn("className");
+
+ task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
+ mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
+ mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
+ mock(TaskManagerRuntimeInfo.class));
+ Field f = task.getClass().getDeclaredField("invokable");
+ f.setAccessible(true);
+ f.set(task, taskMock);
--- End diff --
Why do you use class fields to pass in parameters like `taskMock`?
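One way to read this suggestion is to hand the mock to the helper as an argument and return the constructed task, so the test method keeps both in local variables. The following is only a minimal sketch of that shape: `Invokable`, `FakeTask`, and `createTask` are hypothetical stand-ins (not Flink classes) so that it runs without Mockito or PowerMock, and the reflection step mirrors the one in the diff above.

```java
import java.lang.reflect.Field;

public class ParameterInjectionSketch {

    /** Hypothetical stand-in for the invokable that the test mocks. */
    interface Invokable {
        String name();
    }

    /** Hypothetical stand-in for Task: holds the invokable in a private field. */
    static class FakeTask {
        private Invokable invokable;

        Invokable getInvokable() {
            return invokable;
        }
    }

    /**
     * Builds a task and injects the given invokable via reflection.
     * The invokable arrives as a parameter, so no test-class field is needed.
     */
    static FakeTask createTask(Invokable invokable) throws Exception {
        FakeTask task = new FakeTask();
        Field f = FakeTask.class.getDeclaredField("invokable");
        f.setAccessible(true);
        f.set(task, invokable);
        return task;
    }

    public static void main(String[] args) throws Exception {
        // Each test would create its own stub and task as local variables.
        Invokable stub = () -> "stub";
        FakeTask task = createTask(stub);
        System.out.println(task.getInvokable() == stub);
    }
}
```

With this shape, each test method states which invokable it uses at the call site, and no state is shared between test methods through the test class.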
> Add "stop" signal to cleanly shutdown streaming jobs
> ----------------------------------------------------
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime, JobManager, Local Runtime,
> Streaming, TaskManager, Webfrontend
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks,
> such that the sources can stop emitting data and shut down cleanly, resulting
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for
> https://issues.apache.org/jira/browse/FLINK-1929