This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2233cb2  [FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations()
2233cb2 is described below

commit 2233cb2ebad5908464e36ac890f0ab54bd57d35a
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Wed Mar 3 14:39:52 2021 +0800

    [FLINK-21576][runtime] Remove legacy ExecutionVertex#getPreferredLocations()
    
    It is superseded by DefaultPreferredLocationsRetriever now and is no longer used.
    Its test ExecutionVertexLocalityTest is superseded by DefaultPreferredLocationsRetrieverTest.
---
 .../flink/runtime/executiongraph/Execution.java    |  53 -----
 .../runtime/executiongraph/ExecutionVertex.java    | 105 ---------
 .../ExecutionVertexLocalityTest.java               | 253 ---------------------
 3 files changed, 411 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index d46fb7f..d5fee3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -1366,57 +1364,6 @@ public class Execution
     //  Miscellaneous
     // --------------------------------------------------------------------------------------------
 
-    /**
-     * Calculates the preferred locations based on the location preference constraint.
-     *
-     * @param locationPreferenceConstraint constraint for the location preference
-     * @return Future containing the collection of preferred locations. This might not be completed
-     *     if not all inputs have been a resource assigned.
-     */
-    @VisibleForTesting
-    public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(
-            LocationPreferenceConstraint locationPreferenceConstraint) {
-        final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures =
-                getVertex().getPreferredLocations();
-        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
-
-        switch (locationPreferenceConstraint) {
-            case ALL:
-                preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
-                break;
-            case ANY:
-                final ArrayList<TaskManagerLocation> completedTaskManagerLocations =
-                        new ArrayList<>(preferredLocationFutures.size());
-
-                for (CompletableFuture<TaskManagerLocation> preferredLocationFuture :
-                        preferredLocationFutures) {
-                    if (preferredLocationFuture.isDone()
-                            && !preferredLocationFuture.isCompletedExceptionally()) {
-                        final TaskManagerLocation taskManagerLocation =
-                                preferredLocationFuture.getNow(null);
-
-                        if (taskManagerLocation == null) {
-                            throw new FlinkRuntimeException(
-                                    "TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
-                        }
-
-                        completedTaskManagerLocations.add(taskManagerLocation);
-                    }
-                }
-
-                preferredLocationsFuture =
-                        CompletableFuture.completedFuture(completedTaskManagerLocations);
-                break;
-            default:
-                throw new RuntimeException(
-                        "Unknown LocationPreferenceConstraint "
-                                + locationPreferenceConstraint
-                                + '.');
-        }
-
-        return preferredLocationsFuture;
-    }
-
     public void transitionState(ExecutionState targetState) {
         transitionState(state, targetState);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 1bc0fda..199a3f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,15 +43,12 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
@@ -425,53 +422,6 @@ public class ExecutionVertex
     }
 
     /**
-     * Gets the overall preferred execution location for this vertex's current execution. The
-     * preference is determined as follows:
-     *
-     * <ol>
-     *   <li>If the task execution has state to load (from a checkpoint), then the location
-     *       preference is the location of the previous execution (if there is a previous execution
-     *       attempt).
-     *   <li>If the task execution has no state or no previous location, then the location
-     *       preference is based on the task's inputs.
-     * </ol>
-     *
-     * <p>These rules should result in the following behavior:
-     *
-     * <ul>
-     *   <li>Stateless tasks are always scheduled based on co-location with inputs.
-     *   <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
-     *   <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
-     * </ul>
-     *
-     * @see #getPreferredLocationsBasedOnState()
-     * @see #getPreferredLocationsBasedOnInputs()
-     * @return The preferred execution locations for the execution attempt.
-     */
-    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
-        Collection<CompletableFuture<TaskManagerLocation>> basedOnState =
-                getPreferredLocationsBasedOnState();
-        return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
-    }
-
-    /**
-     * Gets the preferred location to execute the current task execution attempt, based on the state
-     * that the execution attempt will resume.
-     *
-     * @return A size-one collection with the location preference, or null, if there is no location
-     *     preference based on the state.
-     */
-    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
-        TaskManagerLocation priorLocation;
-        if (currentExecution.getTaskRestore() != null
-                && (priorLocation = getLatestPriorLocation()) != null) {
-            return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
-        } else {
-            return null;
-        }
-    }
-
-    /**
      * Gets the preferred location to execute the current task execution attempt, based on the state
      * that the execution attempt will resume.
      */
@@ -483,61 +433,6 @@ public class ExecutionVertex
         return Optional.empty();
     }
 
-    /**
-     * Gets the location preferences of the vertex's current task execution, as determined by the
-     * locations of the predecessors from which it receives input data. If there are more than
-     * MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this method returns
-     * {@code null} to indicate no location preference.
-     *
-     * @return The preferred locations based in input streams, or an empty iterable, if there is no
-     *     input-based preference.
-     */
-    public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
-        // otherwise, base the preferred locations on the input connections
-        if (inputEdges == null) {
-            return Collections.emptySet();
-        } else {
-            Set<CompletableFuture<TaskManagerLocation>> locations =
-                    new HashSet<>(getTotalNumberOfParallelSubtasks());
-            Set<CompletableFuture<TaskManagerLocation>> inputLocations =
-                    new HashSet<>(getTotalNumberOfParallelSubtasks());
-
-            // go over all inputs
-            for (int i = 0; i < inputEdges.length; i++) {
-                inputLocations.clear();
-                ExecutionEdge[] sources = inputEdges[i];
-                if (sources != null) {
-                    // go over all input sources
-                    for (int k = 0; k < sources.length; k++) {
-                        // look-up assigned slot of input source
-                        CompletableFuture<TaskManagerLocation> locationFuture =
-                                sources[k]
-                                        .getSource()
-                                        .getProducer()
-                                        .getCurrentTaskManagerLocationFuture();
-                        // add input location
-                        inputLocations.add(locationFuture);
-                        // inputs which have too many distinct sources are not considered
-                        if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                            inputLocations.clear();
-                            break;
-                        }
-                    }
-                }
-                // keep the locations of the input with the least preferred locations
-                if (locations.isEmpty()
-                        || // nothing assigned yet
-                        (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
-                    // current input has fewer preferred locations
-                    locations.clear();
-                    locations.addAll(inputLocations);
-                }
-            }
-
-            return locations.isEmpty() ? Collections.emptyList() : locations;
-        }
-    }
-
     // --------------------------------------------------------------------------------------------
     //   Actions
     // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
deleted file mode 100644
index bc5d84a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotContext;
-import org.apache.flink.runtime.jobmaster.SlotOwner;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/** Tests that the execution vertex handles locality preferences well. */
-public class ExecutionVertexLocalityTest extends TestLogger {
-
-    private final JobID jobId = new JobID();
-
-    private final JobVertexID sourceVertexId = new JobVertexID();
-    private final JobVertexID targetVertexId = new JobVertexID();
-
-    /**
-     * This test validates that vertices that have only one input stream try to co-locate their
-     * tasks with the producer.
-     */
-    @Test
-    public void testLocalityInputBasedForward() throws Exception {
-        final int parallelism = 10;
-        final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
-
-        final ExecutionGraph graph = createTestGraph(parallelism, false);
-
-        // set the location for all sources to a distinct location
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex source =
-                    graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
-
-            TaskManagerLocation location =
-                    new TaskManagerLocation(
-                            ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
-
-            locations[i] = location;
-            initializeLocation(source, location);
-        }
-
-        // validate that the target vertices have no location preference
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex target =
-                    graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-            Iterator<CompletableFuture<TaskManagerLocation>> preference =
-                    target.getPreferredLocations().iterator();
-
-            assertTrue(preference.hasNext());
-            assertEquals(locations[i], preference.next().get());
-            assertFalse(preference.hasNext());
-        }
-    }
-
-    /**
-     * This test validates that vertices with too many input streams do not have a location
-     * preference any more.
-     */
-    @Test
-    public void testNoLocalityInputLargeAllToAll() throws Exception {
-        final int parallelism = 100;
-
-        final ExecutionGraph graph = createTestGraph(parallelism, true);
-
-        // set the location for all sources to a distinct location
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex source =
-                    graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
-            TaskManagerLocation location =
-                    new TaskManagerLocation(
-                            ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
-            initializeLocation(source, location);
-        }
-
-        // validate that the target vertices have no location preference
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex target =
-                    graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-
-            Iterator<CompletableFuture<TaskManagerLocation>> preference =
-                    target.getPreferredLocations().iterator();
-            assertFalse(preference.hasNext());
-        }
-    }
-
-    /**
-     * This test validates that stateful vertices schedule based in the state's location (which is
-     * the prior execution's location).
-     */
-    @Test
-    public void testLocalityBasedOnState() throws Exception {
-        final int parallelism = 10;
-        final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism];
-
-        final ExecutionGraph graph = createTestGraph(parallelism, false);
-
-        // set the location for all sources and targets
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex source =
-                    graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
-            ExecutionVertex target =
-                    graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-
-            TaskManagerLocation randomLocation =
-                    new TaskManagerLocation(
-                            ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i);
-
-            TaskManagerLocation location =
-                    new TaskManagerLocation(
-                            ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i);
-
-            locations[i] = location;
-            initializeLocation(source, randomLocation);
-            initializeLocation(target, location);
-
-            setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
-            setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED);
-        }
-
-        // mimic a restart: all vertices get re-initialized without actually being executed
-        for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
-            for (ExecutionVertex ev : ejv.getTaskVertices()) {
-                ev.resetForNewExecution();
-            }
-        }
-
-        // set new location for the sources and some state for the targets
-        for (int i = 0; i < parallelism; i++) {
-            // source location
-            ExecutionVertex source =
-                    graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i];
-            TaskManagerLocation randomLocation =
-                    new TaskManagerLocation(
-                            ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i);
-            initializeLocation(source, randomLocation);
-
-            // target state
-            ExecutionVertex target =
-                    graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-            target.getCurrentExecutionAttempt().setInitialState(mock(JobManagerTaskRestore.class));
-        }
-
-        // validate that the target vertices have the state's location as the location preference
-        for (int i = 0; i < parallelism; i++) {
-            ExecutionVertex target =
-                    graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-            Iterator<CompletableFuture<TaskManagerLocation>> preference =
-                    target.getPreferredLocations().iterator();
-
-            assertTrue(preference.hasNext());
-            assertEquals(locations[i], preference.next().get());
-            assertFalse(preference.hasNext());
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  Utilities
-    // ------------------------------------------------------------------------
-
-    /** Creates a simple 2 vertex graph with a parallel source and a parallel target. */
-    private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception {
-
-        JobVertex source = new JobVertex("source", sourceVertexId);
-        source.setParallelism(parallelism);
-        source.setInvokableClass(NoOpInvokable.class);
-
-        JobVertex target = new JobVertex("source", targetVertexId);
-        target.setParallelism(parallelism);
-        target.setInvokableClass(NoOpInvokable.class);
-
-        DistributionPattern connectionPattern =
-                allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
-        target.connectNewDataSetAsInput(source, connectionPattern, ResultPartitionType.PIPELINED);
-
-        JobGraph testJob = new JobGraph(jobId, "test job", source, target);
-
-        return TestingExecutionGraphBuilder.newBuilder().setJobGraph(testJob).build();
-    }
-
-    private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location)
-            throws Exception {
-        // we need a bit of reflection magic to initialize the location without going through
-        // scheduling paths. we choose to do that, rather than the alternatives:
-        //  - mocking the scheduler created fragile tests that break whenever the scheduler is
-        // adjusted
-        //  - exposing test methods in the ExecutionVertex leads to undesirable setters
-
-        SlotContext slotContext =
-                new SimpleSlotContext(
-                        new AllocationID(), location, 0, mock(TaskManagerGateway.class));
-
-        LogicalSlot slot =
-                new SingleLogicalSlot(
-                        new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class));
-
-        if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
-            throw new FlinkException("Could not assign resource.");
-        }
-    }
-
-    private void setState(Execution execution, ExecutionState state) throws Exception {
-        final Field stateField = Execution.class.getDeclaredField("state");
-        stateField.setAccessible(true);
-
-        stateField.set(execution, state);
-    }
-}

Reply via email to