Repository: flink
Updated Branches:
  refs/heads/master cbdb784dc -> 27fd2493e


[FLINK-4715] Remove superfluous test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2168b65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2168b65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2168b65

Branch: refs/heads/master
Commit: d2168b6557b6e86c52f2e8d8b2caf2934152b58b
Parents: cbdb784
Author: Ufuk Celebi <[email protected]>
Authored: Tue Oct 18 09:50:30 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Oct 27 17:38:44 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskmanager/TaskCancelTest.java     | 258 -------------------
 .../test/cancelling/CancelingTestBase.java      |  54 ++--
 2 files changed, 35 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2168b65/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
deleted file mode 100644
index 64b25e1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.api.reader.RecordReader;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.types.IntValue;
-
-import org.junit.Test;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-public class TaskCancelTest {
-
-       @Test
-       public void testCancelUnion() throws Exception {
-               // Test config
-               int numberOfSources = 8;
-               int sourceParallelism = 4;
-
-               TestingCluster flink = null;
-
-               try {
-                       // Start a cluster for the given test config
-                       final Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
sourceParallelism);
-                       config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
-
-                       flink = new TestingCluster(config, false);
-                       flink.start();
-
-                       // Setup
-                       final JobGraph jobGraph = new JobGraph("Cancel Big 
Union");
-
-                       JobVertex[] sources = new JobVertex[numberOfSources];
-                       SlotSharingGroup group = new SlotSharingGroup();
-
-                       // Create multiple sources
-                       for (int i = 0; i < sources.length; i++) {
-                               sources[i] = new JobVertex("Source " + i);
-                               
sources[i].setInvokableClass(InfiniteSource.class);
-                               sources[i].setParallelism(sourceParallelism);
-                               sources[i].setSlotSharingGroup(group);
-
-                               jobGraph.addVertex(sources[i]);
-                               group.addVertexToGroup(sources[i].getID());
-                       }
-
-                       // Union all sources
-                       JobVertex union = new JobVertex("Union");
-                       union.setInvokableClass(AgnosticUnion.class);
-                       union.setParallelism(sourceParallelism);
-
-                       jobGraph.addVertex(union);
-
-                       // Each source creates a separate result
-                       for (JobVertex source : sources) {
-                               union.connectNewDataSetAsInput(
-                                               source,
-                                               DistributionPattern.POINTWISE,
-                                               ResultPartitionType.PIPELINED);
-                       }
-
-                       // run the job
-                       flink.submitJobDetached(jobGraph);
-
-                       // Wait for the job to make some progress and then 
cancel
-                       awaitRunning(
-                                       
flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       jobGraph.getJobID(),
-                                       TestingUtils.TESTING_DURATION());
-
-                       Thread.sleep(5000);
-
-                       cancelJob(
-                                       
flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       jobGraph.getJobID(),
-                                       TestingUtils.TESTING_DURATION());
-
-                       // Wait for the job to be cancelled
-                       
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.CANCELED,
-                                       
flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       TestingUtils.TESTING_DURATION());
-               }
-               finally {
-                       if (flink != null) {
-                               flink.shutdown();
-                       }
-               }
-       }
-
-       // 
---------------------------------------------------------------------------------------------
-
-       /**
-        * Requests the {@link JobManager} to cancel a running job.
-        *
-        * @param jobManager The JobManager actor.
-        * @param jobId The JobID of the job to cancel.
-        * @param timeout Duration in which the JobManager must have responded.
-        */
-       public static void cancelJob(ActorGateway jobManager, JobID jobId, 
FiniteDuration timeout)
-                       throws Exception {
-
-               checkNotNull(jobManager);
-               checkNotNull(jobId);
-               checkNotNull(timeout);
-
-               Future<Object> ask = jobManager.ask(new CancelJob(jobId), 
timeout);
-
-               Object result = Await.result(ask, timeout);
-
-               if (result instanceof CancellationSuccess) {
-                       // Success
-                       CancellationSuccess success = (CancellationSuccess) 
result;
-
-                       if (!success.jobID().equals(jobId)) {
-                               throw new Exception("JobManager responded for 
wrong job ID. Request: "
-                                               + jobId + ", response: " + 
success.jobID() + ".");
-                       }
-               }
-               else if (result instanceof CancellationFailure) {
-                       // Failure
-                       CancellationFailure failure = (CancellationFailure) 
result;
-
-                       throw new Exception("Failed to cancel job with ID " + 
failure.jobID() + ".",
-                                       failure.cause());
-               }
-               else {
-                       throw new Exception("Unexpected response to cancel 
request: " + result);
-               }
-       }
-
-       public static void awaitRunning(ActorGateway jobManager, JobID jobId, 
FiniteDuration timeout)
-                       throws Exception {
-
-               checkNotNull(jobManager);
-               checkNotNull(jobId);
-               checkNotNull(timeout);
-
-               while (true) {
-                       Future<Object> ask = jobManager.ask(
-                                       new RequestJobStatus(jobId),
-                                       timeout);
-
-                       Object result = Await.result(ask, timeout);
-
-                       if (result instanceof CurrentJobStatus) {
-                               // Success
-                               CurrentJobStatus status = (CurrentJobStatus) 
result;
-
-                               if (!status.jobID().equals(jobId)) {
-                                       throw new Exception("JobManager 
responded for wrong job ID. Request: "
-                                                       + jobId + ", response: 
" + status.jobID() + ".");
-                               }
-
-                               if (status.status() == JobStatus.RUNNING) {
-                                       return;
-                               }
-                               else if 
(status.status().isGloballyTerminalState()) {
-                                       throw new Exception("JobStatus changed 
to " + status.status()
-                                                       + " while waiting for 
job to start running.");
-                               }
-                       }
-                       else if (result instanceof JobNotFound) {
-                               // Not found
-                               throw new Exception("Cannot find job with ID " 
+ jobId + ".");
-                       }
-                       else {
-                               throw new Exception("Unexpected response to 
cancel request: " + result);
-                       }
-               }
-
-       }
-
-       // 
---------------------------------------------------------------------------------------------
-
-       public static class InfiniteSource extends AbstractInvokable {
-
-               @Override
-               public void invoke() throws Exception {
-                       RecordWriter<IntValue> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
-
-                       final IntValue val = new IntValue();
-
-                       try {
-                               for (int i = 0; true; i++) {
-                                       if (Thread.interrupted()) {
-                                               return;
-                                       }
-
-                                       val.setValue(i);
-                                       writer.emit(val);
-                               }
-                       }
-                       finally {
-                               writer.clearBuffers();
-                       }
-               }
-       }
-
-       public static class AgnosticUnion extends AbstractInvokable {
-
-               @Override
-               public void invoke() throws Exception {
-                       UnionInputGate union = new 
UnionInputGate(getEnvironment().getAllInputGates());
-                       RecordReader<IntValue> reader = new RecordReader<>(
-                                       union, IntValue.class, 
getEnvironment().getTaskManagerInfo().getTmpDirectories());
-
-                       //noinspection StatementWithEmptyBody
-                       while (reader.next() != null) {}
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d2168b65/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 8a08f15..8d8ee64 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -19,32 +19,36 @@
 
 package org.apache.flink.test.cancelling;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
-import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.fs.FileSystem;
-
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import static 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+
 /**
  * 
  */
@@ -115,17 +119,30 @@ public abstract class CancelingTestBase extends TestLogger {
                        executor.submitJobDetached(jobGraph);
 
                        // Wait for the job to make some progress and then 
cancel
-                       awaitRunning(
+                       
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING,
                                        
executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       jobGraph.getJobID(),
                                        TestingUtils.TESTING_DURATION());
 
                        Thread.sleep(msecsTillCanceling);
 
-                       cancelJob(
-                                       
executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                                       jobGraph.getJobID(),
-                                       new FiniteDuration(maxTimeTillCanceled, 
TimeUnit.MILLISECONDS));
+                       FiniteDuration timeout = new 
FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+
+                       ActorGateway jobManager = 
executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                       Future<Object> ask = jobManager.ask(new 
CancelJob(jobGraph.getJobID()), timeout);
+
+                       Object result = Await.result(ask, timeout);
+
+                       if (result instanceof CancellationSuccess) {
+                               // all good
+                       } else if (result instanceof CancellationFailure) {
+                               // Failure
+                               CancellationFailure failure = 
(CancellationFailure) result;
+                               throw new Exception("Failed to cancel job with 
ID " + failure.jobID() + ".",
+                                               failure.cause());
+                       } else {
+                               throw new Exception("Unexpected response to 
cancel request: " + result);
+                       }
 
                        // Wait for the job to be cancelled
                        
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.CANCELED,
@@ -137,7 +154,6 @@ public abstract class CancelingTestBase extends TestLogger {
                        e.printStackTrace();
                        Assert.fail(e.getMessage());
                }
-
        }
 
        private JobGraph getJobGraph(final Plan plan) throws Exception {

Reply via email to