[
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631619#comment-16631619
]
ASF GitHub Bot commented on FLINK-9455:
---------------------------------------
asfgit closed pull request #6734: [FLINK-9455][RM] Add support for multi task
slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index f18d0d8cfc7..d8267735209 100644
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -24,7 +24,6 @@
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
@@ -79,6 +78,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -124,6 +124,8 @@
@Nullable
private final String webUiUrl;
+ private final Collection<ResourceProfile> slotsPerWorker;
+
/** Mesos scheduler driver. */
private SchedulerDriver schedulerDriver;
@@ -191,6 +193,9 @@ public MesosResourceManager(
this.workersInNew = new HashMap<>(8);
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);
+
+ final ContaineredTaskManagerParameters
containeredTaskManagerParameters =
taskManagerParameters.containeredParameters();
+ this.slotsPerWorker =
createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
}
protected ActorRef createSelfActor() {
@@ -352,7 +357,7 @@ private void recoverWorkers(final
List<MesosWorkerStore.Worker> tasksFromPreviou
switch(worker.state()) {
case Launched:
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
- final LaunchableMesosWorker
launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
+ final LaunchableMesosWorker
launchable = createLaunchableMesosWorker(worker.taskID());
toAssign.add(new
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
break;
case Released:
@@ -426,7 +431,7 @@ protected void internalDeregisterApplication(
}
@Override
- public void startNewWorker(ResourceProfile resourceProfile) {
+ public Collection<ResourceProfile> startNewWorker(ResourceProfile
resourceProfile) {
LOG.info("Starting a new worker.");
try {
// generate new workers into persistent state and
launch associated actors
@@ -434,7 +439,7 @@ public void startNewWorker(ResourceProfile resourceProfile)
{
workerStore.putWorker(worker);
workersInNew.put(extractResourceID(worker.taskID()),
worker);
- LaunchableMesosWorker launchable =
createLaunchableMesosWorker(worker.taskID(), resourceProfile);
+ LaunchableMesosWorker launchable =
createLaunchableMesosWorker(worker.taskID());
LOG.info("Scheduling Mesos task {} with ({} MB, {}
cpus).",
launchable.taskID().getValue(),
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
@@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile
resourceProfile) {
taskMonitor.tell(new
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
// tell the launch coordinator to launch the new tasks
- launchCoordinator.tell(new
LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask)
launchable)), selfActor);
+ launchCoordinator.tell(new
LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+ return slotsPerWorker;
} catch (Exception ex) {
onFatalError(new ResourceManagerException("Unable to
request new workers.", ex));
+ return Collections.emptyList();
}
}
@@ -691,36 +699,13 @@ public void taskTerminated(TaskMonitor.TaskTerminated
message) {
/**
* Creates a launchable task for Fenzo to process.
*/
- private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID
taskID, ResourceProfile resourceProfile) {
-
- // create the specific TM parameters from the resource profile
and some defaults
- MesosTaskManagerParameters params = new
MesosTaskManagerParameters(
- resourceProfile.getCpuCores() < 1.0 ?
taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
- taskManagerParameters.gpus(),
- taskManagerParameters.containerType(),
- taskManagerParameters.containerImageName(),
- new ContaineredTaskManagerParameters(
- ResourceProfile.UNKNOWN.equals(resourceProfile)
? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() :
resourceProfile.getMemoryInMB(),
- ResourceProfile.UNKNOWN.equals(resourceProfile)
? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() :
resourceProfile.getHeapMemoryInMB(),
- ResourceProfile.UNKNOWN.equals(resourceProfile)
?
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB()
: resourceProfile.getDirectMemoryInMB(),
- 1,
- new
HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
- taskManagerParameters.containerVolumes(),
- taskManagerParameters.dockerParameters(),
- taskManagerParameters.dockerForcePullImage(),
- taskManagerParameters.constraints(),
- taskManagerParameters.command(),
- taskManagerParameters.bootstrapCommand(),
- taskManagerParameters.getTaskManagerHostname(),
- taskManagerParameters.uris()
- );
-
- LOG.debug("LaunchableMesosWorker parameters: {}", params);
+ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID
taskID) {
+ LOG.debug("LaunchableMesosWorker parameters: {}",
taskManagerParameters);
LaunchableMesosWorker launchable =
new LaunchableMesosWorker(
artifactServer,
- params,
+ taskManagerParameters,
taskManagerContainerSpec,
taskID,
mesosConfig);
diff --git
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e21f0fc3374..5163724ed6d 100644
---
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -242,7 +242,7 @@ protected void closeTaskManagerConnection(ResourceID
resourceID, Exception cause
TestingMesosResourceManager resourceManager;
// domain objects for test purposes
- final ResourceProfile resourceProfile1 = new
ResourceProfile(1.0, 1);
+ final ResourceProfile resourceProfile1 =
ResourceProfile.UNKNOWN;
Protos.FrameworkID framework1 =
Protos.FrameworkID.newBuilder().setValue("framework1").build();
public Protos.SlaveID slave1 =
Protos.SlaveID.newBuilder().setValue("slave1").build();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index a89b9f92b17..5b133e7b624 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -50,6 +50,9 @@
public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0,
-1);
+ /** ResourceProfile which matches any other ResourceProfile. */
+ public static final ResourceProfile ANY = new
ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+
//
------------------------------------------------------------------------
/** How many cpu cores are needed, use double so we can specify cpu
like 0.1. */
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ac1181b1d1d..7929e3170b7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -71,6 +71,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -1019,9 +1020,10 @@ protected abstract void internalDeregisterApplication(
* Allocates a resource using the resource profile.
*
* @param resourceProfile The resource description
+ * @return Collection of {@link ResourceProfile} describing the
launched slots
*/
@VisibleForTesting
- public abstract void startNewWorker(ResourceProfile resourceProfile);
+ public abstract Collection<ResourceProfile>
startNewWorker(ResourceProfile resourceProfile);
/**
* Callback when a worker was started.
@@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId,
Exception cause) {
}
@Override
- public void allocateResource(ResourceProfile resourceProfile)
throws ResourceManagerException {
+ public Collection<ResourceProfile>
allocateResource(ResourceProfile resourceProfile) {
validateRunsInMainThread();
- startNewWorker(resourceProfile);
+ return startNewWorker(resourceProfile);
}
@Override
@@ -1176,8 +1178,16 @@ public void reportPayload(ResourceID resourceID, Void
payload) {
// Resource Management
//
------------------------------------------------------------------------
- protected int getNumberPendingSlotRequests() {
- return slotManager.getNumberPendingSlotRequests();
+ protected int getNumberRequiredTaskManagerSlots() {
+ return slotManager.getNumberPendingTaskManagerSlots();
+ }
+
+ //
------------------------------------------------------------------------
+ // Helper methods
+ //
------------------------------------------------------------------------
+
+ protected static Collection<ResourceProfile> createSlotsPerWorker(int
numSlots) {
+ return Collections.nCopies(numSlots, ResourceProfile.ANY);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 420b89f39f2..064c2d361d2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -32,6 +32,9 @@
import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* A standalone implementation of the resource manager. Used when the system
is started in
* standalone mode (via scripts), rather than via a resource framework like
YARN or Mesos.
@@ -74,7 +77,8 @@ protected void
internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
}
@Override
- public void startNewWorker(ResourceProfile resourceProfile) {
+ public Collection<ResourceProfile> startNewWorker(ResourceProfile
resourceProfile) {
+ return Collections.emptyList();
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 17cf8c7907e..a8f212fe6d4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -25,10 +25,14 @@
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
+/**
+ * Class representing a pending slot request in the {@link SlotManager}.
+ */
public class PendingSlotRequest {
private final SlotRequest slotRequest;
@@ -36,11 +40,16 @@
@Nullable
private CompletableFuture<Acknowledge> requestFuture;
+ @Nullable
+ private PendingTaskManagerSlot pendingTaskManagerSlot;
+
/** Timestamp when this pending slot request has been created. */
private final long creationTimestamp;
public PendingSlotRequest(SlotRequest slotRequest) {
this.slotRequest = Preconditions.checkNotNull(slotRequest);
+ this.requestFuture = null;
+ this.pendingTaskManagerSlot = null;
creationTimestamp = System.currentTimeMillis();
}
@@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable
CompletableFuture<Acknowledge> requestFut
public CompletableFuture<Acknowledge> getRequestFuture() {
return requestFuture;
}
+
+ @Nullable
+ public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() {
+ return pendingTaskManagerSlot;
+ }
+
+ public void assignPendingTaskManagerSlot(@Nonnull
PendingTaskManagerSlot pendingTaskManagerSlotToAssign) {
+ Preconditions.checkState(pendingTaskManagerSlot == null);
+ this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign;
+ }
+
+ public void unassignPendingTaskManagerSlot() {
+ this.pendingTaskManagerSlot = null;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
new file mode 100644
index 00000000000..ed207e963e3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Represents a pending task manager slot in the {@link SlotManager}.
+ */
+public class PendingTaskManagerSlot {
+
+ private final TaskManagerSlotId taskManagerSlotId =
TaskManagerSlotId.generate();
+
+ private final ResourceProfile resourceProfile;
+
+ @Nullable
+ private PendingSlotRequest pendingSlotRequest;
+
+ public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
+ this.resourceProfile = resourceProfile;
+ }
+
+ public TaskManagerSlotId getTaskManagerSlotId() {
+ return taskManagerSlotId;
+ }
+
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+
+ public void assignPendingSlotRequest(@Nonnull PendingSlotRequest
pendingSlotRequestToAssign) {
+ Preconditions.checkState(pendingSlotRequest == null);
+ pendingSlotRequest = pendingSlotRequestToAssign;
+ }
+
+ public void unassignPendingSlotRequest() {
+ pendingSlotRequest = null;
+ }
+
+ @Nullable
+ public PendingSlotRequest getAssignedPendingSlotRequest() {
+ return pendingSlotRequest;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
index 84e7c4e785d..adf8f13db57 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
@@ -24,6 +24,8 @@
import org.apache.flink.runtime.instance.InstanceID;
import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import java.util.Collection;
+
/**
* Resource related actions which the {@link SlotManager} can perform.
*/
@@ -41,9 +43,10 @@
* Requests to allocate a resource with the given {@link
ResourceProfile}.
*
* @param resourceProfile for the to be allocated resource
+ * @return Collection of {@link ResourceProfile} describing the
allocated slots
* @throws ResourceManagerException if the resource cannot be allocated
*/
- void allocateResource(ResourceProfile resourceProfile) throws
ResourceManagerException;
+ Collection<ResourceProfile> allocateResource(ResourceProfile
resourceProfile) throws ResourceManagerException;
/**
* Notifies that an allocation failure has occurred.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index bab56609a1b..2ef2b2fefcc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -43,14 +43,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -99,6 +102,8 @@
/** Map of pending/unfulfilled slot allocation requests. */
private final HashMap<AllocationID, PendingSlotRequest>
pendingSlotRequests;
+ private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot>
pendingSlots;
+
/** ResourceManager's id. */
private ResourceManagerId resourceManagerId;
@@ -130,6 +135,7 @@ public SlotManager(
taskManagerRegistrations = new HashMap<>(4);
fulfilledSlotRequests = new HashMap<>(16);
pendingSlotRequests = new HashMap<>(16);
+ pendingSlots = new HashMap<>(16);
resourceManagerId = null;
resourceActions = null;
@@ -168,8 +174,13 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
}
}
- public int getNumberPendingSlotRequests() {
- return pendingSlotRequests.size();
+ public int getNumberPendingTaskManagerSlots() {
+ return pendingSlots.size();
+ }
+
+ @VisibleForTesting
+ int getNumberAssignedPendingTaskManagerSlots() {
+ return (int) pendingSlots.values().stream().filter(slot ->
slot.getAssignedPendingSlotRequest() != null).count();
}
//
---------------------------------------------------------------------------------------------
@@ -530,14 +541,50 @@ private void registerSlot(
removeSlot(slotId);
}
- TaskManagerSlot slot = new TaskManagerSlot(
+ final TaskManagerSlot slot =
createAndRegisterTaskManagerSlot(slotId, resourceProfile,
taskManagerConnection);
+
+ final PendingTaskManagerSlot pendingTaskManagerSlot;
+
+ if (allocationId == null) {
+ pendingTaskManagerSlot =
findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
+ } else {
+ pendingTaskManagerSlot = null;
+ }
+
+ if (pendingTaskManagerSlot == null) {
+ updateSlot(slotId, allocationId, jobId);
+ } else {
+
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
+ final PendingSlotRequest assignedPendingSlotRequest =
pendingTaskManagerSlot.getAssignedPendingSlotRequest();
+
+ if (assignedPendingSlotRequest == null) {
+ handleFreeSlot(slot);
+ } else {
+
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
+ allocateSlot(slot, assignedPendingSlotRequest);
+ }
+ }
+ }
+
+ @Nonnull
+ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId,
ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
+ final TaskManagerSlot slot = new TaskManagerSlot(
slotId,
resourceProfile,
taskManagerConnection);
-
slots.put(slotId, slot);
+ return slot;
+ }
- updateSlot(slotId, allocationId, jobId);
+ @Nullable
+ private PendingTaskManagerSlot
findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
+ for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
+ if
(pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+ return pendingTaskManagerSlot;
+ }
+ }
+
+ return null;
}
/**
@@ -595,7 +642,11 @@ private void updateSlotState(
slot.updateAllocation(allocationId, jobId);
// remove the pending request
if any as it has been assigned
-
pendingSlotRequests.remove(allocationId);
+ final PendingSlotRequest
actualPendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+ if (actualPendingSlotRequest !=
null) {
+
cancelPendingSlotRequest(actualPendingSlotRequest);
+ }
// this will try to find a new
slot for the request
rejectPendingSlotRequest(
@@ -650,13 +701,54 @@ private void updateSlotState(
* @throws ResourceManagerException if the resource manager cannot
allocate more resource
*/
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
throws ResourceManagerException {
- TaskManagerSlot taskManagerSlot =
findMatchingSlot(pendingSlotRequest.getResourceProfile());
+ final ResourceProfile resourceProfile =
pendingSlotRequest.getResourceProfile();
+ TaskManagerSlot taskManagerSlot =
findMatchingSlot(resourceProfile);
if (taskManagerSlot != null) {
allocateSlot(taskManagerSlot, pendingSlotRequest);
} else {
-
resourceActions.allocateResource(pendingSlotRequest.getResourceProfile());
+ Optional<PendingTaskManagerSlot>
pendingTaskManagerSlotOptional =
findFreeMatchingPendingTaskManagerSlot(resourceProfile);
+
+ if (!pendingTaskManagerSlotOptional.isPresent()) {
+ pendingTaskManagerSlotOptional =
allocateResource(resourceProfile);
+ }
+
+
pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot ->
assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+ }
+ }
+
+ private Optional<PendingTaskManagerSlot>
findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile)
{
+ for (PendingTaskManagerSlot pendingTaskManagerSlot :
pendingSlots.values()) {
+ if
(pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null &&
pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile))
{
+ return Optional.of(pendingTaskManagerSlot);
+ }
}
+
+ return Optional.empty();
+ }
+
+ private Optional<PendingTaskManagerSlot>
allocateResource(ResourceProfile resourceProfile) throws
ResourceManagerException {
+ final Collection<ResourceProfile> requestedSlots =
resourceActions.allocateResource(resourceProfile);
+
+ if (requestedSlots.isEmpty()) {
+ return Optional.empty();
+ } else {
+ final Iterator<ResourceProfile> slotIterator =
requestedSlots.iterator();
+ final PendingTaskManagerSlot pendingTaskManagerSlot =
new PendingTaskManagerSlot(slotIterator.next());
+
pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(),
pendingTaskManagerSlot);
+
+ while (slotIterator.hasNext()) {
+ final PendingTaskManagerSlot
additionalPendingTaskManagerSlot = new
PendingTaskManagerSlot(slotIterator.next());
+
pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(),
additionalPendingTaskManagerSlot);
+ }
+
+ return Optional.of(pendingTaskManagerSlot);
+ }
+ }
+
+ private void assignPendingTaskManagerSlot(PendingSlotRequest
pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) {
+
pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
+
pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot);
}
/**
@@ -680,6 +772,8 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot,
PendingSlotRequest pe
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
+ returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceID);
if (taskManagerRegistration == null) {
@@ -733,6 +827,14 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot,
PendingSlotRequest pe
mainThreadExecutor);
}
+ private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest
pendingSlotRequest) {
+ final PendingTaskManagerSlot pendingTaskManagerSlot =
pendingSlotRequest.getAssignedPendingTaskManagerSlot();
+ if (pendingTaskManagerSlot != null) {
+ pendingTaskManagerSlot.unassignPendingSlotRequest();
+ pendingSlotRequest.unassignPendingTaskManagerSlot();
+ }
+ }
+
/**
* Handles a free slot. It first tries to find a pending slot request
which can be fulfilled.
* If there is no such request, then it will add the slot to the set of
free slots.
@@ -886,6 +988,8 @@ private void rejectPendingSlotRequest(PendingSlotRequest
pendingSlotRequest, Exc
private void cancelPendingSlotRequest(PendingSlotRequest
pendingSlotRequest) {
CompletableFuture<Acknowledge> request =
pendingSlotRequest.getRequestFuture();
+ returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
if (null != request) {
request.cancel(false);
}
@@ -911,9 +1015,10 @@ private void checkTaskManagerTimeouts() {
}
// second we trigger the release resource callback
which can decide upon the resource release
+ final FlinkException cause = new
FlinkException("TaskExecutor exceeded the idle timeout.");
for (InstanceID timedOutTaskManagerId :
timedOutTaskManagerIds) {
LOG.debug("Release TaskExecutor {} because it
exceeded the idle timeout.", timedOutTaskManagerId);
-
resourceActions.releaseResource(timedOutTaskManagerId, new
FlinkException("TaskExecutor exceeded the idle timeout."));
+
resourceActions.releaseResource(timedOutTaskManagerId, cause);
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
new file mode 100644
index 00000000000..3084b3e8ed9
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}.
+ */
+public class TaskManagerSlotId extends AbstractID {
+
+ private static final long serialVersionUID = -4024240625523472071L;
+
+ private TaskManagerSlotId() {}
+
+ public static TaskManagerSlotId generate() {
+ return new TaskManagerSlotId();
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0e98e44ab0d..c46d800bc95 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public static TaskManagerServices fromConfiguration(
final List<ResourceProfile> resourceProfiles = new
ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
for (int i = 0; i <
taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
- resourceProfiles.add(new ResourceProfile(1.0, 42));
+ resourceProfiles.add(ResourceProfile.ANY);
}
final TimerService<AllocationID> timerService = new
TimerService<>(
@@ -259,7 +259,6 @@ public static TaskManagerServices fromConfiguration(
final JobLeaderService jobLeaderService = new
JobLeaderService(taskManagerLocation);
-
final String[] stateRootDirectoryStrings =
taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
final File[] stateRootDirectoryFiles = new
File[stateRootDirectoryStrings.length];
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0b56231d4a4..e8207019a32 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -32,6 +32,9 @@
import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* Simple {@link ResourceManager} implementation for testing purposes.
*/
@@ -71,8 +74,8 @@ protected void
internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
}
@Override
- public void startNewWorker(ResourceProfile resourceProfile) {
- // noop
+ public Collection<ResourceProfile> startNewWorker(ResourceProfile
resourceProfile) {
+ return Collections.emptyList();
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8a7f733a3fa..33a696a1d24 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -47,6 +47,7 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -54,7 +55,9 @@
import javax.annotation.Nonnull;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -67,6 +70,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -84,9 +88,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -103,9 +105,9 @@
@Test
public void testTaskManagerRegistration() throws Exception {
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder().build();
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceId = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new
TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -135,14 +137,12 @@ public void testTaskManagerUnregistration() throws
Exception {
final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
final JobID jobId = new JobID();
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(new CompletableFuture<>());
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ assertThat(tuple5.f4,
is(equalTo(resourceManagerId)));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
final ResourceID resourceId = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new
TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -202,13 +202,16 @@ public void testSlotRequestWithoutFreeSlots() throws
Exception {
resourceProfile,
"localhost");
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ CompletableFuture<ResourceProfile> allocateResourceFuture = new
CompletableFuture<>();
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+
.setAllocateResourceConsumer(allocateResourceFuture::complete)
+ .build();
try (SlotManager slotManager =
createSlotManager(resourceManagerId, resourceManagerActions)) {
slotManager.registerSlotRequest(slotRequest);
-
verify(resourceManagerActions).allocateResource(eq(resourceProfile));
+ assertThat(allocateResourceFuture.get(),
is(equalTo(resourceProfile)));
}
}
@@ -225,8 +228,11 @@ public void testSlotRequestWithResourceAllocationFailure()
throws Exception {
resourceProfile,
"localhost");
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
- doThrow(new ResourceManagerException("Test
exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(value -> {
+ throw new ResourceManagerException("Test
exception");
+ })
+ .build();
try (SlotManager slotManager =
createSlotManager(resourceManagerId, resourceManagerActions)) {
@@ -257,19 +263,17 @@ public void testSlotRequestWithFreeSlot() throws
Exception {
resourceProfile,
targetAddress);
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder().build();
try (SlotManager slotManager =
createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+ final CompletableFuture<Tuple5<SlotID, JobID,
AllocationID, String, ResourceManagerId>> requestFuture = new
CompletableFuture<>();
// accept an incoming slot request
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
-
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+
requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3,
tuple5.f4));
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection =
new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -282,7 +286,7 @@ public void testSlotRequestWithFreeSlot() throws Exception {
assertTrue("The slot request should be accepted",
slotManager.registerSlotRequest(slotRequest));
- verify(taskExecutorGateway).requestSlot(eq(slotId),
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId),
any(Time.class));
+ assertThat(requestFuture.get(),
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress,
resourceManagerId))));
TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -302,14 +306,9 @@ public void testUnregisterPendingSlotRequest() throws
Exception {
final SlotID slotId = new SlotID(resourceID, 0);
final AllocationID allocationId = new AllocationID();
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(new CompletableFuture<>());
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 ->
new CompletableFuture<>())
+ .createTestingTaskExecutorGateway();
final ResourceProfile resourceProfile = new
ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId,
resourceProfile);
@@ -357,17 +356,19 @@ public void testFulfillingPendingSlotRequest() throws
Exception {
resourceProfile,
targetAddress);
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final AtomicInteger numberAllocateResourceCalls = new
AtomicInteger(0);
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(ignored ->
numberAllocateResourceCalls.incrementAndGet())
+ .build();
+ final CompletableFuture<Tuple5<SlotID, JobID, AllocationID,
String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
// accept an incoming slot request
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
-
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple5.of(tuple5.f0,
tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection = new
TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -378,13 +379,13 @@ public void testFulfillingPendingSlotRequest() throws
Exception {
assertTrue("The slot request should be accepted",
slotManager.registerSlotRequest(slotRequest));
- verify(resourceManagerActions,
times(1)).allocateResource(eq(resourceProfile));
+ assertThat(numberAllocateResourceCalls.get(), is(1));
slotManager.registerTaskManager(
taskExecutorConnection,
slotReport);
- verify(taskExecutorGateway).requestSlot(eq(slotId),
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId),
any(Time.class));
+ assertThat(requestFuture.get(),
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress,
resourceManagerId))));
TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -444,7 +445,10 @@ public void testFreeSlot() throws Exception {
@Test
public void testDuplicatePendingSlotRequest() throws Exception {
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final AtomicInteger numberAllocateResourceFunctionCalls = new
AtomicInteger(0);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(resourceProfile ->
numberAllocateResourceFunctionCalls.incrementAndGet())
+ .build();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile1 = new
ResourceProfile(1.0, 2);
final ResourceProfile resourceProfile2 = new
ResourceProfile(2.0, 1);
@@ -458,7 +462,7 @@ public void testDuplicatePendingSlotRequest() throws
Exception {
// check that we have called the resource allocation only
for the first slot request,
// since the second request is a duplicate
- verify(resourceManagerActions,
times(1)).allocateResource(any(ResourceProfile.class));
+ assertThat(numberAllocateResourceFunctionCalls.get(), is(1));
}
/**
@@ -497,21 +501,17 @@ public void
testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
@Test
public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation()
throws Exception {
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final AtomicInteger allocateResourceCalls = new
AtomicInteger(0);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(resourceProfile ->
allocateResourceCalls.incrementAndGet())
+ .build();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile1 = new
ResourceProfile(1.0, 2);
final ResourceProfile resourceProfile2 = new
ResourceProfile(2.0, 1);
final SlotRequest slotRequest1 = new SlotRequest(new JobID(),
allocationId, resourceProfile1, "foobar");
final SlotRequest slotRequest2 = new SlotRequest(new JobID(),
allocationId, resourceProfile2, "barfoo");
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
-
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
@@ -534,7 +534,7 @@ public void
testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex
// check that we have called the resource allocation only
for the first slot request,
// since the second request is a duplicate
- verify(resourceManagerActions,
never()).allocateResource(any(ResourceProfile.class));
+ assertThat(allocateResourceCalls.get(), is(0));
}
/**
@@ -544,21 +544,17 @@ public void
testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex
@Test
public void testAcceptingDuplicateSlotRequestAfterAllocationRelease()
throws Exception {
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final AtomicInteger allocateResourceCalls = new
AtomicInteger(0);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(resourceProfile ->
allocateResourceCalls.incrementAndGet())
+ .build();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile1 = new
ResourceProfile(1.0, 2);
final ResourceProfile resourceProfile2 = new
ResourceProfile(2.0, 1);
final SlotRequest slotRequest1 = new SlotRequest(new JobID(),
allocationId, resourceProfile1, "foobar");
final SlotRequest slotRequest2 = new SlotRequest(new JobID(),
allocationId, resourceProfile2, "barfoo");
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
-
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new
TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -588,7 +584,7 @@ public void
testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exc
// check that we have called the resource allocation only
for the first slot request,
// since the second request is a duplicate
- verify(resourceManagerActions,
never()).allocateResource(any(ResourceProfile.class));
+ assertThat(allocateResourceCalls.get(), is(0));
}
/**
@@ -624,7 +620,7 @@ public void testReceivingUnknownSlotReport() throws
Exception {
@Test
public void testUpdateSlotReport() throws Exception {
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder().build();
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -678,13 +674,16 @@ public void testUpdateSlotReport() throws Exception {
*/
@Test
public void testTaskManagerTimeout() throws Exception {
- final long tmTimeout = 500L;
+ final long tmTimeout = 10L;
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final CompletableFuture<InstanceID> releaseFuture = new
CompletableFuture<>();
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setReleaseResourceConsumer((instanceID, e) ->
releaseFuture.complete(instanceID))
+ .build();
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
final ResourceID resourceID = ResourceID.generate();
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskManagerConnection = new
TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotID slotId = new SlotID(resourceID, 0);
@@ -702,15 +701,9 @@ public void testTaskManagerTimeout() throws Exception {
slotManager.start(resourceManagerId,
mainThreadExecutor, resourceManagerActions);
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
-
slotManager.registerTaskManager(taskManagerConnection, slotReport);
- }
- });
+ mainThreadExecutor.execute(() ->
slotManager.registerTaskManager(taskManagerConnection, slotReport));
- verify(resourceManagerActions, timeout(100L *
tmTimeout).times(1))
-
.releaseResource(eq(taskManagerConnection.getInstanceID()),
any(Exception.class));
+ assertThat(releaseFuture.get(),
is(equalTo(taskManagerConnection.getInstanceID())));
}
}
@@ -723,7 +716,10 @@ public void run() {
public void testSlotRequestTimeout() throws Exception {
final long allocationTimeout = 50L;
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final CompletableFuture<Tuple2<JobID, AllocationID>>
failedAllocationFuture = new CompletableFuture<>();
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setNotifyAllocationFailureConsumer(tuple3 ->
failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1)))
+ .build();
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -743,21 +739,15 @@ public void testSlotRequestTimeout() throws Exception {
final AtomicReference<Exception> atomicException = new
AtomicReference<>(null);
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
assertTrue(slotManager.registerSlotRequest(slotRequest));
- } catch (Exception e) {
-
atomicException.compareAndSet(null, e);
- }
+ mainThreadExecutor.execute(() -> {
+ try {
+
assertTrue(slotManager.registerSlotRequest(slotRequest));
+ } catch (Exception e) {
+ atomicException.compareAndSet(null, e);
}
});
- verify(resourceManagerActions, timeout(100L *
allocationTimeout).times(1)).notifyAllocationFailure(
- eq(jobId),
- eq(allocationId),
- any(TimeoutException.class));
+ assertThat(failedAllocationFuture.get(),
is(equalTo(Tuple2.of(jobId, allocationId))));
if (atomicException.get() != null) {
throw atomicException.get();
@@ -851,7 +841,7 @@ public void testTaskManagerSlotRequestTimeoutHandling()
throws Exception {
public void testSlotReportWhileActiveSlotRequest() throws Exception {
final long verifyTimeout = 10000L;
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder().build();
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -966,10 +956,12 @@ public void testSlotReportWhileActiveSlotRequest() throws
Exception {
@Test
public void testTimeoutForUnusedTaskManager() throws Exception {
final long taskManagerTimeout = 50L;
- final long verifyTimeout = taskManagerTimeout * 10L;
+ final CompletableFuture<InstanceID> releasedResourceFuture =
new CompletableFuture<>();
+ final ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+ .setReleaseResourceConsumer((instanceID, e) ->
releasedResourceFuture.complete(instanceID))
+ .build();
final ResourceManagerId resourceManagerId =
ResourceManagerId.generate();
- final ResourceActions resourceManagerActions =
mock(ResourceActions.class);
final ScheduledExecutor scheduledExecutor =
TestingUtils.defaultScheduledExecutor();
final ResourceID resourceId = ResourceID.generate();
@@ -979,14 +971,13 @@ public void testTimeoutForUnusedTaskManager() throws
Exception {
final ResourceProfile resourceProfile = new
ResourceProfile(1.0, 1);
final SlotRequest slotRequest = new SlotRequest(jobId,
allocationId, resourceProfile, "foobar");
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
-
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final CompletableFuture<SlotID> requestedSlotFuture = new
CompletableFuture<>();
+ final TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestedSlotFuture.complete(tuple5.f0);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskManagerConnection = new
TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -1015,17 +1006,9 @@ public void testTimeoutForUnusedTaskManager() throws
Exception {
}
},
mainThreadExecutor)
- .thenAccept((Object value) ->
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
+ .thenRun(() ->
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
- ArgumentCaptor<SlotID> slotIdArgumentCaptor =
ArgumentCaptor.forClass(SlotID.class);
-
- verify(taskExecutorGateway,
timeout(verifyTimeout)).requestSlot(
- slotIdArgumentCaptor.capture(),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
- any(Time.class));
+ final SlotID slotId = requestedSlotFuture.get();
CompletableFuture<Boolean> idleFuture =
CompletableFuture.supplyAsync(
() ->
slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1034,8 +1017,6 @@ public void testTimeoutForUnusedTaskManager() throws
Exception {
// check that the TaskManager is not idle
assertFalse(idleFuture.get());
- final SlotID slotId = slotIdArgumentCaptor.getValue();
-
CompletableFuture<TaskManagerSlot> slotFuture =
CompletableFuture.supplyAsync(
() -> slotManager.getSlot(slotId),
mainThreadExecutor);
@@ -1052,7 +1033,7 @@ public void testTimeoutForUnusedTaskManager() throws
Exception {
assertTrue(idleFuture2.get());
- verify(resourceManagerActions,
timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()),
any(Exception.class));
+ assertThat(releasedResourceFuture.get(),
is(equalTo(taskManagerConnection.getInstanceID())));
}
}
@@ -1109,7 +1090,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots()
throws Exception {
@Test
public void testReportAllocatedSlot() throws Exception {
final ResourceID taskManagerId = ResourceID.generate();
- final ResourceActions resourceActions =
mock(ResourceActions.class);
+ final ResourceActions resourceActions = new
TestingResourceActionsBuilder().build();
final TestingTaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection = new
TaskExecutorConnection(taskManagerId, taskExecutorGateway);
@@ -1167,7 +1148,7 @@ public void testReportAllocatedSlot() throws Exception {
@Test
public void testSlotRequestFailure() throws Exception {
try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(),
- new
TestingResourceActionsBuilder().createTestingResourceActions())) {
+ new TestingResourceActionsBuilder().build())) {
final SlotRequest slotRequest = new SlotRequest(new
JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest);
@@ -1222,7 +1203,7 @@ public void testSlotRequestFailure() throws Exception {
@Test
public void testSlotRequestRemovedIfTMReportAllocation() throws
Exception {
try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(),
- new
TestingResourceActionsBuilder().createTestingResourceActions())) {
+ new TestingResourceActionsBuilder().build())) {
final JobID jobID = new JobID();
final SlotRequest slotRequest1 = new SlotRequest(jobID,
new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
@@ -1293,7 +1274,7 @@ public void
testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti
.setNotifyAllocationFailureConsumer(
(Tuple3<JobID, AllocationID, Exception>
failureMessage) ->
allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
- .createTestingResourceActions();
+ .build();
try (final SlotManager slotManager = createSlotManager(
ResourceManagerId.generate(),
@@ -1374,17 +1355,27 @@ public void
testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti
@Nonnull
private SlotReport createSlotReport(ResourceID taskExecutorResourceId,
int numberSlots) {
+ return createSlotReport(taskExecutorResourceId, numberSlots,
ResourceProfile.UNKNOWN);
+ }
+
+ @Nonnull
+ private SlotReport createSlotReport(ResourceID taskExecutorResourceId,
int numberSlots, ResourceProfile resourceProfile) {
final Set<SlotStatus> slotStatusSet = new
HashSet<>(numberSlots);
for (int i = 0; i < numberSlots; i++) {
- slotStatusSet.add(new SlotStatus(new
SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN));
+ slotStatusSet.add(new SlotStatus(new
SlotID(taskExecutorResourceId, i), resourceProfile));
}
return new SlotReport(slotStatusSet);
}
@Nonnull
- private SlotRequest createSlotRequest(JobID jobId1) {
- return new SlotRequest(jobId1, new AllocationID(),
ResourceProfile.UNKNOWN, "foobar1");
+ private SlotRequest createSlotRequest(JobID jobId) {
+ return createSlotRequest(jobId, ResourceProfile.UNKNOWN);
+ }
+
+ @Nonnull
+ private SlotRequest createSlotRequest(JobID jobId, ResourceProfile
resourceProfile) {
+ return new SlotRequest(jobId, new AllocationID(),
resourceProfile, "foobar1");
}
private SlotManager createSlotManager(ResourceManagerId
resourceManagerId, ResourceActions resourceManagerActions) {
@@ -1398,4 +1389,171 @@ private SlotManager createSlotManager(ResourceManagerId
resourceManagerId, Resou
return slotManager;
}
+
+ /**
+ * Tests that we only request new resources/containers once we have
assigned
+ * all pending task manager slots.
+ */
+ @Test
+ public void testRequestNewResources() throws Exception {
+ final int numberSlots = 2;
+ final AtomicInteger resourceRequests = new AtomicInteger(0);
+ final TestingResourceActions testingResourceActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(
+ convert(ignored -> {
+ resourceRequests.incrementAndGet();
+ return numberSlots;
+ }))
+ .build();
+
+ try (final SlotManager slotManager = createSlotManager(
+ ResourceManagerId.generate(),
+ testingResourceActions)) {
+
+ final JobID jobId = new JobID();
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+ assertThat(resourceRequests.get(), is(1));
+
+ // the second slot request should not try to allocate a
new resource because the
+ // previous resource was started with 2 slots.
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+ assertThat(resourceRequests.get(), is(1));
+
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2));
+
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+ assertThat(resourceRequests.get(), is(2));
+ }
+ }
+
+ /**
+ * Tests that a failing allocation/slot request will return the pending
task manager slot.
+ */
+ @Test
+ public void testFailingAllocationReturnsPendingTaskManagerSlot() throws
Exception {
+ final int numberSlots = 2;
+ final TestingResourceActions resourceActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(convert(value ->
numberSlots))
+ .build();
+ try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+ final JobID jobId = new JobID();
+
+ final SlotRequest slotRequest =
createSlotRequest(jobId);
+
assertThat(slotManager.registerSlotRequest(slotRequest), is(true));
+
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+
+
slotManager.unregisterSlotRequest(slotRequest.getAllocationId());
+
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+ }
+ }
+
+ /**
+ * Tests the completion of pending task manager slots by registering a
TaskExecutor.
+ */
+ @Test
+ public void testPendingTaskManagerSlotCompletion() throws Exception {
+ final int numberSlots = 3;
+ final TestingResourceActions resourceActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(convert(value ->
numberSlots))
+ .build();
+
+ try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+ final JobID jobId = new JobID();
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+ assertThat(slotManager.getNumberRegisteredSlots(),
is(0));
+
+ final TaskExecutorConnection taskExecutorConnection =
createTaskExecutorConnection();
+ final SlotReport slotReport =
createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1);
+
+ slotManager.registerTaskManager(taskExecutorConnection,
slotReport);
+
+ assertThat(slotManager.getNumberRegisteredSlots(),
is(numberSlots - 1));
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
+ }
+ }
+
+ private TaskExecutorConnection createTaskExecutorConnection() {
+ final TestingTaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ return new TaskExecutorConnection(ResourceID.generate(),
taskExecutorGateway);
+ }
+
+ /**
+ * Tests that a different slot can fulfill a pending slot request. If
the
+ * pending slot request has a pending task manager slot assigned, it
should
+ * be freed.
+ */
+ @Test
+ public void testRegistrationOfDifferentSlot() throws Exception {
+ final int numberSlots = 1;
+ final TestingResourceActions resourceActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(convert(value ->
numberSlots))
+ .build();
+
+ try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+ final JobID jobId = new JobID();
+ final ResourceProfile requestedSlotProfile = new
ResourceProfile(1.0, 1);
+
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId,
requestedSlotProfile)), is(true));
+
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
+ final int numberOfferedSlots = 1;
+ final TaskExecutorConnection taskExecutorConnection =
createTaskExecutorConnection();
+ final ResourceProfile offeredSlotProfile = new
ResourceProfile(2.0, 2);
+ final SlotReport slotReport =
createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots,
offeredSlotProfile);
+
+ slotManager.registerTaskManager(taskExecutorConnection,
slotReport);
+
+ assertThat(slotManager.getNumberRegisteredSlots(),
is(numberOfferedSlots));
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+ }
+ }
+
+ /**
+ * Tests that only free slots can fulfill/complete a pending task
manager slot.
+ */
+ @Test
+ public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws
Exception {
+ final int numberSlots = 1;
+ final TestingResourceActions resourceActions = new
TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(convert(value ->
numberSlots))
+ .build();
+
+ try (final SlotManager slotManager =
createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+ final JobID jobId = new JobID();
+
assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+ final TaskExecutorConnection taskExecutorConnection =
createTaskExecutorConnection();
+ final SlotID slotId = new
SlotID(taskExecutorConnection.getResourceID(), 0);
+ final SlotStatus slotStatus = new SlotStatus(slotId,
ResourceProfile.UNKNOWN, jobId, new AllocationID());
+ final SlotReport slotReport = new
SlotReport(slotStatus);
+
+ slotManager.registerTaskManager(taskExecutorConnection,
slotReport);
+
+ assertThat(slotManager.getNumberRegisteredSlots(),
is(1));
+
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+ }
+ }
+
+ private static FunctionWithException<ResourceProfile,
Collection<ResourceProfile>, ResourceManagerException>
convert(FunctionWithException<ResourceProfile, Integer,
ResourceManagerException> function) {
+ return (ResourceProfile resourceProfile) -> {
+ final int slots = function.apply(resourceProfile);
+
+ final ArrayList<ResourceProfile> result = new
ArrayList<>(slots);
+ for (int i = 0; i < slots; i++) {
+ result.add(resourceProfile);
+ }
+
+ return result;
+ };
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 8f6317c427f..66966cc3eff 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import org.mockito.Mockito;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -47,12 +47,13 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+/**
+ * Tests for the slot allocation protocol.
+ */
public class SlotProtocolTest extends TestLogger {
private static final long timeout = 10000L;
@@ -87,7 +88,10 @@ public void testSlotsUnavailableRequest() throws Exception {
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ final CompletableFuture<ResourceProfile>
resourceProfileFuture = new CompletableFuture<>();
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder()
+
.setAllocateResourceConsumer(resourceProfileFuture::complete)
+ .build();
slotManager.start(rmLeaderID,
Executors.directExecutor(), resourceManagerActions);
@@ -99,14 +103,16 @@ public void testSlotsUnavailableRequest() throws Exception
{
slotManager.registerSlotRequest(slotRequest);
-
verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile()));
+ assertThat(resourceProfileFuture.get(),
is(equalTo(slotRequest.getResourceProfile())));
// slot becomes available
- TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- Mockito.when(
- taskExecutorGateway
- .requestSlot(any(SlotID.class),
any(JobID.class), any(AllocationID.class), any(String.class),
any(ResourceManagerId.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class));
+ final CompletableFuture<Tuple3<SlotID, JobID,
AllocationID>> requestFuture = new CompletableFuture<>();
+ TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+
requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
final SlotID slotID = new SlotID(resourceID, 0);
@@ -119,8 +125,7 @@ public void testSlotsUnavailableRequest() throws Exception {
slotManager.registerTaskManager(new
TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
// 4) Slot becomes available and TaskExecutor gets a
SlotRequest
- verify(taskExecutorGateway, timeout(5000L))
- .requestSlot(eq(slotID), eq(jobID),
eq(allocationID), any(String.class), any(ResourceManagerId.class),
any(Time.class));
+ assertThat(requestFuture.get(),
is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
}
}
@@ -137,11 +142,13 @@ public void testSlotAvailableRequest() throws Exception {
final ResourceManagerId rmLeaderID =
ResourceManagerId.generate();
- TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
- Mockito.when(
- taskExecutorGateway
- .requestSlot(any(SlotID.class),
any(JobID.class), any(AllocationID.class), any(String.class),
any(ResourceManagerId.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class));
+ final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>>
requestFuture = new CompletableFuture<>();
+ TaskExecutorGateway taskExecutorGateway = new
TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple3.of(tuple5.f0,
tuple5.f1, tuple5.f2));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
try (SlotManager slotManager = new SlotManager(
scheduledExecutor,
@@ -149,7 +156,7 @@ public void testSlotAvailableRequest() throws Exception {
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {
- ResourceActions resourceManagerActions =
mock(ResourceActions.class);
+ ResourceActions resourceManagerActions = new
TestingResourceActionsBuilder().build();
slotManager.start(rmLeaderID,
Executors.directExecutor(), resourceManagerActions);
@@ -172,8 +179,7 @@ public void testSlotAvailableRequest() throws Exception {
slotManager.registerSlotRequest(slotRequest);
// a SlotRequest is routed to the TaskExecutor
- verify(taskExecutorGateway, timeout(5000))
- .requestSlot(eq(slotID), eq(jobID),
eq(allocationID), any(String.class), any(ResourceManagerId.class),
any(Time.class));
+ assertThat(requestFuture.get(),
is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
index 8b7c8026f92..4c6f14c1c40 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -23,9 +23,12 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nonnull;
+import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -38,29 +41,28 @@
private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
@Nonnull
- private final Consumer<ResourceProfile> allocateResourceConsumer;
+ private final FunctionWithException<ResourceProfile,
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction;
@Nonnull
private final Consumer<Tuple3<JobID, AllocationID, Exception>>
notifyAllocationFailureConsumer;
public TestingResourceActions(
@Nonnull BiConsumer<InstanceID, Exception>
releaseResourceConsumer,
- @Nonnull Consumer<ResourceProfile>
allocateResourceConsumer,
+ @Nonnull FunctionWithException<ResourceProfile,
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction,
@Nonnull Consumer<Tuple3<JobID, AllocationID,
Exception>> notifyAllocationFailureConsumer) {
this.releaseResourceConsumer = releaseResourceConsumer;
- this.allocateResourceConsumer = allocateResourceConsumer;
+ this.allocateResourceFunction = allocateResourceFunction;
this.notifyAllocationFailureConsumer =
notifyAllocationFailureConsumer;
}
-
@Override
public void releaseResource(InstanceID instanceId, Exception cause) {
releaseResourceConsumer.accept(instanceId, cause);
}
@Override
- public void allocateResource(ResourceProfile resourceProfile) {
- allocateResourceConsumer.accept(resourceProfile);
+ public Collection<ResourceProfile> allocateResource(ResourceProfile
resourceProfile) throws ResourceManagerException {
+ return allocateResourceFunction.apply(resourceProfile);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
index 2c1d47e8c88..ac7afd4283e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
@@ -23,7 +23,11 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -32,7 +36,7 @@
*/
public class TestingResourceActionsBuilder {
private BiConsumer<InstanceID, Exception> releaseResourceConsumer =
(ignoredA, ignoredB) -> {};
- private Consumer<ResourceProfile> allocateResourceConsumer = (ignored)
-> {};
+ private FunctionWithException<ResourceProfile,
Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction
= (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN);
private Consumer<Tuple3<JobID, AllocationID, Exception>>
notifyAllocationFailureConsumer = (ignored) -> {};
public TestingResourceActionsBuilder
setReleaseResourceConsumer(BiConsumer<InstanceID, Exception>
releaseResourceConsumer) {
@@ -40,8 +44,16 @@ public TestingResourceActionsBuilder
setReleaseResourceConsumer(BiConsumer<Insta
return this;
}
+ public TestingResourceActionsBuilder
setAllocateResourceFunction(FunctionWithException<ResourceProfile,
Collection<ResourceProfile>, ResourceManagerException>
allocateResourceFunction) {
+ this.allocateResourceFunction = allocateResourceFunction;
+ return this;
+ }
+
public TestingResourceActionsBuilder
setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer)
{
- this.allocateResourceConsumer = allocateResourceConsumer;
+ this.allocateResourceFunction = (ResourceProfile
resourceProfile) -> {
+ allocateResourceConsumer.accept(resourceProfile);
+ return Collections.singleton(ResourceProfile.UNKNOWN);
+ };
return this;
}
@@ -50,7 +62,7 @@ public TestingResourceActionsBuilder
setNotifyAllocationFailureConsumer(Consumer
return this;
}
- public TestingResourceActions createTestingResourceActions() {
- return new TestingResourceActions(releaseResourceConsumer,
allocateResourceConsumer, notifyAllocationFailureConsumer);
+ public TestingResourceActions build() {
+ return new TestingResourceActions(releaseResourceConsumer,
allocateResourceFunction, notifyAllocationFailureConsumer);
}
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 956e40fe61b..2a43b8b7f1a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -58,6 +58,7 @@
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -115,6 +116,8 @@
private final Map<ResourceProfile, Integer> resourcePriorities = new
HashMap<>();
+ private final Collection<ResourceProfile> slotsPerWorker;
+
public YarnResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
@@ -163,6 +166,8 @@ public YarnResourceManager(
this.numberOfTaskSlots =
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
this.defaultTaskManagerMemoryMB =
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
this.defaultCpus =
flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
+
+ this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
}
protected AMRMClientAsync<AMRMClient.ContainerRequest>
createAndStartResourceManagerClient(
@@ -283,7 +288,7 @@ protected void internalDeregisterApplication(
}
@Override
- public void startNewWorker(ResourceProfile resourceProfile) {
+ public Collection<ResourceProfile> startNewWorker(ResourceProfile
resourceProfile) {
// Priority for worker containers - priorities are
intra-application
//TODO: set priority according to the resource allocated
Priority priority =
Priority.newInstance(generatePriority(resourceProfile));
@@ -291,6 +296,8 @@ public void startNewWorker(ResourceProfile resourceProfile)
{
int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus :
(int) resourceProfile.getCpuCores();
Resource capability = Resource.newInstance(mem, vcore);
requestYarnContainer(capability, priority);
+
+ return slotsPerWorker;
}
@Override
@@ -334,7 +341,7 @@ public void onContainersCompleted(final
List<ContainerStatus> statuses) {
if (yarnWorkerNode != null) {
// Container completed
unexpectedly ~> start a new one
final Container container =
yarnWorkerNode.getContainer();
-
requestYarnContainer(container.getResource(),
yarnWorkerNode.getContainer().getPriority());
+
requestYarnContainerIfRequired(container.getResource(),
yarnWorkerNode.getContainer().getPriority());
}
// Eagerly close the connection with
task manager.
closeTaskManagerConnection(resourceId,
new Exception(containerStatus.getDiagnostics()));
@@ -375,7 +382,7 @@ public void onContainersAllocated(List<Container>
containers) {
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(container.getId());
// and ask for a new one
-
requestYarnContainer(container.getResource(), container.getPriority());
+
requestYarnContainerIfRequired(container.getResource(),
container.getPriority());
}
} else {
// return the excessive containers
@@ -446,21 +453,26 @@ private FinalApplicationStatus
getYarnStatus(ApplicationStatus status) {
/**
* Request new container if pending containers cannot satisfies pending
slot requests.
*/
+ private void requestYarnContainerIfRequired(Resource resource, Priority
priority) {
+ int requiredTaskManagerSlots =
getNumberRequiredTaskManagerSlots();
+ int pendingTaskManagerSlots = numPendingContainerRequests *
numberOfTaskSlots;
+
+ if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+ requestYarnContainer(resource, priority);
+ }
+ }
+
private void requestYarnContainer(Resource resource, Priority priority)
{
- int pendingSlotRequests = getNumberPendingSlotRequests();
- int pendingSlotAllocation = numPendingContainerRequests *
numberOfTaskSlots;
- if (pendingSlotRequests > pendingSlotAllocation) {
- resourceManagerClient.addContainerRequest(new
AMRMClient.ContainerRequest(resource, null, null, priority));
+ resourceManagerClient.addContainerRequest(new
AMRMClient.ContainerRequest(resource, null, null, priority));
- // make sure we transmit the request fast and receive
fast news of granted allocations
-
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+ // make sure we transmit the request fast and receive fast news
of granted allocations
+
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
- numPendingContainerRequests++;
+ numPendingContainerRequests++;
- log.info("Requesting new TaskExecutor container with
resources {}. Number pending requests {}.",
- resource,
- numPendingContainerRequests);
- }
+ log.info("Requesting new TaskExecutor container with resources
{}. Number pending requests {}.",
+ resource,
+ numPendingContainerRequests);
}
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource
resource, String containerId, String host)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Make SlotManager aware of multi slot TaskManagers
> -------------------------------------------------
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, ResourceManager
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{SlotManager}}, which is responsible for managing all available slots of
> a Flink cluster, can request the start of new {{TaskManagers}} if it cannot
> fulfill a slot request. A new {{TaskManager}} can be started with multiple
> slots configured, but currently the {{SlotManager}} assumes that it will be
> started with a single slot. As a consequence, it might issue multiple requests
> to start new {{TaskManagers}} even though a single one would be sufficient to
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources, which are only freed after
> the idle timeout, I suggest making the {{SlotManager}} aware of how many slots
> a {{TaskManager}} is started with. That way, the {{SlotManager}} only needs to
> request a new {{TaskManager}} if all slots of the previously started
> {{TaskManagers}} (potentially not yet registered and, thus, future slots) are
> already assigned to slot requests.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)