Repository: flink
Updated Branches:
  refs/heads/master 1d53a40a6 -> 578e80e3c


[FLINK-4525] [core] Drop special cases for 'StrictlyLocalAssignment' and 
'PredeterminedAssignment'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/578e80e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/578e80e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/578e80e3

Branch: refs/heads/master
Commit: 578e80e3c161601d22760ef2ea0e52e6ae963786
Parents: 1d53a40
Author: Stephan Ewen <[email protected]>
Authored: Sat Aug 27 15:23:38 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 29 17:07:55 2016 +0200

----------------------------------------------------------------------
 .../api/common/io/StrictlyLocalAssignment.java  |  24 -
 .../executiongraph/ExecutionJobVertex.java      | 104 +----
 .../executiongraph/LocalInputSplitsTest.java    | 436 -------------------
 3 files changed, 8 insertions(+), 556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
deleted file mode 100644
index e20107b..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.io;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-@PublicEvolving
-public interface StrictlyLocalAssignment {}

http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7b28b31..6272151 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -81,11 +79,9 @@ public class ExecutionJobVertex {
        private final CoLocationGroup coLocationGroup;
        
        private final InputSplit[] inputSplits;
-       
-       private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-       
+
        private InputSplitAssigner splitAssigner;
-       
+
        public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
                                                        int defaultParallelism, 
FiniteDuration timeout) throws JobException {
                this(graph, jobVertex, defaultParallelism, timeout, 
System.currentTimeMillis());
@@ -155,12 +151,7 @@ public class ExecutionJobVertex {
                                inputSplits = 
splitSource.createInputSplits(numTaskVertices);
                                
                                if (inputSplits != null) {
-                                       if (splitSource instanceof 
StrictlyLocalAssignment) {
-                                               inputSplitsPerSubtask = 
computeLocalInputSplitsPerTask(inputSplits);
-                                               splitAssigner = new 
PredeterminedInputSplitAssigner(inputSplitsPerSubtask);
-                                       } else {
-                                               splitAssigner = 
splitSource.getInputSplitAssigner(inputSplits);
-                                       }
+                                       splitAssigner = 
splitSource.getInputSplitAssigner(inputSplits);
                                }
                        }
                        else {
@@ -278,48 +269,8 @@ public class ExecutionJobVertex {
        
//---------------------------------------------------------------------------------------------
        
        public void scheduleAll(Scheduler scheduler, boolean queued) throws 
NoResourceAvailableException {
-               
                ExecutionVertex[] vertices = this.taskVertices;
-               
-               // check if we need to do pre-assignment of tasks
-               if (inputSplitsPerSubtask != null) {
-               
-                       final Map<String, List<Instance>> instances = 
scheduler.getInstancesByHost();
-                       final Map<String, Integer> assignments = new 
HashMap<String, Integer>();
-                       
-                       for (int i = 0; i < vertices.length; i++) {
-                               List<LocatableInputSplit> splitsForHost = 
inputSplitsPerSubtask[i];
-                               if (splitsForHost == null || 
splitsForHost.isEmpty()) {
-                                       continue;
-                               }
-                               
-                               String[] hostNames = 
splitsForHost.get(0).getHostnames();
-                               if (hostNames == null || hostNames.length == 0 
|| hostNames[0] == null) {
-                                       continue;
-                               }
-                               
-                               String host = hostNames[0];
-                               ExecutionVertex v = vertices[i];
-                               
-                               List<Instance> instancesOnHost = 
instances.get(host);
-                               
-                               if (instancesOnHost == null || 
instancesOnHost.isEmpty()) {
-                                       throw new 
NoResourceAvailableException("Cannot schedule a strictly local task to host " + 
host
-                                                       + ". No TaskManager 
available on that host.");
-                               }
-                               
-                               Integer pos = assignments.get(host);
-                               if (pos == null) {
-                                       pos = 0;
-                                       assignments.put(host, 0);
-                               } else {
-                                       assignments.put(host, (pos + 1) % 
instancesOnHost.size());
-                               }
-                               
-                               
v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
-                       }
-               }
-               
+
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
                        ev.scheduleForExecution(scheduler, queued);
@@ -374,17 +325,10 @@ public class ExecutionJobVertex {
                        // set up the input splits again
                        try {
                                if (this.inputSplits != null) {
-                                       
-                                       if (inputSplitsPerSubtask == null) {
-                                               // lazy assignment
-                                               @SuppressWarnings("unchecked")
-                                               InputSplitSource<InputSplit> 
splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
-                                               this.splitAssigner = 
splitSource.getInputSplitAssigner(this.inputSplits);
-                                       }
-                                       else {
-                                               // eager assignment
-                                               //TODO: this.splitAssigner = 
new AssignBasedOnPreAssignment();
-                                       }
+                                       // lazy assignment
+                                       @SuppressWarnings("unchecked")
+                                       InputSplitSource<InputSplit> 
splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+                                       this.splitAssigner = 
splitSource.getInputSplitAssigner(this.inputSplits);
                                }
                        }
                        catch (Throwable t) {
@@ -426,7 +370,6 @@ public class ExecutionJobVertex {
                                inputSplits[i] = null;
                        }
                }
-               inputSplitsPerSubtask = null;
        }
        
        
//---------------------------------------------------------------------------------------------
@@ -628,37 +571,6 @@ public class ExecutionJobVertex {
                
                return subTaskSplitAssignment;
        }
-       
-
-       /**
-        * An InputSplitAssigner that assigns to pre-determined hosts.
-        */
-       public static class PredeterminedInputSplitAssigner implements 
InputSplitAssigner {
-
-               private List<LocatableInputSplit>[] inputSplitsPerSubtask;
-
-               @SuppressWarnings("unchecked")
-               public 
PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] 
inputSplitsPerSubtask) {
-                       // copy input split assignment
-                       this.inputSplitsPerSubtask = 
(List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length];
-                       for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
-                               List<LocatableInputSplit> next = 
inputSplitsPerSubtask[i];
-                               
-                               this.inputSplitsPerSubtask[i] = next == null || 
next.isEmpty() ?
-                                               
Collections.<LocatableInputSplit>emptyList() : 
-                                               new 
ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]);
-                       }
-               }
-
-               @Override
-               public InputSplit getNextInputSplit(String host, int taskId) {
-                       if (inputSplitsPerSubtask[taskId].isEmpty()) {
-                               return null;
-                       } else {
-                               return 
inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
-                       }
-               }
-       }
 
        public static ExecutionState getAggregateJobVertexState(int[] 
verticesPerState, int parallelism) {
                if (verticesPerState == null || verticesPerState.length != 
ExecutionState.values().length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
deleted file mode 100644
index f03370c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.StrictlyLocalAssignment;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-public class LocalInputSplitsTest {
-       
-       private static final FiniteDuration TIMEOUT = new FiniteDuration(100, 
TimeUnit.SECONDS);
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Test
-       public void testNotEnoughSubtasks() {
-               int numHosts = 3;
-               int slotsPerHost = 1;
-               int parallelism = 2;
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, "host1"),
-                               new TestLocatableInputSplit(2, "host2"),
-                               new TestLocatableInputSplit(3, "host3")
-               };
-               
-               // This should fail with an exception, since the parallelism of 
2 does not
-               // support strictly local assignment onto 3 hosts
-               try {
-                       runTests(numHosts, slotsPerHost, parallelism, splits);
-                       fail("should throw an exception");
-               }
-               catch (JobException e) {
-                       // what a great day!
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testDisallowMultipleLocations() {
-               int numHosts = 2;
-               int slotsPerHost = 1;
-               int parallelism = 2;
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, new String[] { 
"host1", "host2" } ),
-                               new TestLocatableInputSplit(2, new String[] { 
"host1", "host2" } )
-               };
-               
-               // This should fail with an exception, since strictly local 
assignment
-               // currently supports only one choice of host
-               try {
-                       runTests(numHosts, slotsPerHost, parallelism, splits);
-                       fail("should throw an exception");
-               }
-               catch (JobException e) {
-                       // dandy!
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testNonExistentHost() {
-               int numHosts = 2;
-               int slotsPerHost = 1;
-               int parallelism = 2;
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, "host1"),
-                               new TestLocatableInputSplit(2, "bogus_host" )
-               };
-               
-               // This should fail with an exception, since one of the hosts 
does not exist
-               try {
-                       runTests(numHosts, slotsPerHost, parallelism, splits);
-                       fail("should throw an exception");
-               }
-               catch (JobException e) {
-                       // dandy!
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testEqualSplitsPerHostAndSubtask() {
-               int numHosts = 5;
-               int slotsPerHost = 2;
-               int parallelism = 10;
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(7, "host4"),
-                               new TestLocatableInputSplit(8, "host4"),
-                               new TestLocatableInputSplit(1, "host1"),
-                               new TestLocatableInputSplit(2, "host1"),
-                               new TestLocatableInputSplit(3, "host2"),
-                               new TestLocatableInputSplit(4, "host2"),
-                               new TestLocatableInputSplit(5, "host3"),
-                               new TestLocatableInputSplit(6, "host3"),
-                               new TestLocatableInputSplit(9, "host5"),
-                               new TestLocatableInputSplit(10, "host5")
-               };
-               
-               try {
-                       String[] hostsForTasks = runTests(numHosts, 
slotsPerHost, parallelism, splits);
-                       
-                       assertEquals("host1", hostsForTasks[0]);
-                       assertEquals("host1", hostsForTasks[1]);
-                       assertEquals("host2", hostsForTasks[2]);
-                       assertEquals("host2", hostsForTasks[3]);
-                       assertEquals("host3", hostsForTasks[4]);
-                       assertEquals("host3", hostsForTasks[5]);
-                       assertEquals("host4", hostsForTasks[6]);
-                       assertEquals("host4", hostsForTasks[7]);
-                       assertEquals("host5", hostsForTasks[8]);
-                       assertEquals("host5", hostsForTasks[9]);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testNonEqualSplitsPerhost() {
-               int numHosts = 3;
-               int slotsPerHost = 2;
-               int parallelism = 5;
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, "host3"),
-                               new TestLocatableInputSplit(2, "host1"),
-                               new TestLocatableInputSplit(3, "host1"),
-                               new TestLocatableInputSplit(4, "host1"),
-                               new TestLocatableInputSplit(5, "host1"),
-                               new TestLocatableInputSplit(6, "host1"),
-                               new TestLocatableInputSplit(7, "host2"),
-                               new TestLocatableInputSplit(8, "host2")
-               };
-               
-               try {
-                       String[] hostsForTasks = runTests(numHosts, 
slotsPerHost, parallelism, splits);
-                       
-                       assertEquals("host1", hostsForTasks[0]);
-                       assertEquals("host1", hostsForTasks[1]);
-                       assertEquals("host2", hostsForTasks[2]);
-                       assertEquals("host2", hostsForTasks[3]);
-                       assertEquals("host3", hostsForTasks[4]);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWithSubtasksEmpty() {
-               int numHosts = 3;
-               int slotsPerHost = 5;
-               int parallelism = 7;
-               
-               // host one gets three subtasks (but two remain empty)
-               // host two get two subtasks where one gets two splits, the 
other one split
-               // host three gets two subtasks where one gets five splits, the 
other gets four splits
-               
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, "host1"),
-                               new TestLocatableInputSplit(2, "host2"),
-                               new TestLocatableInputSplit(3, "host2"),
-                               new TestLocatableInputSplit(4, "host2"),
-                               new TestLocatableInputSplit(5, "host3"),
-                               new TestLocatableInputSplit(6, "host3"),
-                               new TestLocatableInputSplit(7, "host3"),
-                               new TestLocatableInputSplit(8, "host3"),
-                               new TestLocatableInputSplit(9, "host3"),
-                               new TestLocatableInputSplit(10, "host3"),
-                               new TestLocatableInputSplit(11, "host3"),
-                               new TestLocatableInputSplit(12, "host3"),
-                               new TestLocatableInputSplit(13, "host3")
-               };
-               
-               try {
-                       String[] hostsForTasks = runTests(numHosts, 
slotsPerHost, parallelism, splits);
-                       
-                       assertEquals("host1", hostsForTasks[0]);
-                       
-                       assertEquals("host2", hostsForTasks[1]);
-                       assertEquals("host2", hostsForTasks[2]);
-
-                       assertEquals("host3", hostsForTasks[3]);
-                       assertEquals("host3", hostsForTasks[4]);
-                       
-                       // the current assignment leaves those with empty 
constraints
-                       assertTrue(hostsForTasks[5].equals("host1") || 
hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3"));
-                       assertTrue(hostsForTasks[6].equals("host1") || 
hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3"));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testMultipleInstancesPerHost() {
-
-               TestLocatableInputSplit[] splits = new 
TestLocatableInputSplit[] {
-                               new TestLocatableInputSplit(1, "host1"),
-                               new TestLocatableInputSplit(2, "host1"),
-                               new TestLocatableInputSplit(3, "host2"),
-                               new TestLocatableInputSplit(4, "host2"),
-                               new TestLocatableInputSplit(5, "host3"),
-                               new TestLocatableInputSplit(6, "host3")
-               };
-               
-               try {
-                       JobVertex vertex = new JobVertex("test vertex");
-                       vertex.setParallelism(6);
-                       vertex.setInvokableClass(DummyInvokable.class);
-                       vertex.setInputSplitSource(new 
TestInputSplitSource(splits));
-                       
-                       JobGraph jobGraph = new JobGraph("test job", vertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(), 
-                               jobGraph.getJobID(),
-                               jobGraph.getName(),  
-                               jobGraph.getJobConfiguration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               TIMEOUT,
-                               new NoRestartStrategy());
-                       
-                       
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-                       eg.setQueuedSchedulingAllowed(false);
-                       
-                       // create a scheduler with 6 instances where always two 
are on the same host
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, 
"host1", 1);
-                       Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, 
"host1", 1);
-                       Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, 
"host2", 1);
-                       Instance i4 = getInstance(new byte[] {10,0,1,2}, 12346, 
"host2", 1);
-                       Instance i5 = getInstance(new byte[] {10,0,1,3}, 12345, 
"host3", 1);
-                       Instance i6 = getInstance(new byte[] {10,0,1,3}, 12346, 
"host4", 1);
-                       scheduler.newInstanceAvailable(i1);
-                       scheduler.newInstanceAvailable(i2);
-                       scheduler.newInstanceAvailable(i3);
-                       scheduler.newInstanceAvailable(i4);
-                       scheduler.newInstanceAvailable(i5);
-                       scheduler.newInstanceAvailable(i6);
-                       
-                       eg.scheduleForExecution(scheduler);
-                       
-                       ExecutionVertex[] tasks = 
eg.getVerticesTopologically().iterator().next().getTaskVertices();
-                       assertEquals(6, tasks.length);
-                       
-                       Instance taskInstance1 = 
tasks[0].getCurrentAssignedResource().getInstance();
-                       Instance taskInstance2 = 
tasks[1].getCurrentAssignedResource().getInstance();
-                       Instance taskInstance3 = 
tasks[2].getCurrentAssignedResource().getInstance();
-                       Instance taskInstance4 = 
tasks[3].getCurrentAssignedResource().getInstance();
-                       Instance taskInstance5 = 
tasks[4].getCurrentAssignedResource().getInstance();
-                       Instance taskInstance6 = 
tasks[5].getCurrentAssignedResource().getInstance();
-                       
-                       assertTrue (taskInstance1 == i1 || taskInstance1 == i2);
-                       assertTrue (taskInstance2 == i1 || taskInstance2 == i2);
-                       assertTrue (taskInstance3 == i3 || taskInstance3 == i4);
-                       assertTrue (taskInstance4 == i3 || taskInstance4 == i4);
-                       assertTrue (taskInstance5 == i5 || taskInstance5 == i6);
-                       assertTrue (taskInstance6 == i5 || taskInstance6 == i6);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static String[] runTests(int numHosts, int slotsPerHost, int 
parallelism, 
-                       TestLocatableInputSplit[] splits)
-               throws Exception
-       {
-               JobVertex vertex = new JobVertex("test vertex");
-               vertex.setParallelism(parallelism);
-               vertex.setInvokableClass(DummyInvokable.class);
-               vertex.setInputSplitSource(new TestInputSplitSource(splits));
-               
-               JobGraph jobGraph = new JobGraph("test job", vertex);
-               
-               ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(),
-                       jobGraph.getJobID(),
-                       jobGraph.getName(),  
-                       jobGraph.getJobConfiguration(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                       TIMEOUT,
-                       new NoRestartStrategy());
-               
-               eg.setQueuedSchedulingAllowed(false);
-               
-               
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-               
-               Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
-               eg.scheduleForExecution(scheduler);
-               
-               ExecutionVertex[] tasks = 
eg.getVerticesTopologically().iterator().next().getTaskVertices();
-               assertEquals(parallelism, tasks.length);
-               
-               String[] hostsForTasks = new String[parallelism];
-               for (int i = 0; i < parallelism; i++) {
-                       hostsForTasks[i] = 
tasks[i].getCurrentAssignedResourceLocation().getHostname();
-               }
-               
-               return hostsForTasks;
-       }
-       
-       private static Scheduler getScheduler(int numInstances, int 
numSlotsPerInstance) throws Exception {
-               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-               
-               for (int i = 0; i < numInstances; i++) {
-                       byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + 
i) };
-                       int dataPort = 12001 + i;
-                       String host = "host" + (i+1);
-                       
-                       Instance instance = getInstance(ipAddress, dataPort, 
host, numSlotsPerInstance);
-                       scheduler.newInstanceAvailable(instance);
-               }
-               return scheduler;
-       }
-       
-       private static Instance getInstance(byte[] ipAddress, int dataPort, 
String hostname, int slots) throws Exception {
-               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-               
-               InstanceConnectionInfo connection = 
mock(InstanceConnectionInfo.class);
-               
when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
-               when(connection.dataPort()).thenReturn(dataPort);
-               
when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
-               when(connection.getHostname()).thenReturn(hostname);
-               when(connection.getFQDNHostname()).thenReturn(hostname);
-               
-               return new Instance(
-                               new ExecutionGraphTestUtils.SimpleActorGateway(
-                                               
TestingUtils.defaultExecutionContext()),
-                               connection,
-                               ResourceID.generate(),
-                               new InstanceID(),
-                               hardwareDescription,
-                               slots);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       // custom class to ensure behavior works for subclasses of 
LocatableInputSplit
-       private static class TestLocatableInputSplit extends 
LocatableInputSplit {
-               
-               private static final long serialVersionUID = 1L;
-
-               public TestLocatableInputSplit(int splitNumber, String 
hostname) {
-                       super(splitNumber, hostname);
-               }
-               
-               public TestLocatableInputSplit(int splitNumber, String[] 
hostnames) {
-                       super(splitNumber, hostnames);
-               }
-       }
-       
-       private static class TestInputSplitSource implements 
InputSplitSource<TestLocatableInputSplit>,
-               StrictlyLocalAssignment
-       {
-               private static final long serialVersionUID = 1L;
-               
-               private final TestLocatableInputSplit[] splits;
-               
-               public TestInputSplitSource(TestLocatableInputSplit[] splits) {
-                       this.splits = splits;
-               }
-
-               @Override
-               public TestLocatableInputSplit[] createInputSplits(int 
minNumSplits) {
-                       return splits;
-               }
-               
-               @Override
-               public InputSplitAssigner 
getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) {
-                       fail("This method should not be called on 
StrictlyLocalAssignment splits.");
-                       return null; // silence the compiler
-               }
-       }
-}

Reply via email to