Repository: flink Updated Branches: refs/heads/master 1d53a40a6 -> 578e80e3c
[FLINK-4525] [core] Drop special cases for 'StrictlyLocalAssignment' and 'PredeterminedAssignment' Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/578e80e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/578e80e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/578e80e3 Branch: refs/heads/master Commit: 578e80e3c161601d22760ef2ea0e52e6ae963786 Parents: 1d53a40 Author: Stephan Ewen <[email protected]> Authored: Sat Aug 27 15:23:38 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Aug 29 17:07:55 2016 +0200 ---------------------------------------------------------------------- .../api/common/io/StrictlyLocalAssignment.java | 24 - .../executiongraph/ExecutionJobVertex.java | 104 +---- .../executiongraph/LocalInputSplitsTest.java | 436 ------------------- 3 files changed, 8 insertions(+), 556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java b/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java deleted file mode 100644 index e20107b..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/StrictlyLocalAssignment.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.io; - -import org.apache.flink.annotation.PublicEvolving; - -@PublicEvolving -public interface StrictlyLocalAssignment {} http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 7b28b31..6272151 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.io.StrictlyLocalAssignment; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -30,7 +29,6 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -81,11 +79,9 @@ public class ExecutionJobVertex { private final CoLocationGroup coLocationGroup; private final InputSplit[] inputSplits; - - private List<LocatableInputSplit>[] inputSplitsPerSubtask; - + private InputSplitAssigner splitAssigner; - + public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, FiniteDuration timeout) throws JobException { this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis()); @@ -155,12 +151,7 @@ public class ExecutionJobVertex { inputSplits = splitSource.createInputSplits(numTaskVertices); if (inputSplits != null) { - if (splitSource instanceof StrictlyLocalAssignment) { - inputSplitsPerSubtask = computeLocalInputSplitsPerTask(inputSplits); - splitAssigner = new PredeterminedInputSplitAssigner(inputSplitsPerSubtask); - } else { - splitAssigner = splitSource.getInputSplitAssigner(inputSplits); - } + splitAssigner = splitSource.getInputSplitAssigner(inputSplits); } } else { @@ -278,48 +269,8 @@ public class ExecutionJobVertex { //--------------------------------------------------------------------------------------------- public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { - ExecutionVertex[] vertices = this.taskVertices; - - // check if we need to do pre-assignment of tasks - if (inputSplitsPerSubtask != null) { - - final Map<String, List<Instance>> instances = scheduler.getInstancesByHost(); - final Map<String, Integer> assignments = new HashMap<String, Integer>(); - - for (int i = 0; i < vertices.length; i++) { - List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i]; - if (splitsForHost == null || splitsForHost.isEmpty()) { - continue; - } - - String[] hostNames = splitsForHost.get(0).getHostnames(); - if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) { - continue; - } - - String host = hostNames[0]; - ExecutionVertex v = vertices[i]; - - List<Instance> instancesOnHost = instances.get(host); - - if (instancesOnHost == null || instancesOnHost.isEmpty()) { - throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host - + ". No TaskManager available on that host."); - } - - Integer pos = assignments.get(host); - if (pos == null) { - pos = 0; - assignments.put(host, 0); - } else { - assignments.put(host, (pos + 1) % instancesOnHost.size()); - } - - v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos))); - } - } - + // kick off the tasks for (ExecutionVertex ev : vertices) { ev.scheduleForExecution(scheduler, queued); @@ -374,17 +325,10 @@ public class ExecutionJobVertex { // set up the input splits again try { if (this.inputSplits != null) { - - if (inputSplitsPerSubtask == null) { - // lazy assignment - @SuppressWarnings("unchecked") - InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource(); - this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits); - } - else { - // eager assignment - //TODO: this.splitAssigner = new AssignBasedOnPreAssignment(); - } + // lazy assignment + @SuppressWarnings("unchecked") + InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource(); + this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits); } } catch (Throwable t) { @@ -426,7 +370,6 @@ public class ExecutionJobVertex { inputSplits[i] = null; } } - inputSplitsPerSubtask = null; } //--------------------------------------------------------------------------------------------- @@ -628,37 +571,6 @@ public class ExecutionJobVertex { return subTaskSplitAssignment; } - - - /** - * An InputSplitAssigner that assigns to pre-determined hosts. - */ - public static class PredeterminedInputSplitAssigner implements InputSplitAssigner { - - private List<LocatableInputSplit>[] inputSplitsPerSubtask; - - @SuppressWarnings("unchecked") - public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) { - // copy input split assignment - this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List<?>[inputSplitsPerSubtask.length]; - for (int i = 0; i < inputSplitsPerSubtask.length; i++) { - List<LocatableInputSplit> next = inputSplitsPerSubtask[i]; - - this.inputSplitsPerSubtask[i] = next == null || next.isEmpty() ? - Collections.<LocatableInputSplit>emptyList() : - new ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]); - } - } - - @Override - public InputSplit getNextInputSplit(String host, int taskId) { - if (inputSplitsPerSubtask[taskId].isEmpty()) { - return null; - } else { - return inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1); - } - } - } public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) { if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) { http://git-wip-us.apache.org/repos/asf/flink/blob/578e80e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java deleted file mode 100644 index f03370c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.net.InetAddress; -import java.util.concurrent.TimeUnit; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.io.StrictlyLocalAssignment; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.core.io.InputSplitSource; -import org.apache.flink.core.io.LocatableInputSplit; -import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.SerializedValue; -import org.junit.Test; - -import scala.concurrent.duration.FiniteDuration; - -public class LocalInputSplitsTest { - - private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); - - // -------------------------------------------------------------------------------------------- - - @Test - public void testNotEnoughSubtasks() { - int numHosts = 3; - int slotsPerHost = 1; - int parallelism = 2; - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, "host1"), - new TestLocatableInputSplit(2, "host2"), - new TestLocatableInputSplit(3, "host3") - }; - - // This should fail with an exception, since the parallelism of 2 does not - // support strictly local assignment onto 3 hosts - try { - runTests(numHosts, slotsPerHost, parallelism, splits); - fail("should throw an exception"); - } - catch (JobException e) { - // what a great day! - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testDisallowMultipleLocations() { - int numHosts = 2; - int slotsPerHost = 1; - int parallelism = 2; - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, new String[] { "host1", "host2" } ), - new TestLocatableInputSplit(2, new String[] { "host1", "host2" } ) - }; - - // This should fail with an exception, since strictly local assignment - // currently supports only one choice of host - try { - runTests(numHosts, slotsPerHost, parallelism, splits); - fail("should throw an exception"); - } - catch (JobException e) { - // dandy! - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testNonExistentHost() { - int numHosts = 2; - int slotsPerHost = 1; - int parallelism = 2; - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, "host1"), - new TestLocatableInputSplit(2, "bogus_host" ) - }; - - // This should fail with an exception, since one of the hosts does not exist - try { - runTests(numHosts, slotsPerHost, parallelism, splits); - fail("should throw an exception"); - } - catch (JobException e) { - // dandy! - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testEqualSplitsPerHostAndSubtask() { - int numHosts = 5; - int slotsPerHost = 2; - int parallelism = 10; - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(7, "host4"), - new TestLocatableInputSplit(8, "host4"), - new TestLocatableInputSplit(1, "host1"), - new TestLocatableInputSplit(2, "host1"), - new TestLocatableInputSplit(3, "host2"), - new TestLocatableInputSplit(4, "host2"), - new TestLocatableInputSplit(5, "host3"), - new TestLocatableInputSplit(6, "host3"), - new TestLocatableInputSplit(9, "host5"), - new TestLocatableInputSplit(10, "host5") - }; - - try { - String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits); - - assertEquals("host1", hostsForTasks[0]); - assertEquals("host1", hostsForTasks[1]); - assertEquals("host2", hostsForTasks[2]); - assertEquals("host2", hostsForTasks[3]); - assertEquals("host3", hostsForTasks[4]); - assertEquals("host3", hostsForTasks[5]); - assertEquals("host4", hostsForTasks[6]); - assertEquals("host4", hostsForTasks[7]); - assertEquals("host5", hostsForTasks[8]); - assertEquals("host5", hostsForTasks[9]); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testNonEqualSplitsPerhost() { - int numHosts = 3; - int slotsPerHost = 2; - int parallelism = 5; - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, "host3"), - new TestLocatableInputSplit(2, "host1"), - new TestLocatableInputSplit(3, "host1"), - new TestLocatableInputSplit(4, "host1"), - new TestLocatableInputSplit(5, "host1"), - new TestLocatableInputSplit(6, "host1"), - new TestLocatableInputSplit(7, "host2"), - new TestLocatableInputSplit(8, "host2") - }; - - try { - String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits); - - assertEquals("host1", hostsForTasks[0]); - assertEquals("host1", hostsForTasks[1]); - assertEquals("host2", hostsForTasks[2]); - assertEquals("host2", hostsForTasks[3]); - assertEquals("host3", hostsForTasks[4]); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testWithSubtasksEmpty() { - int numHosts = 3; - int slotsPerHost = 5; - int parallelism = 7; - - // host one gets three subtasks (but two remain empty) - // host two get two subtasks where one gets two splits, the other one split - // host three gets two subtasks where one gets five splits, the other gets four splits - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, "host1"), - new TestLocatableInputSplit(2, "host2"), - new TestLocatableInputSplit(3, "host2"), - new TestLocatableInputSplit(4, "host2"), - new TestLocatableInputSplit(5, "host3"), - new TestLocatableInputSplit(6, "host3"), - new TestLocatableInputSplit(7, "host3"), - new TestLocatableInputSplit(8, "host3"), - new TestLocatableInputSplit(9, "host3"), - new TestLocatableInputSplit(10, "host3"), - new TestLocatableInputSplit(11, "host3"), - new TestLocatableInputSplit(12, "host3"), - new TestLocatableInputSplit(13, "host3") - }; - - try { - String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits); - - assertEquals("host1", hostsForTasks[0]); - - assertEquals("host2", hostsForTasks[1]); - assertEquals("host2", hostsForTasks[2]); - - assertEquals("host3", hostsForTasks[3]); - assertEquals("host3", hostsForTasks[4]); - - // the current assignment leaves those with empty constraints - assertTrue(hostsForTasks[5].equals("host1") || hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3")); - assertTrue(hostsForTasks[6].equals("host1") || hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testMultipleInstancesPerHost() { - - TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] { - new TestLocatableInputSplit(1, "host1"), - new TestLocatableInputSplit(2, "host1"), - new TestLocatableInputSplit(3, "host2"), - new TestLocatableInputSplit(4, "host2"), - new TestLocatableInputSplit(5, "host3"), - new TestLocatableInputSplit(6, "host3") - }; - - try { - JobVertex vertex = new JobVertex("test vertex"); - vertex.setParallelism(6); - vertex.setInvokableClass(DummyInvokable.class); - vertex.setInputSplitSource(new TestInputSplitSource(splits)); - - JobGraph jobGraph = new JobGraph("test job", vertex); - - ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - TIMEOUT, - new NoRestartStrategy()); - - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - eg.setQueuedSchedulingAllowed(false); - - // create a scheduler with 6 instances where always two are on the same host - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1); - Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1); - Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1); - Instance i4 = getInstance(new byte[] {10,0,1,2}, 12346, "host2", 1); - Instance i5 = getInstance(new byte[] {10,0,1,3}, 12345, "host3", 1); - Instance i6 = getInstance(new byte[] {10,0,1,3}, 12346, "host4", 1); - scheduler.newInstanceAvailable(i1); - scheduler.newInstanceAvailable(i2); - scheduler.newInstanceAvailable(i3); - scheduler.newInstanceAvailable(i4); - scheduler.newInstanceAvailable(i5); - scheduler.newInstanceAvailable(i6); - - eg.scheduleForExecution(scheduler); - - ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices(); - assertEquals(6, tasks.length); - - Instance taskInstance1 = tasks[0].getCurrentAssignedResource().getInstance(); - Instance taskInstance2 = tasks[1].getCurrentAssignedResource().getInstance(); - Instance taskInstance3 = tasks[2].getCurrentAssignedResource().getInstance(); - Instance taskInstance4 = tasks[3].getCurrentAssignedResource().getInstance(); - Instance taskInstance5 = tasks[4].getCurrentAssignedResource().getInstance(); - Instance taskInstance6 = tasks[5].getCurrentAssignedResource().getInstance(); - - assertTrue (taskInstance1 == i1 || taskInstance1 == i2); - assertTrue (taskInstance2 == i1 || taskInstance2 == i2); - assertTrue (taskInstance3 == i3 || taskInstance3 == i4); - assertTrue (taskInstance4 == i3 || taskInstance4 == i4); - assertTrue (taskInstance5 == i5 || taskInstance5 == i6); - assertTrue (taskInstance6 == i5 || taskInstance6 == i6); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - private static String[] runTests(int numHosts, int slotsPerHost, int parallelism, - TestLocatableInputSplit[] splits) - throws Exception - { - JobVertex vertex = new JobVertex("test vertex"); - vertex.setParallelism(parallelism); - vertex.setInvokableClass(DummyInvokable.class); - vertex.setInputSplitSource(new TestInputSplitSource(splits)); - - JobGraph jobGraph = new JobGraph("test job", vertex); - - ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - new SerializedValue<>(new ExecutionConfig()), - TIMEOUT, - new NoRestartStrategy()); - - eg.setQueuedSchedulingAllowed(false); - - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - Scheduler scheduler = getScheduler(numHosts, slotsPerHost); - eg.scheduleForExecution(scheduler); - - ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices(); - assertEquals(parallelism, tasks.length); - - String[] hostsForTasks = new String[parallelism]; - for (int i = 0; i < parallelism; i++) { - hostsForTasks[i] = tasks[i].getCurrentAssignedResourceLocation().getHostname(); - } - - return hostsForTasks; - } - - private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception { - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - - for (int i = 0; i < numInstances; i++) { - byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) }; - int dataPort = 12001 + i; - String host = "host" + (i+1); - - Instance instance = getInstance(ipAddress, dataPort, host, numSlotsPerInstance); - scheduler.newInstanceAvailable(instance); - } - return scheduler; - } - - private static Instance getInstance(byte[] ipAddress, int dataPort, String hostname, int slots) throws Exception { - HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); - - InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class); - when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress)); - when(connection.dataPort()).thenReturn(dataPort); - when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString()); - when(connection.getHostname()).thenReturn(hostname); - when(connection.getFQDNHostname()).thenReturn(hostname); - - return new Instance( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.defaultExecutionContext()), - connection, - ResourceID.generate(), - new InstanceID(), - hardwareDescription, - slots); - } - - // -------------------------------------------------------------------------------------------- - - // custom class to ensure behavior works for subclasses of LocatableInputSplit - private static class TestLocatableInputSplit extends LocatableInputSplit { - - private static final long serialVersionUID = 1L; - - public TestLocatableInputSplit(int splitNumber, String hostname) { - super(splitNumber, hostname); - } - - public TestLocatableInputSplit(int splitNumber, String[] hostnames) { - super(splitNumber, hostnames); - } - } - - private static class TestInputSplitSource implements InputSplitSource<TestLocatableInputSplit>, - StrictlyLocalAssignment - { - private static final long serialVersionUID = 1L; - - private final TestLocatableInputSplit[] splits; - - public TestInputSplitSource(TestLocatableInputSplit[] splits) { - this.splits = splits; - } - - @Override - public TestLocatableInputSplit[] createInputSplits(int minNumSplits) { - return splits; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) { - fail("This method should not be called on StrictlyLocalAssignment splits."); - return null; // silence the compiler - } - } -}
