[FLINK-4839] [cluster management] JobManager handle TaskManager's slot offering

This closes #2647 #2643.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f891a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f891a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f891a6c

Branch: refs/heads/flip-6
Commit: 4f891a6c26847ac66c477853101de31eb75993f7
Parents: 7e30eab
Author: Kurt Young <ykt...@gmail.com>
Authored: Mon Oct 17 18:15:26 2016 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Oct 17 15:35:03 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java | 46 +++++++++---
 .../flink/runtime/jobmaster/JobMaster.java      | 50 ++++++++++++-
 .../runtime/jobmaster/JobMasterGateway.java     | 26 +++++--
 .../resourcemanager/ResourceManager.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 34 +++++----
 .../runtime/taskexecutor/slot/SlotOffer.java    | 79 ++++++++++++++++++++
 .../runtime/taskexecutor/slot/TaskSlot.java     | 13 ++++
 .../taskexecutor/slot/TaskSlotTable.java        | 12 +--
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 +++-
 9 files changed, 231 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index de952c3..02166a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -267,7 +267,7 @@ public class SlotPool implements SlotOwner {
        }
 
        // 
------------------------------------------------------------------------
-       //  Slot De-allocation
+       //  Slot releasing & offering
        // 
------------------------------------------------------------------------
 
        /**
@@ -323,10 +323,6 @@ public class SlotPool implements SlotOwner {
                return null;
        }
 
-       // 
------------------------------------------------------------------------
-       //  Slot Releasing
-       // 
------------------------------------------------------------------------
-
        /**
         * Release slot to TaskManager, called for finished tasks or canceled 
jobs.
         *
@@ -340,10 +336,6 @@ public class SlotPool implements SlotOwner {
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  Slot Offering
-       // 
------------------------------------------------------------------------
-
        /**
         * Slot offering by TaskManager with AllocationID. The AllocationID is 
originally generated by this pool and
         * transfer through the ResourceManager to TaskManager. We use it to 
distinguish the different allocation
@@ -401,6 +393,39 @@ public class SlotPool implements SlotOwner {
        }
 
        // 
------------------------------------------------------------------------
+       //  Error Handling
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Fail the specified allocation and release the corresponding slot if we have one.
+        * This may be triggered by the JobManager when a slot allocation failed with a timeout,
+        * or by the TaskManager when it finds out that something went wrong with the slot
+        * and decides to take it back.
+        *
+        * @param allocationID Represents the allocation which should be failed
+        * @param cause        The cause of the failure
+        */
+       public void failAllocation(final AllocationID allocationID, final Exception cause) {
+               synchronized (lock) {
+                       // 1. if the allocation is still pending, drop the request and fail its future
+                       Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
+                                       pendingRequests.remove(allocationID);
+                       if (pendingRequest != null) {
+                               pendingRequest.f1.completeExceptionally(cause);
+                               return;
+                       }
+
+                       // 2. check whether we have a free slot corresponding to this allocation id
+                       // TODO: add allocation id to slot descriptor, so we can remove it by allocation id
+
+                       // 3. check whether we have an in-use slot corresponding to this allocation id
+                       // TODO: needs a mechanism to release the in-use slot without returning it to this pool
+
+                       // TODO: add unit tests once the previous two are ready; the allocation may fail at any phase
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        //  Resource
        // 
------------------------------------------------------------------------
 
@@ -464,12 +489,13 @@ public class SlotPool implements SlotOwner {
         */
        static class AllocatedSlots {
 
-               /** All allocated slots organized by TaskManager */
+               /** All allocated slots organized by TaskManager's id */
                private final Map<ResourceID, Set<Slot>> 
allocatedSlotsByResource;
 
                /** All allocated slots organized by Slot object */
                private final Map<Slot, AllocationID> allocatedSlots;
 
+               /** All allocated slot descriptors organized by Slot object */
                private final Map<Slot, SlotDescriptor> 
allocatedSlotsWithDescriptor;
 
                /** All allocated slots organized by AllocationID */

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 306a28a..56fa3e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -85,6 +86,7 @@ import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -95,7 +97,9 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
@@ -669,13 +673,51 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> 
slots, UUID leaderId) {
-               throw new UnsupportedOperationException("Has to be 
implemented.");
+       public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
+                       final Iterable<SlotOffer> slots, final UUID leaderId) 
throws Exception
+       {
+               if (!this.leaderSessionID.equals(leaderId)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderId);
+               }
+
+               Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = 
registeredTaskManagers.get(taskManagerId);
+               if (taskManager == null) {
+                       throw new Exception("Unknown TaskManager " + 
taskManagerId);
+               }
+
+               final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+               for (SlotOffer slotOffer : slots) {
+                       final SlotDescriptor slotDescriptor = new 
SlotDescriptor(
+                                       jobGraph.getJobID(),
+                                       taskManager.f0,
+                                       slotOffer.getSlotIndex(),
+                                       slotOffer.getResourceProfile(),
+                                       null); // TODO: replace the actor 
gateway with the new rpc gateway, it's ready (taskManager.f1)
+                       if (slotPool.offerSlot(slotOffer.getAllocationId(), 
slotDescriptor)) {
+                               acceptedSlotOffers.add(slotOffer);
+                       }
+               }
+
+               return acceptedSlotOffers;
        }
 
        @RpcMethod
-       public void failSlot(final AllocationID allocationId, UUID leaderId, 
Exception cause) {
-               throw new UnsupportedOperationException("Has to be 
implemented.");
+       public void failSlot(final ResourceID taskManagerId,
+                       final AllocationID allocationId,
+                       final UUID leaderId,
+                       final Exception cause) throws Exception
+       {
+               if (!this.leaderSessionID.equals(leaderId)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderId);
+               }
+
+               if (!registeredTaskManagers.containsKey(taskManagerId)) {
+                       throw new Exception("Unknown TaskManager " + 
taskManagerId);
+               }
+
+               slotPool.failAllocation(allocationId, cause);
        }
 
        @RpcMethod

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4ee9f92..27308d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -177,21 +178,30 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
        /**
         * Offer the given slots to the job manager. The response contains the 
set of accepted slots.
         *
-        * @param slots to offer to the job manager
-        * @param leaderId identifying the job leader
-        * @param timeout for the rpc call
+        * @param taskManagerId identifying the task manager
+        * @param slots         to offer to the job manager
+        * @param leaderId      identifying the job leader
+        * @param timeout       for the rpc call
         * @return Future set of accepted slots.
         */
-       Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> 
slots, UUID leaderId, @RpcTimeout final Time timeout);
+       Future<Iterable<SlotOffer>> offerSlots(
+                       final ResourceID taskManagerId,
+                       final Iterable<SlotOffer> slots,
+                       final UUID leaderId,
+                       @RpcTimeout final Time timeout);
 
        /**
         * Fail the slot with the given allocation id and cause.
         *
-        * @param allocationId identifying the slot to fail
-        * @param leaderId identifying the job leader
-        * @param cause of the failing
+        * @param taskManagerId identifying the task manager
+        * @param allocationId  identifying the slot to fail
+        * @param leaderId      identifying the job leader
+        * @param cause         of the failing
         */
-       void failSlot(final AllocationID allocationId, UUID leaderId, Exception 
cause);
+       void failSlot(final ResourceID taskManagerId,
+                       final AllocationID allocationId,
+                       final UUID leaderId,
+                       final Exception cause);
 
        /**
         * Register the task manager at the job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3122804..f1a5073 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -69,7 +69,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * ResourceManager implementation. The resource manager is responsible for 
resource de-/allocation
  * and bookkeeping.
  *
- * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
+ * It offers the following methods as part of its rpc interface to interact with it remotely:
  * <ul>
  *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a 
{@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from 
the resource manager</li>

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3e3a544..601d804 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -70,6 +70,8 @@ import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -630,47 +632,49 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                                final JobMasterGateway jobMasterGateway = 
jobManagerConnection.getJobManagerGateway();
 
-                               final Iterator<AllocationID> 
reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+                               final Iterator<TaskSlot> reservedSlotsIterator 
= taskSlotTable.getAllocatedSlots(jobId);
                                final UUID leaderId = 
jobManagerConnection.getLeaderId();
 
-                               final Collection<AllocationID> reservedSlots = 
new HashSet<>(2);
+                               final Collection<SlotOffer> reservedSlots = new 
HashSet<>(2);
 
                                while (reservedSlotsIterator.hasNext()) {
-                                       
reservedSlots.add(reservedSlotsIterator.next());
+                                       
reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
                                }
 
-                               Future<Iterable<AllocationID>> 
acceptedSlotsFuture = jobMasterGateway.offerSlots(
+                               Future<Iterable<SlotOffer>> acceptedSlotsFuture 
= jobMasterGateway.offerSlots(
+                                       getResourceID(),
                                        reservedSlots,
                                        leaderId,
                                        taskManagerConfiguration.getTimeout());
 
-                               acceptedSlotsFuture.thenAcceptAsync(new 
AcceptFunction<Iterable<AllocationID>>() {
+                               acceptedSlotsFuture.thenAcceptAsync(new 
AcceptFunction<Iterable<SlotOffer>>() {
                                        @Override
-                                       public void 
accept(Iterable<AllocationID> acceptedSlots) {
+                                       public void accept(Iterable<SlotOffer> 
acceptedSlots) {
                                                // check if the response is 
still valid
                                                if 
(isJobManagerConnectionValid(jobId, leaderId)) {
                                                        // mark accepted slots 
active
-                                                       for (AllocationID 
acceptedSlot: acceptedSlots) {
+                                                       for (SlotOffer 
acceptedSlot: acceptedSlots) {
                                                                try {
-                                                                       if 
(!taskSlotTable.markSlotActive(acceptedSlot)) {
+                                                                       if 
(!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
                                                                                
// the slot is either free or releasing at the moment
                                                                                
final String message = "Could not mark slot " + jobId + " active.";
                                                                                
log.debug(message);
-                                                                               
jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
+                                                                               
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
+                                                                               
                leaderId, new Exception(message));
                                                                        }
 
                                                                        // 
remove the assigned slots so that we can free the left overs
                                                                        
reservedSlots.remove(acceptedSlot);
                                                                } catch 
(SlotNotFoundException e) {
                                                                        
log.debug("Could not mark slot {} active.", acceptedSlot,  e);
-                                                                       
jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
+                                                                       
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), 
leaderId, e);
                                                                }
                                                        }
 
                                                        final Exception e = new 
Exception("The slot was rejected by the JobManager.");
 
-                                                       for (AllocationID 
rejectedSlot: reservedSlots) {
-                                                               
freeSlot(rejectedSlot, e);
+                                                       for (SlotOffer 
rejectedSlot: reservedSlots) {
+                                                               
freeSlot(rejectedSlot.getAllocationId(), e);
                                                        }
                                                } else {
                                                        // discard the response 
since there is a new leader for the job
@@ -688,8 +692,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                                        
offerSlotsToJobManager(jobId);
                                                } else {
                                                        // We encountered an 
exception. Free the slots and return them to the RM.
-                                                       for (AllocationID 
reservedSlot: reservedSlots) {
-                                                               
freeSlot(reservedSlot, throwable);
+                                                       for (SlotOffer 
reservedSlot: reservedSlots) {
+                                                               
freeSlot(reservedSlot.getAllocationId(), throwable);
                                                        }
                                                }
 
@@ -840,7 +844,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        private void unregisterTaskAndNotifyFinalState(
                        final UUID jobMasterLeaderId,
-                       final JobMasterGateway jobMasterGateway,                
+                       final JobMasterGateway jobMasterGateway,
                        final ExecutionAttemptID executionAttemptID) {
 
                Task task = taskSlotTable.removeTask(executionAttemptID);

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
new file mode 100644
index 0000000..f8d7e6c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Describes a slot that a task manager offers to a job manager.
+ */
+public class SlotOffer implements Serializable {
+
+       private static final long serialVersionUID = -7067814231108250971L;
+
+       /** Allocation id of this slot; it is the only identifier of this slot offer (see equals/hashCode) */
+       private final AllocationID allocationId;
+
+       /** Index of the offered slot */
+       private final int slotIndex;
+
+       /** The resource profile of the offered slot */
+       private final ResourceProfile resourceProfile;
+
+       public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) {
+               Preconditions.checkArgument(0 <= index, "The index must be greater than or equal to 0.");
+               this.allocationId = Preconditions.checkNotNull(allocationID);
+               this.slotIndex = index;
+               this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+       }
+
+       public AllocationID getAllocationId() {
+               return allocationId;
+       }
+
+       public int getSlotIndex() {
+               return slotIndex;
+       }
+
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               // two offers are equal iff they carry the same allocation id; index and profile are ignored
+               SlotOffer slotOffer = (SlotOffer) o;
+               return allocationId.equals(slotOffer.allocationId);
+       }
+
+       @Override
+       public int hashCode() {
+               return allocationId.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 0942772..e12c15b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -286,4 +286,17 @@ public class TaskSlot {
                state = TaskSlotState.RELEASING;
                return true;
        }
+
+       /**
+        * Generate the slot offer from this TaskSlot.
+        *
+        * @return The slot offer which this task slot can provide
+        */
+       public SlotOffer generateSlotOffer() {
+               Preconditions.checkState(TaskSlotState.ACTIVE == state || 
TaskSlotState.ALLOCATED == state,
+                               "The task slot is not in state active or 
allocated.");
+               Preconditions.checkState(allocationId != null, "The task slot 
are not allocated");
+
+               return new SlotOffer(allocationId, index, resourceProfile);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88123b4..88b83a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,7 +70,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
 
        /** Interface for slot actions, such as freeing them or timing them out 
*/
        private SlotActions slotActions;
-       
+
        /** Whether the table has been started */
        private boolean started;
 
@@ -250,7 +250,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         */
        public int freeSlot(AllocationID allocationId, Throwable cause) throws 
SlotNotFoundException {
                checkInit();
-               
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Free slot {}.", allocationId, cause);
                } else {
@@ -370,13 +370,13 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
        }
 
        /**
-        * Return an iterator of allocated slots (their allocation ids) for the 
given job id.
+        * Return an iterator of allocated slots for the given job id.
         *
         * @param jobId for which to return the allocated slots
-        * @return Iterator of allocation ids of allocated slots.
+        * @return Iterator of allocated slots.
         */
-       public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
-               return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED);
+       public Iterator<TaskSlot> getAllocatedSlots(JobID jobId) {
+               return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4f891a6c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2b5d2dd..87bde35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -63,6 +63,7 @@ import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -391,6 +392,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final AllocationID allocationId = new AllocationID();
                final SlotID slotId = new SlotID(resourceId, 0);
+               final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
 
                try {
                        TaskExecutor taskManager = new TaskExecutor(
@@ -425,7 +427,11 @@ public class TaskExecutorTest extends TestLogger {
                        
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
 
                        // the job leader should get the allocation id offered
-                       
verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)),
 eq(jobManagerLeaderId), any(Time.class));
+                       verify(jobMasterGateway).offerSlots(
+                                       any(ResourceID.class),
+                                       
(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+                                       eq(jobManagerLeaderId),
+                                       any(Time.class));
                } finally {
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowException();
@@ -481,6 +487,9 @@ public class TaskExecutorTest extends TestLogger {
                final AllocationID allocationId1 = new AllocationID();
                final AllocationID allocationId2 = new AllocationID();
 
+               final SlotOffer offer1 = new SlotOffer(allocationId1, 0, 
ResourceProfile.UNKNOWN);
+               final SlotOffer offer2 = new SlotOffer(allocationId2, 0, 
ResourceProfile.UNKNOWN);
+
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
                when(jobMasterGateway.registerTaskManager(
@@ -491,8 +500,9 @@ public class TaskExecutorTest extends TestLogger {
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 
-               when(jobMasterGateway.offerSlots(any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
-                       
.thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+               when(jobMasterGateway.offerSlots(
+                               any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
+                       
.thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);

Reply via email to