[ https://issues.apache.org/jira/browse/FLINK-5810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928745#comment-15928745 ]

ASF GitHub Bot commented on FLINK-5810:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3394#discussion_r106506811
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -21,519 +21,897 @@
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.time.Time;
     import org.apache.flink.runtime.clusterframework.types.AllocationID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceID;
    -import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
     import org.apache.flink.runtime.clusterframework.types.SlotID;
     import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
     import org.apache.flink.runtime.concurrent.Future;
    -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.messages.Acknowledge;
     import org.apache.flink.runtime.resourcemanager.SlotRequest;
    -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
    -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
    -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
    +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
    +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
     import org.apache.flink.runtime.taskexecutor.SlotReport;
     import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
    +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
     import java.util.LinkedHashMap;
     import java.util.Map;
    -
    -import static org.apache.flink.util.Preconditions.checkNotNull;
    +import java.util.Objects;
    +import java.util.UUID;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
     
     /**
    - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
    - * slots from registered TaskManagers and issues container allocation requests in case of there are not
    - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
    - * <p>
    - * The main operation principle of SlotManager is:
    - * <ul>
    - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
    - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
    - * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
    - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
    - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
    - * holds.</li>
    - * </ul>
    - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
    + * The slot manager is responsible for maintaining a view on all registered task manager slots,
    + * their allocation and all pending slot requests. Whenever a new slot is registered or and
    + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there
    + * are not enough slots available the slot manager will notify the resource manager about it via
    + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
    + *
    + * In order to free resources and avoid resource leaks, idling task managers (task managers whose
    + * slots are currently not used) and not fulfilled pending slot requests time out triggering their
    + * release and failure, respectively.
      */
    -public abstract class SlotManager {
    +public class SlotManager implements AutoCloseable {
    --- End diff --
    
    Implementing `AutoCloseable` is a neat trick, but I find it a bit confusing. Would one ever use a SlotManager in a "try-with-resources" statement?
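For context on the question: implementing `AutoCloseable` is what allows a caller to manage an object with a try-with-resources statement, which calls `close()` automatically when the block exits. The sketch below is a minimal illustration under stated assumptions, not the code from the pull request: `SketchSlotManager`, `requestSlot` and `requestAndClose` are hypothetical names, it uses `java.util.concurrent.CompletableFuture` rather than Flink's own future classes, and it assumes that closing a slot manager should cancel the request timeouts and fail all outstanding requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AutoCloseableSketch {

    // Hypothetical, heavily simplified stand-in for SlotManager (not Flink's API).
    static class SketchSlotManager implements AutoCloseable {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final List<CompletableFuture<String>> pendingRequests = new ArrayList<>();
        private final List<ScheduledFuture<?>> timeouts = new ArrayList<>();

        // Returns a future for a slot and fails it if no slot arrives within the timeout.
        CompletableFuture<String> requestSlot(long timeoutMillis) {
            CompletableFuture<String> request = new CompletableFuture<>();
            pendingRequests.add(request);
            timeouts.add(scheduler.schedule(
                    () -> request.completeExceptionally(new TimeoutException("slot request timed out")),
                    timeoutMillis, TimeUnit.MILLISECONDS));
            return request;
        }

        // Narrows AutoCloseable#close() so callers need not catch Exception.
        @Override
        public void close() {
            timeouts.forEach(timeout -> timeout.cancel(false));
            pendingRequests.forEach(request ->
                    request.completeExceptionally(new CancellationException("slot manager closed")));
            scheduler.shutdownNow();
        }
    }

    // try-with-resources guarantees close() runs before the method returns, even on exceptions.
    static CompletableFuture<String> requestAndClose() {
        try (SketchSlotManager slotManager = new SketchSlotManager()) {
            return slotManager.requestSlot(60_000);
        }
    }

    public static void main(String[] args) {
        System.out.println(requestAndClose().isCompletedExceptionally()); // prints true
    }
}
```

The design question is therefore whether any caller (a test, for instance) would actually benefit from this automatic `close()`; an owner that shuts the slot manager down explicitly gets the same effect from a plain `close()` method without the interface.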


> Harden SlotManager
> ------------------
>
>                 Key: FLINK-5810
>                 URL: https://issues.apache.org/jira/browse/FLINK-5810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Harden the {{SlotManager}} logic to better cope with lost messages.
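The mechanism described in the new class Javadoc in the diff above (offer every newly registered or freed slot to the pending requests, ask the resource manager for more resources when no slot fits, and fail pending requests that time out) can be sketched roughly as below. This is a hypothetical, single-threaded simplification and not the pull request's implementation: `SlotMatchingSketch` and its methods are made-up names, slots and allocations are plain strings, a resource profile is reduced to a memory size in MB, a `java.util.function.IntConsumer` stands in for `ResourceManagerActions#allocateResource`, and timeouts are checked through an explicit `checkTimeouts(nowMillis)` call instead of a `ScheduledExecutor`, which keeps the example deterministic. Releasing idle task managers is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntConsumer;

public class SlotMatchingSketch {

    // A slot request that could not be fulfilled yet; memoryMb stands in for a ResourceProfile.
    static final class PendingRequest {
        final String allocationId;
        final int memoryMb;
        final long deadlineMillis;
        PendingRequest(String allocationId, int memoryMb, long deadlineMillis) {
            this.allocationId = allocationId;
            this.memoryMb = memoryMb;
            this.deadlineMillis = deadlineMillis;
        }
    }

    final Map<String, Integer> slotMemory = new HashMap<>();                   // slotId -> memory of each registered slot
    final Set<String> freeSlots = new LinkedHashSet<>();                       // registered, unallocated slots
    final Map<String, String> allocatedSlots = new LinkedHashMap<>();          // slotId -> allocationId
    final Map<String, PendingRequest> pendingRequests = new LinkedHashMap<>(); // insertion (FIFO) order
    final List<String> failedRequests = new ArrayList<>();                     // allocationIds that timed out
    private final IntConsumer allocateResource; // hypothetical stand-in for ResourceManagerActions#allocateResource

    SlotMatchingSketch(IntConsumer allocateResource) {
        this.allocateResource = allocateResource;
    }

    // Use a free slot if one fits; otherwise queue the request and ask for more resources.
    void requestSlot(String allocationId, int memoryMb, long deadlineMillis) {
        PendingRequest request = new PendingRequest(allocationId, memoryMb, deadlineMillis);
        if (!tryAllocate(request)) {
            pendingRequests.put(allocationId, request);
            allocateResource.accept(memoryMb);
        }
    }

    void registerSlot(String slotId, int memoryMb) {
        slotMemory.put(slotId, memoryMb);
        freeSlots.add(slotId);
        fulfillPendingRequests(); // a new slot is offered to pending requests first
    }

    void freeSlot(String slotId) {
        if (allocatedSlots.remove(slotId) != null) {
            freeSlots.add(slotId);
            fulfillPendingRequests(); // a freed slot may fulfil another pending request
        }
    }

    // Fail every pending request whose deadline has passed, so none waits forever.
    void checkTimeouts(long nowMillis) {
        Iterator<PendingRequest> it = pendingRequests.values().iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (nowMillis >= request.deadlineMillis) {
                it.remove();
                failedRequests.add(request.allocationId);
            }
        }
    }

    private boolean tryAllocate(PendingRequest request) {
        Iterator<String> it = freeSlots.iterator();
        while (it.hasNext()) {
            String slotId = it.next();
            if (slotMemory.get(slotId) >= request.memoryMb) { // first free slot that is large enough
                it.remove();
                allocatedSlots.put(slotId, request.allocationId);
                return true;
            }
        }
        return false;
    }

    // Retry pending requests in FIFO order; each one that obtains a slot is removed.
    private void fulfillPendingRequests() {
        pendingRequests.values().removeIf(this::tryAllocate);
    }

    public static void main(String[] args) {
        List<Integer> allocationCalls = new ArrayList<>();
        SlotMatchingSketch slotManager = new SlotMatchingSketch(allocationCalls::add);
        slotManager.requestSlot("a1", 1024, 1_000); // no slots yet: queued, resources requested
        slotManager.registerSlot("s1", 2048);       // the new slot fulfils a1
        slotManager.requestSlot("a2", 1024, 1_000); // no free slot left: queued again
        slotManager.checkTimeouts(1_000);           // a2 reaches its deadline and fails
        System.out.println(slotManager.allocatedSlots + " " + slotManager.failedRequests + " " + allocationCalls);
        // prints {s1=a1} [a2] [1024, 1024]
    }
}
```

Checking deadlines against an explicit timestamp is one way to keep such timeout logic unit-testable; the actual class more likely schedules a timeout per request, as the `ScheduledExecutor` and `ScheduledFuture` imports in the diff suggest.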



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
