Author: sseth
Date: Tue Apr 21 18:08:56 2015
New Revision: 1675176

URL: http://svn.apache.org/r1675176
Log:
HIVE-10408. Fix NPE in scheduler in case of rejected tasks. (Siddharth Seth)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
    hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
    hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java Tue Apr 21 18:08:56 2015
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -26,8 +27,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
@@ -37,7 +38,6 @@ import org.apache.hadoop.registry.client
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
-import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
 import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
@@ -55,9 +55,10 @@ public class LlapYarnRegistryImpl implem
   private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class);
 
   private RegistryOperationsService client;
-  private String instanceName;
-  private Configuration conf;
-  private ServiceRecordMarshal encoder;
+  private final String instanceName;
+  private final Configuration conf;
+  private final ServiceRecordMarshal encoder;
+  private final String path;
 
   private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
 
@@ -68,7 +69,8 @@ public class LlapYarnRegistryImpl implem
 
   private final static String SERVICE_CLASS = "org-apache-hive";
 
-  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1);
+  final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
   final long refreshDelay;
 
   static {
@@ -90,6 +92,8 @@ public class LlapYarnRegistryImpl implem
     // registry reference
     client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
     encoder = new RegistryUtils.ServiceRecordMarshal();
+    this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+        SERVICE_CLASS, instanceName, "workers"), "worker-");
     refreshDelay =
         conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
             LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
@@ -114,8 +118,7 @@ public class LlapYarnRegistryImpl implem
   }
 
   private final String getPath() {
-    return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
-        SERVICE_CLASS, instanceName, "workers"), "worker-");
+    return this.path;
   }
 
   @Override
@@ -199,7 +202,8 @@ public class LlapYarnRegistryImpl implem
     }
 
     public void kill() {
-      LOG.info("Killing " + this);
+      // May be possible to generate a notification back to the scheduler from here.
+      LOG.info("Killing service instance: " + this);
       this.alive = false;
     }
 
@@ -217,74 +221,90 @@ public class LlapYarnRegistryImpl implem
 
     @Override
     public String toString() {
-      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + "]";
+      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
     }
+
+    // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
+    // of an already known instance.
   }
 
   private class DynamicServiceInstanceSet implements ServiceInstanceSet {
 
-    Map<String, ServiceInstance> instances;
+    // LinkedHashMap to retain iteration order.
+    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
 
     @Override
-    public Map<String, ServiceInstance> getAll() {
-      return instances;
+    public synchronized Map<String, ServiceInstance> getAll() {
+      // Return a copy. Instances may be modified during a refresh.
+      return new LinkedHashMap<>(instances);
     }
 
     @Override
-    public ServiceInstance getInstance(String name) {
+    public synchronized ServiceInstance getInstance(String name) {
       return instances.get(name);
     }
 
     @Override
-    public synchronized void refresh() throws IOException {
+    public  void refresh() throws IOException {
       /* call this from wherever */
       Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
 
       String path = getPath();
       Map<String, ServiceRecord> records =
           RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
-      Set<String> latestKeys = new HashSet<String>();
-      LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
-      for (ServiceRecord rec : records.values()) {
-        ServiceInstance instance = new DynamicServiceInstance(rec);
-        if (instance != null) {
-          if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
-            // add a new object
-            freshInstances.put(instance.getWorkerIdentity(), instance);
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
-                  + instance);
+      // Synchronize after reading the service records from the external service (ZK)
+      synchronized (this) {
+        Set<String> latestKeys = new HashSet<String>();
+        LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
+        for (ServiceRecord rec : records.values()) {
+          ServiceInstance instance = new DynamicServiceInstance(rec);
+          if (instance != null) {
+            if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
+              // add a new object
+              freshInstances.put(instance.getWorkerIdentity(), instance);
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
+                    + instance);
+              }
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Retaining running worker " + instance.getWorkerIdentity() +
+                    " which mapped to " + instance);
+              }
             }
-          } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + " which mapped to " + instance);
           }
+          latestKeys.add(instance.getWorkerIdentity());
         }
-        latestKeys.add(instance.getWorkerIdentity());
-      }
 
-      if (instances != null) {
-        // deep-copy before modifying
-        Set<String> oldKeys = new HashSet(instances.keySet());
-        if (oldKeys.removeAll(latestKeys)) {
-          for (String k : oldKeys) {
-            // this is so that people can hold onto ServiceInstance references as placeholders for tasks
-            final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
-            dead.kill();
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+        if (instances != null) {
+          // deep-copy before modifying
+          Set<String> oldKeys = new HashSet(instances.keySet());
+          if (oldKeys.removeAll(latestKeys)) {
+            // This is all the records which have not checked in, and are effectively dead.
+            for (String k : oldKeys) {
+              // this is so that people can hold onto ServiceInstance references as placeholders for tasks
+              final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
+              dead.kill();
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+              }
             }
           }
+          // oldKeys contains the set of dead instances at this point.
+          this.instances.keySet().removeAll(oldKeys);
+          this.instances.putAll(freshInstances);
+        } else {
+          this.instances.putAll(freshInstances);
         }
-        this.instances.keySet().removeAll(oldKeys);
-        this.instances.putAll(freshInstances);
-      } else {
-        this.instances = freshInstances;
       }
     }
 
     @Override
-    public Set<ServiceInstance> getByHost(String host) {
+    public synchronized Set<ServiceInstance> getByHost(String host) {
+      // TODO Maybe store this as a map which is populated during construction, to avoid walking
+      // the map on each request.
       Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+
       for (ServiceInstance i : instances.values()) {
         if (host.equals(i.getHost())) {
           // all hosts in instances should be alive in this impl
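
A stand-alone sketch of the refresher-executor change above (the class name and the dummy refresh task are invented for illustration; the ThreadFactoryBuilder call mirrors the patch). The point of setDaemon(true) is that the single refresher thread no longer keeps a client JVM alive once everything else has finished:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class DaemonRefresherSketch {
      public static void main(String[] args) throws InterruptedException {
        // Same factory setup as the patch: a single, named daemon thread.
        ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
        // Hypothetical refresh task; the real one calls ServiceInstanceSet.refresh().
        refresher.scheduleWithFixedDelay(
            () -> System.out.println("refresh on " + Thread.currentThread().getName()),
            0, 10, TimeUnit.SECONDS);
        Thread.sleep(1000);
        // main() may now return; the daemon refresher thread does not pin the JVM.
      }
    }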

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java Tue Apr 21 18:08:56 2015
@@ -218,7 +218,6 @@ public class LlapTaskCommunicator extend
 
           @Override
           public void indicateError(Throwable t) {
-            LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
             if (t instanceof ServiceException) {
               ServiceException se = (ServiceException) t;
               t = se.getCause();
@@ -228,10 +227,14 @@ public class LlapTaskCommunicator extend
               String message = re.toString();
               // RejectedExecutions from the remote service treated as KILLED
               if (message.contains(RejectedExecutionException.class.getName())) {
+                LOG.info(
+                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId + ", Service Busy");
                 getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
               } else {
                 // All others from the remote service cause the task to FAIL.
+                LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
                 getTaskCommunicatorContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.toString());
@@ -239,10 +242,14 @@ public class LlapTaskCommunicator extend
             } else {
              // Exception from the RPC layer - communication failure, consider as KILLED / service down.
               if (t instanceof IOException) {
+                LOG.info(
+                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId + ", Communication Error");
                 getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
                 // Anything else is a FAIL.
+                LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
                 getTaskCommunicatorContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.getMessage());
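
A stand-alone sketch of the classification rule the indicateError() changes above implement (class, enum, and parameter names here are invented for illustration): a RejectedExecutionException reported by the remote daemon means the service is busy, so the attempt is killed and can be rescheduled; an IOException from the RPC layer is treated as a communication failure and also killed; anything else fails the attempt. The remote exception arrives only as text, hence the match on the exception's class name.

    import java.io.IOException;
    import java.util.concurrent.RejectedExecutionException;

    class TaskErrorClassification {
      enum Outcome { KILLED_SERVICE_BUSY, KILLED_COMMUNICATION_ERROR, FAILED }

      // 'fromRemoteService' stands in for the remote-exception check in the patch.
      static Outcome classify(Throwable t, boolean fromRemoteService) {
        if (fromRemoteService) {
          if (String.valueOf(t).contains(RejectedExecutionException.class.getName())) {
            // The daemon's executor rejected the work: KILLED, so the attempt may run elsewhere.
            return Outcome.KILLED_SERVICE_BUSY;
          }
          // Any other error from the remote service fails the attempt.
          return Outcome.FAILED;
        }
        if (t instanceof IOException) {
          // RPC-layer failure: the daemon may be down, treat as KILLED rather than FAILED.
          return Outcome.KILLED_COMMUNICATION_ERROR;
        }
        return Outcome.FAILED;
      }
    }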

Added: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java?rev=1675176&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java Tue Apr 21 18:08:56 2015
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.app.AppContext;
+
+class ContainerFactory {
+  final ApplicationAttemptId customAppAttemptId;
+  AtomicLong nextId;
+
+  public ContainerFactory(AppContext appContext, long appIdLong) {
+    this.nextId = new AtomicLong(1);
+    ApplicationId appId =
+        ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
+            .getApplicationId().getId());
+    this.customAppAttemptId =
+        ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
+            .getAttemptId());
+  }
+
+  public Container createContainer(Resource capability, Priority priority, String hostname,
+      int port) {
+    ContainerId containerId =
+        ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
+    NodeId nodeId = NodeId.newInstance(hostname, port);
+    String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
+
+    Container container =
+        Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
+
+    return container;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Apr 21 18:08:56 2015
@@ -21,6 +21,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -48,8 +48,6 @@ import org.apache.hadoop.hive.llap.confi
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -78,15 +76,11 @@ public class LlapTaskSchedulerService ex
   // interface into the registry service
   private ServiceInstanceSet activeInstances;
 
+  // Tracks all instances, including ones which have been disabled in the past.
+  // LinkedHashMap to provide the same iteration order when selecting a random host.
   @VisibleForTesting
-  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new HashMap<>();
-  
-  @VisibleForTesting
-  final Set<ServiceInstance> instanceBlackList = new HashSet<ServiceInstance>();
-
-  @VisibleForTesting
-  // Tracks currently allocated containers.
-  final Map<ContainerId, String> containerToInstanceMap = new HashMap<>();
+  final Map<ServiceInstance, NodeInfo> instanceToNodeMap = new LinkedHashMap<>();
+  // TODO Ideally, remove elements from this once it's known that no tasks are linked to the instance (all deallocated)
 
   // Tracks tasks which could not be allocated immediately.
   @VisibleForTesting
@@ -100,8 +94,9 @@ public class LlapTaskSchedulerService ex
   // Tracks running and queued tasks. Cleared after a task completes.
   private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
 
+  // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
   @VisibleForTesting
-  final DelayQueue<NodeInfo> disabledNodes = new DelayQueue<>();
+  final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
 
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
@@ -263,9 +258,9 @@ public class LlapTaskSchedulerService ex
     int vcores = 0;
     readLock.lock();
     try {
-      for (ServiceInstance inst : instanceToNodeMap.keySet()) {
-        if (inst.isAlive()) {
-          Resource r = inst.getResource();
+      for (Entry<ServiceInstance, NodeInfo> entry : instanceToNodeMap.entrySet()) {
+        if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) {
+          Resource r = entry.getKey().getResource();
           memory += r.getMemory();
           vcores += r.getVirtualCores();
         }
@@ -375,8 +370,6 @@ public class LlapTaskSchedulerService ex
         }
         return false;
       }
-      String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId);
-      assert hostForContainer != null;
       ServiceInstance assignedInstance = taskInfo.assignedInstance;
       assert assignedInstance != null;
 
@@ -410,6 +403,8 @@ public class LlapTaskSchedulerService ex
   @Override
   public Object deallocateContainer(ContainerId containerId) {
     LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    // Containers are not being tracked for re-use.
+    // This is safe to ignore since a deallocate task should have come in earlier.
     return null;
   }
 
@@ -435,7 +430,7 @@ public class LlapTaskSchedulerService ex
   }
 
   /**
-   * @param requestedHosts the list of preferred hosts. null implies any host
+   * @param request the list of preferred hosts. null implies any host
    * @return
    */
   private ServiceInstance selectHost(TaskInfo request) {
@@ -444,6 +439,9 @@ public class LlapTaskSchedulerService ex
     try {
       // Check if any hosts are active.
       if (getAvailableResources().getMemory() <= 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Refreshing instances since total memory is 0");
+        }
         refreshInstances();
       }
 
@@ -453,16 +451,20 @@ public class LlapTaskSchedulerService ex
       }
 
       if (requestedHosts != null) {
+        int prefHostCount = -1;
         for (String host : requestedHosts) {
+          prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
             for (ServiceInstance inst : instances) {
-              if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) {
-                // only allocate from the "available" list
+              NodeInfo nodeInfo = instanceToNodeMap.get(inst);
+              if (inst.isAlive() && nodeInfo != null && !nodeInfo.isDisabled()) {
                // TODO Change this to work off of what we think is remaining capacity for an
                 // instance
-                LOG.info("Assigning " + inst + " when looking for " + host);
+                LOG.info(
+                    "Assigning " + inst + " when looking for " + host + ". FirstRequestedHost=" +
+                        (prefHostCount == 0));
                 return inst;
               }
             }
@@ -470,16 +472,16 @@ public class LlapTaskSchedulerService ex
         }
       }
       /* fall through - miss in locality (random scheduling) */
-      ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]);
+      Entry<ServiceInstance, NodeInfo> [] all = instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]);
       // Check again
       if (all.length > 0) {
         int n = random.nextInt(all.length);
         // start at random offset and iterate whole list
         for (int i = 0; i < all.length; i++) {
-          ServiceInstance inst = all[(i + n) % all.length];
-          if (inst.isAlive()) {
-            LOG.info("Assigning " + inst + " when looking for any host");
-            return inst;
+          Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
+          if (inst.getKey().isAlive() && !inst.getValue().isDisabled()) {
+            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length);
+            return inst.getKey();
           }
         }
       }
@@ -487,15 +489,27 @@ public class LlapTaskSchedulerService ex
       readLock.unlock();
     }
 
+    // TODO Ideally, each refresh operation should addNodes if they don't already exist.
+    // Even better would be to get notifications from the service impl when a node gets added or removed.
+    // Instead of having to walk through the entire list. The computation of a node getting added or
+    // removed already exists in the DynamicRegistry implementation.
+
+
+    // This will only happen if no allocations are possible, which means all other nodes have
+    // been blacklisted.
+    // TODO Look for new nodes more often. See comment above.
+
     /* check again whether nodes are disabled or just missing */
     writeLock.lock();
     try {
       for (ServiceInstance inst : activeInstances.getAll().values()) {
-        if (inst.isAlive() && instanceBlackList.contains(inst) == false
-            && instanceToNodeMap.containsKey(inst) == false) {
+        if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) {
           /* that's a good node, not added to the allocations yet */
+          LOG.info("Found a new node: " + inst + ". Adding to node list and disabling to trigger scheduling");
           addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
           // mark it as disabled to let the pending tasks go there
+          // TODO If disabling the instance, have it wake up immediately instead of waiting.
+          // Ideally get rid of this requirement, by having all tasks allocated via a queue.
           disableInstance(inst, true);
         }
       }
@@ -515,19 +529,22 @@ public class LlapTaskSchedulerService ex
   }
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
+    LOG.info("Adding node: " + inst);
     instanceToNodeMap.put(inst, node);
+    // TODO Trigger a scheduling run each time a new node is added.
   }
 
   private void reenableDisabledNode(NodeInfo nodeInfo) {
     writeLock.lock();
     try {
       if (!nodeInfo.isBusy()) {
+        // If the node being re-enabled was not marked busy previously, then it was disabled due to
+        // some other failure. Refresh the service list to see if it's been removed permanently.
         refreshInstances();
       }
+      LOG.info("Attempting to re-enable node: " + nodeInfo.host);
       if (nodeInfo.host.isAlive()) {
         nodeInfo.enableNode();
-        instanceBlackList.remove(nodeInfo.host);
-        instanceToNodeMap.put(nodeInfo.host, nodeInfo);
       } else {
         if (LOG.isInfoEnabled()) {
           LOG.info("Removing dead node " + nodeInfo);
@@ -541,19 +558,18 @@ public class LlapTaskSchedulerService ex
   private void disableInstance(ServiceInstance instance, boolean busy) {
     writeLock.lock();
     try {
-      NodeInfo nodeInfo = instanceToNodeMap.remove(instance);
-      if (nodeInfo == null) {
+      NodeInfo nodeInfo = instanceToNodeMap.get(instance);
+      if (nodeInfo == null || nodeInfo.isDisabled()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
         }
       } else {
-        instanceBlackList.add(instance);
         nodeInfo.disableNode(nodeReEnableTimeout);
         nodeInfo.setBusy(busy); // daemon failure vs daemon busy
         // TODO: handle task to container map events in case of hard failures
-        disabledNodes.add(nodeInfo);
+        disabledNodesQueue.add(nodeInfo);
         if (LOG.isInfoEnabled()) {
-          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " seconds");
+          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " milli-seconds");
         }
       }
     } finally {
@@ -640,7 +656,6 @@ public class LlapTaskSchedulerService ex
             host.getHost());
         taskInfo.setAssignmentInfo(host, container.getId());
         knownTasks.putIfAbsent(taskInfo.task, taskInfo);
-        containerToInstanceMap.put(container.getId(), host.getWorkerIdentity());
       } finally {
         writeLock.unlock();
       }
@@ -660,7 +675,7 @@ public class LlapTaskSchedulerService ex
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           while (true) {
-            NodeInfo nodeInfo = disabledNodes.take();
+            NodeInfo nodeInfo = disabledNodesQueue.take();
             // A node became available. Enable the node and try scheduling.
             reenableDisabledNode(nodeInfo);
             schedulePendingTasks();
@@ -694,8 +709,12 @@ public class LlapTaskSchedulerService ex
     private long numSuccessfulTasks = 0;
     private long numSuccessfulTasksAtLastBlacklist = -1;
     float cumulativeBackoffFactor = 1.0f;
+    // A node could be disabled for reasons other than being busy.
+    private boolean disabled = false;
+    // If disabled, the node could be marked as busy.
     private boolean busy;
 
+
     NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) {
       this.host = host;
       constBackOffFactor = backoffFactor;
@@ -704,10 +723,12 @@ public class LlapTaskSchedulerService ex
 
     void enableNode() {
       expireTimeMillis = -1;
+      disabled = false;
     }
 
     void disableNode(long duration) {
       long currentTime = clock.getTime();
+      disabled = true;
       if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
         // Blacklisted again, without any progress. Will never kick in for the first run.
         cumulativeBackoffFactor = cumulativeBackoffFactor * constBackOffFactor;
@@ -721,7 +742,12 @@ public class LlapTaskSchedulerService ex
     }
 
     void registerTaskSuccess() {
-      this.busy = false; // if a task exited, we might have free slots
+      // TODO If a task succeeds, we may have free slots. Mark the node as !busy. Ideally take it out
+      // of the queue for more allocations.
+      // For now, not changing the busy status,
+
+      // this.busy = false;
+      // this.disabled = false;
       numSuccessfulTasks++;
     }
 
@@ -733,9 +759,13 @@ public class LlapTaskSchedulerService ex
       return busy;
     }
 
+    public boolean isDisabled() {
+      return disabled;
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
-      return expireTimeMillis - clock.getTime();
+      return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -869,31 +899,4 @@ public class LlapTaskSchedulerService ex
     }
   }
 
-  static class ContainerFactory {
-    final ApplicationAttemptId customAppAttemptId;
-    AtomicLong nextId;
-
-    public ContainerFactory(AppContext appContext, long appIdLong) {
-      this.nextId = new AtomicLong(1);
-      ApplicationId appId =
-          ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId()
-              .getApplicationId().getId());
-      this.customAppAttemptId =
-          ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId()
-              .getAttemptId());
-    }
-
-    public Container createContainer(Resource capability, Priority priority, String hostname,
-        int port) {
-      ContainerId containerId =
-          ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
-      NodeId nodeId = NodeId.newInstance(hostname, port);
-      String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
-
-      Container container =
-          Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
-
-      return container;
-    }
-  }
 }
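
The getDelay() change above is what makes the DelayQueue of disabled nodes behave: Delayed.getDelay() must return the remaining time converted into the unit the caller asks for, not raw milliseconds. A minimal stand-alone sketch of that contract (the class below is invented for illustration; it is not the NodeInfo from the patch):

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class ExpiringEntry implements Delayed {
      private final long expireTimeMillis;

      ExpiringEntry(long delayMillis) {
        this.expireTimeMillis = System.currentTimeMillis() + delayMillis;
      }

      @Override
      public long getDelay(TimeUnit unit) {
        // Convert the remaining milliseconds into whatever unit the queue requests.
        return unit.convert(expireTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
      }

      @Override
      public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
      }
    }

    // Usage: new DelayQueue<ExpiringEntry>().take() blocks until an entry's delay has elapsed,
    // which is how the scheduler's re-enable thread waits for a disabled node to time out.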

Modified: hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java?rev=1675176&r1=1675175&r2=1675176&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java Tue Apr 21 18:08:56 2015
@@ -22,9 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
@@ -46,7 +43,6 @@ import org.apache.tez.dag.app.Controlled
 import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mortbay.log.Log;
 
 public class TestLlapTaskSchedulerService {
 
@@ -112,7 +108,7 @@ public class TestLlapTaskSchedulerServic
       // Verify that the node is blacklisted
       assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks);
       assertEquals(2, tsWrapper.ts.instanceToNodeMap.size());
-      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodes.peek();
+      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
       assertNotNull(disabledNodeInfo);
       assertEquals(HOST1, disabledNodeInfo.host.getHost());
       assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS));
@@ -164,7 +160,7 @@ public class TestLlapTaskSchedulerServic
       // Verify that the node is blacklisted
       assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks);
       assertEquals(0, tsWrapper.ts.instanceToNodeMap.size());
-      assertEquals(3, tsWrapper.ts.disabledNodes.size());
+      assertEquals(3, tsWrapper.ts.disabledNodesQueue.size());
 
 
       Object task4 = new Object();

