[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453958#comment-15453958
 ] 

ASF GitHub Bot commented on FLINK-4490:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2447#discussion_r77101490
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
    @@ -20,73 +20,125 @@
     
     import org.apache.flink.runtime.instance.SimpleSlot;
     
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * 
    + */
     public class SlotAllocationFuture {
    -   
    +
        private final Object monitor = new Object();
    -   
    +
        private volatile SimpleSlot slot;
    -   
    +
        private volatile SlotAllocationFutureAction action;
    -   
    +
        // --------------------------------------------------------------------------------------------
     
    +   /**
    +    * Creates a future that is uncompleted.
    +    */
        public SlotAllocationFuture() {}
    -   
    +
    +   /**
    +    * Creates a future that is immediately completed.
    +    * 
    +    * @param slot The task slot that completes the future.
    +    */
        public SlotAllocationFuture(SimpleSlot slot) {
                this.slot = slot;
        }
    -   
    +
        // --------------------------------------------------------------------------------------------
    -   
    -   public SimpleSlot waitTillAllocated() throws InterruptedException {
    -           return waitTillAllocated(0);
    -   }
    -   
    -   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
    +
    +   public SimpleSlot waitTillCompleted() throws InterruptedException {
                synchronized (monitor) {
                        while (slot == null) {
    -                           monitor.wait(timeout);
    +                           monitor.wait();
    +                   }
    +                   return slot;
    +           }
    +   }
    +
    +   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
    +           checkArgument(timeout >= 0, "timeout may not be negative");
    +           checkNotNull(timeUnit, "timeUnit");
    +
    +           if (timeout == 0) {
    +                   return waitTillCompleted();
    +           } else {
    +                   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
    +                   long millisToWait;
    +
    +                   synchronized (monitor) {
    +                           while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
    +                                   monitor.wait(millisToWait);
    +                           }
    +
    +                           if (slot != null) {
    +                                   return slot;
    +                           } else {
    +                                   throw new TimeoutException();
    +                           }
                        }
    -                   
    +           }
    +   }
    +
    +   /**
    +    * Gets the slot from this future. This method throws an exception, if the future has not been completed.
    +    * This method never blocks.
    +    * 
    +    * @return The slot with which this future was completed.
    +    * @throws IllegalStateException Thrown, if this method is called before the future is completed.
    +    */
    +   public SimpleSlot get() {
    +           final SimpleSlot slot = this.slot;
    +           if (slot != null) {
                        return slot;
    +           } else {
    +                   throw new IllegalStateException("The future is not complete - not slot available");
    --- End diff --
    
    Can we throw an explicit exception such as SlotNotReadyException instead of a RuntimeException?
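
    A minimal sketch of what such a dedicated exception could look like. `SlotNotReadyException` is only the name proposed in the comment above and is an assumption, not an existing Flink class. `SlotFuture` is a simplified, hypothetical stand-in for `SlotAllocationFuture`, so that the example is self-contained. Extending `IllegalStateException` is one possible design choice: callers that already catch the exception thrown by `get()` in the diff would keep working.

```java
// Hypothetical sketch: none of these classes exist in Flink under these names.
final class SlotNotReadyExample {

    /** Dedicated exception (assumed name) signalling that the future has no slot yet. */
    static class SlotNotReadyException extends IllegalStateException {
        private static final long serialVersionUID = 1L;

        SlotNotReadyException(String message) {
            super(message);
        }
    }

    /** Simplified stand-in for SlotAllocationFuture, generic to avoid Flink dependencies. */
    static class SlotFuture<T> {
        private volatile T slot;

        /** Completes the future with the given slot. */
        void complete(T slot) {
            this.slot = slot;
        }

        /** Non-blocking access; throws the dedicated exception if not yet completed. */
        T get() {
            final T slot = this.slot; // read the volatile field once
            if (slot != null) {
                return slot;
            }
            throw new SlotNotReadyException("The future is not complete - no slot available");
        }
    }

    public static void main(String[] args) {
        SlotFuture<String> future = new SlotFuture<>();
        try {
            future.get();
        } catch (SlotNotReadyException e) {
            System.out.println("Not ready: " + e.getMessage());
        }
        future.complete("slot-1");
        System.out.println("Got: " + future.get());
    }
}
```

    With a dedicated type, callers can tell "slot not ready yet" apart from other illegal-state conditions without inspecting the message text.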


> Decouple Slot and Instance
> --------------------------
>
>                 Key: FLINK-4490
>                 URL: https://issues.apache.org/jira/browse/FLINK-4490
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Kurt Young
>             Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} hold references to each other. {{Instance}} 
> holding {{Slot}} makes sense, because it reflects how many resources the 
> instance can provide and how many are in use. 
> It is not really necessary, however, for {{Slot}} to hold the {{Instance}} it 
> belongs to. It only needs some connection information and a gateway to 
> talk to. Another downside of {{Slot}} holding {{Instance}} is that 
> {{Instance}} contains allocation/de-allocation logic, which makes it 
> difficult to refactor allocation without {{Slot}} being affected. 
> We should abstract the connection information of {{Instance}} and let {{Slot}} 
> hold that instead. (We already have {{InstanceConnectionInfo}}, but it lacks 
> the instance's akka gateway; maybe we can just add the akka gateway to 
> {{InstanceConnectionInfo}}.)
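
The decoupling described in the issue could be sketched roughly as follows. All class names here (`ConnectionInfo`, `TaskManagerGateway`, and the simplified `Slot` and `Instance`) are hypothetical stand-ins chosen for illustration. They are not the actual Flink classes and do not reflect how the pull request implements the change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the idea in the issue; not the real Flink types.
final class SlotDecouplingSketch {

    /** Assumed stand-in for the instance's akka gateway. */
    interface TaskManagerGateway {
        String path();
    }

    /** Connection details plus gateway: everything a slot needs to reach its host. */
    static final class ConnectionInfo {
        final String hostname;
        final int dataPort;
        final TaskManagerGateway gateway;

        ConnectionInfo(String hostname, int dataPort, TaskManagerGateway gateway) {
            this.hostname = hostname;
            this.dataPort = dataPort;
            this.gateway = gateway;
        }
    }

    /** The slot holds only connection information, not the owning instance. */
    static final class Slot {
        final int slotNumber;
        final ConnectionInfo connectionInfo;

        Slot(int slotNumber, ConnectionInfo connectionInfo) {
            this.slotNumber = slotNumber;
            this.connectionInfo = connectionInfo;
        }
    }

    /** The instance still tracks its slots and keeps the allocation logic. */
    static final class Instance {
        private final ConnectionInfo connectionInfo;
        private final int numberOfSlots;
        private final List<Slot> allocatedSlots = new ArrayList<>();

        Instance(ConnectionInfo connectionInfo, int numberOfSlots) {
            this.connectionInfo = connectionInfo;
            this.numberOfSlots = numberOfSlots;
        }

        Slot allocateSlot() {
            if (allocatedSlots.size() >= numberOfSlots) {
                throw new IllegalStateException("No free slots on this instance");
            }
            Slot slot = new Slot(allocatedSlots.size(), connectionInfo);
            allocatedSlots.add(slot);
            return slot;
        }

        int numberOfAllocatedSlots() {
            return allocatedSlots.size();
        }
    }

    public static void main(String[] args) {
        TaskManagerGateway gateway = () -> "akka://flink/user/taskmanager";
        Instance instance = new Instance(new ConnectionInfo("worker-1", 6121, gateway), 2);
        Slot slot = instance.allocateSlot();
        System.out.println(slot.connectionInfo.hostname + " via " + slot.connectionInfo.gateway.path());
    }
}
```

In this sketch the reference only points one way: `Instance` knows its slots, while `Slot` can reach the task manager through `ConnectionInfo` without seeing any allocation logic.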



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
