This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 2233cb2 [FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations() 2233cb2 is described below commit 2233cb2ebad5908464e36ac890f0ab54bd57d35a Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Wed Mar 3 14:39:52 2021 +0800 [FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations() It is superseded by DefaultPreferredLocationsRetriever now and is no longer used. Its test ExecutionVertexLocalityTest is superseded by DefaultPreferredLocationsRetrieverTest. --- .../flink/runtime/executiongraph/Execution.java | 53 ----- .../runtime/executiongraph/ExecutionVertex.java | 105 --------- .../ExecutionVertexLocalityTest.java | 253 --------------------- 3 files changed, 411 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index d46fb7f..d5fee3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.messages.Acknowledge; @@ -57,7 +56,6 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; @@ -1366,57 +1364,6 @@ public class Execution // Miscellaneous // -------------------------------------------------------------------------------------------- - /** - * Calculates the preferred locations based on the location preference constraint. - * - * @param locationPreferenceConstraint constraint for the location preference - * @return Future containing the collection of preferred locations. This might not be completed - * if not all inputs have been a resource assigned. - */ - @VisibleForTesting - public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations( - LocationPreferenceConstraint locationPreferenceConstraint) { - final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = - getVertex().getPreferredLocations(); - final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture; - - switch (locationPreferenceConstraint) { - case ALL: - preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures); - break; - case ANY: - final ArrayList<TaskManagerLocation> completedTaskManagerLocations = - new ArrayList<>(preferredLocationFutures.size()); - - for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : - preferredLocationFutures) { - if (preferredLocationFuture.isDone() - && !preferredLocationFuture.isCompletedExceptionally()) { - final TaskManagerLocation taskManagerLocation = - preferredLocationFuture.getNow(null); - - if (taskManagerLocation == null) { - throw new FlinkRuntimeException( - "TaskManagerLocationFuture was completed with null. This indicates a programming bug."); - } - - completedTaskManagerLocations.add(taskManagerLocation); - } - } - - preferredLocationsFuture = - CompletableFuture.completedFuture(completedTaskManagerLocations); - break; - default: - throw new RuntimeException( - "Unknown LocationPreferenceConstraint " - + locationPreferenceConstraint - + '.'); - } - - return preferredLocationsFuture; - } - public void transitionState(ExecutionState targetState) { transitionState(state, targetState); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 1bc0fda..199a3f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -43,15 +43,12 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; @@ -425,53 +422,6 @@ public class ExecutionVertex } /** - * Gets the overall preferred execution location for this vertex's current execution. The - * preference is determined as follows: - * - * <ol> - * <li>If the task execution has state to load (from a checkpoint), then the location - * preference is the location of the previous execution (if there is a previous execution - * attempt). - * <li>If the task execution has no state or no previous location, then the location - * preference is based on the task's inputs. - * </ol> - * - * <p>These rules should result in the following behavior: - * - * <ul> - * <li>Stateless tasks are always scheduled based on co-location with inputs. - * <li>Stateful tasks are on their initial attempt executed based on co-location with inputs. - * <li>Repeated executions of stateful tasks try to co-locate the execution with its state. - * </ul> - * - * @see #getPreferredLocationsBasedOnState() - * @see #getPreferredLocationsBasedOnInputs() - * @return The preferred execution locations for the execution attempt. - */ - public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() { - Collection<CompletableFuture<TaskManagerLocation>> basedOnState = - getPreferredLocationsBasedOnState(); - return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs(); - } - - /** - * Gets the preferred location to execute the current task execution attempt, based on the state - * that the execution attempt will resume. - * - * @return A size-one collection with the location preference, or null, if there is no location - * preference based on the state. - */ - public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() { - TaskManagerLocation priorLocation; - if (currentExecution.getTaskRestore() != null - && (priorLocation = getLatestPriorLocation()) != null) { - return Collections.singleton(CompletableFuture.completedFuture(priorLocation)); - } else { - return null; - } - } - - /** * Gets the preferred location to execute the current task execution attempt, based on the state * that the execution attempt will resume. */ @@ -483,61 +433,6 @@ public class ExecutionVertex return Optional.empty(); } - /** - * Gets the location preferences of the vertex's current task execution, as determined by the - * locations of the predecessors from which it receives input data. If there are more than - * MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this method returns - * {@code null} to indicate no location preference. - * - * @return The preferred locations based in input streams, or an empty iterable, if there is no - * input-based preference. - */ - public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() { - // otherwise, base the preferred locations on the input connections - if (inputEdges == null) { - return Collections.emptySet(); - } else { - Set<CompletableFuture<TaskManagerLocation>> locations = - new HashSet<>(getTotalNumberOfParallelSubtasks()); - Set<CompletableFuture<TaskManagerLocation>> inputLocations = - new HashSet<>(getTotalNumberOfParallelSubtasks()); - - // go over all inputs - for (int i = 0; i < inputEdges.length; i++) { - inputLocations.clear(); - ExecutionEdge[] sources = inputEdges[i]; - if (sources != null) { - // go over all input sources - for (int k = 0; k < sources.length; k++) { - // look-up assigned slot of input source - CompletableFuture<TaskManagerLocation> locationFuture = - sources[k] - .getSource() - .getProducer() - .getCurrentTaskManagerLocationFuture(); - // add input location - inputLocations.add(locationFuture); - // inputs which have too many distinct sources are not considered - if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - inputLocations.clear(); - break; - } - } - } - // keep the locations of the input with the least preferred locations - if (locations.isEmpty() - || // nothing assigned yet - (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { - // current input has fewer preferred locations - locations.clear(); - locations.addAll(inputLocations); - } - } - - return locations.isEmpty() ? Collections.emptyList() : locations; - } - } - // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java deleted file mode 100644 index bc5d84a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.SimpleSlotContext; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.SlotContext; -import org.apache.flink.runtime.jobmaster.SlotOwner; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.lang.reflect.Field; -import java.net.InetAddress; -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -/** Tests that the execution vertex handles locality preferences well. */ -public class ExecutionVertexLocalityTest extends TestLogger { - - private final JobID jobId = new JobID(); - - private final JobVertexID sourceVertexId = new JobVertexID(); - private final JobVertexID targetVertexId = new JobVertexID(); - - /** - * This test validates that vertices that have only one input stream try to co-locate their - * tasks with the producer. - */ - @Test - public void testLocalityInputBasedForward() throws Exception { - final int parallelism = 10; - final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism]; - - final ExecutionGraph graph = createTestGraph(parallelism, false); - - // set the location for all sources to a distinct location - for (int i = 0; i < parallelism; i++) { - ExecutionVertex source = - graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; - - TaskManagerLocation location = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); - - locations[i] = location; - initializeLocation(source, location); - } - - // validate that the target vertices have no location preference - for (int i = 0; i < parallelism; i++) { - ExecutionVertex target = - graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - Iterator<CompletableFuture<TaskManagerLocation>> preference = - target.getPreferredLocations().iterator(); - - assertTrue(preference.hasNext()); - assertEquals(locations[i], preference.next().get()); - assertFalse(preference.hasNext()); - } - } - - /** - * This test validates that vertices with too many input streams do not have a location - * preference any more. - */ - @Test - public void testNoLocalityInputLargeAllToAll() throws Exception { - final int parallelism = 100; - - final ExecutionGraph graph = createTestGraph(parallelism, true); - - // set the location for all sources to a distinct location - for (int i = 0; i < parallelism; i++) { - ExecutionVertex source = - graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; - TaskManagerLocation location = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); - initializeLocation(source, location); - } - - // validate that the target vertices have no location preference - for (int i = 0; i < parallelism; i++) { - ExecutionVertex target = - graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - - Iterator<CompletableFuture<TaskManagerLocation>> preference = - target.getPreferredLocations().iterator(); - assertFalse(preference.hasNext()); - } - } - - /** - * This test validates that stateful vertices schedule based in the state's location (which is - * the prior execution's location). - */ - @Test - public void testLocalityBasedOnState() throws Exception { - final int parallelism = 10; - final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism]; - - final ExecutionGraph graph = createTestGraph(parallelism, false); - - // set the location for all sources and targets - for (int i = 0; i < parallelism; i++) { - ExecutionVertex source = - graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; - ExecutionVertex target = - graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - - TaskManagerLocation randomLocation = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); - - TaskManagerLocation location = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i); - - locations[i] = location; - initializeLocation(source, randomLocation); - initializeLocation(target, location); - - setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED); - setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED); - } - - // mimic a restart: all vertices get re-initialized without actually being executed - for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) { - for (ExecutionVertex ev : ejv.getTaskVertices()) { - ev.resetForNewExecution(); - } - } - - // set new location for the sources and some state for the targets - for (int i = 0; i < parallelism; i++) { - // source location - ExecutionVertex source = - graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; - TaskManagerLocation randomLocation = - new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i); - initializeLocation(source, randomLocation); - - // target state - ExecutionVertex target = - graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - target.getCurrentExecutionAttempt().setInitialState(mock(JobManagerTaskRestore.class)); - } - - // validate that the target vertices have the state's location as the location preference - for (int i = 0; i < parallelism; i++) { - ExecutionVertex target = - graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; - Iterator<CompletableFuture<TaskManagerLocation>> preference = - target.getPreferredLocations().iterator(); - - assertTrue(preference.hasNext()); - assertEquals(locations[i], preference.next().get()); - assertFalse(preference.hasNext()); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** Creates a simple 2 vertex graph with a parallel source and a parallel target. */ - private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception { - - JobVertex source = new JobVertex("source", sourceVertexId); - source.setParallelism(parallelism); - source.setInvokableClass(NoOpInvokable.class); - - JobVertex target = new JobVertex("source", targetVertexId); - target.setParallelism(parallelism); - target.setInvokableClass(NoOpInvokable.class); - - DistributionPattern connectionPattern = - allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE; - target.connectNewDataSetAsInput(source, connectionPattern, ResultPartitionType.PIPELINED); - - JobGraph testJob = new JobGraph(jobId, "test job", source, target); - - return TestingExecutionGraphBuilder.newBuilder().setJobGraph(testJob).build(); - } - - private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) - throws Exception { - // we need a bit of reflection magic to initialize the location without going through - // scheduling paths. we choose to do that, rather than the alternatives: - // - mocking the scheduler created fragile tests that break whenever the scheduler is - // adjusted - // - exposing test methods in the ExecutionVertex leads to undesirable setters - - SlotContext slotContext = - new SimpleSlotContext( - new AllocationID(), location, 0, mock(TaskManagerGateway.class)); - - LogicalSlot slot = - new SingleLogicalSlot( - new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class)); - - if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) { - throw new FlinkException("Could not assign resource."); - } - } - - private void setState(Execution execution, ExecutionState state) throws Exception { - final Field stateField = Execution.class.getDeclaredField("state"); - stateField.setAccessible(true); - - stateField.set(execution, state); - } -}