Updated Branches: refs/heads/master fef3dd509 -> 39bb519c8
TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik) Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/39bb519c Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/39bb519c Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/39bb519c Branch: refs/heads/master Commit: 39bb519c8dbcb5915cdd618e8113df9bc1284182 Parents: fef3dd5 Author: Hyunsik Choi <[email protected]> Authored: Tue Apr 30 20:03:28 2013 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Tue Apr 30 20:52:26 2013 +0900 ---------------------------------------------------------------------- CHANGES.txt | 5 +- .../src/main/java/tajo/master/QueryMaster.java | 7 +- .../main/java/tajo/master/rm/RMCommunicator.java | 349 --------------- .../java/tajo/master/rm/RMContainerAllocator.java | 300 +++++-------- 4 files changed, 122 insertions(+), 539 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 33b422c..e2cae30 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ Release 0.2.0 - unreleased IMPROVEMENTS + TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik) + TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik) TAJO-32: Cleanup TaskRunner. (hyunsik) @@ -30,7 +32,8 @@ Release 0.2.0 - unreleased BUG FIXES - TAJO-38: Update class comment in TaskAttemptContext from Korean to English (hsaputra) + TAJO-38: Update class comment in TaskAttemptContext from Korean to English + (hsaputra) TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik) http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java index 98878d3..86581f0 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java @@ -222,7 +222,6 @@ public class QueryMaster extends CompositeService implements EventHandler { public class QueryContext { private QueryConf conf; - int clusterNode; public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>(); int minCapability; int maxCapability; @@ -296,11 +295,7 @@ public class QueryMaster extends CompositeService implements EventHandler { } public int getNumClusterNode() { - return clusterNode; - } - - public void setNumClusterNode(int num) { - clusterNode = num; + return rmAllocator.getClusterNodeCount(); } public CatalogService getCatalog() { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java deleted file mode 100644 index 90a0737..0000000 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package tajo.master.rm; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.AMRMProtocol; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ContainerManager; -import org.apache.hadoop.yarn.api.protocolrecords.*; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.service.AbstractService; -import org.apache.hadoop.yarn.util.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.ProtoUtils; -import tajo.TajoProtos.QueryState; -import tajo.master.Query; -import tajo.master.QueryMaster.QueryContext; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class RMCommunicator extends AbstractService { - private static final Log LOG = LogFactory.getLog(RMCommunicator.class); - - protected static final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - - // Resource Manager RPC - private YarnRPC rpc; - protected AMRMProtocol scheduler; - - // For Query - protected QueryContext context; - protected Query query; - private int rmPollInterval = 1000;//millis - protected ApplicationId applicationId; - protected ApplicationAttemptId applicationAttemptId; - protected Map<ApplicationAccessType, String> applicationACLs; - - // RMCommunicator - private final AtomicBoolean stopped; - protected Thread allocatorThread; - - // resource - private Resource minContainerCapability; - private Resource maxContainerCapability; - - // Has a signal (SIGTERM etc) been issued? - protected volatile boolean isSignalled = false; - - public RMCommunicator(QueryContext context) { - super(RMCommunicator.class.getName()); - this.context = context; - this.applicationId = context.getApplicationId(); - this.applicationAttemptId = context.getApplicationAttemptId(); - - stopped = new AtomicBoolean(false); - } - - @Override - public void init(Configuration conf) { - LOG.info("defaultFS: " + conf.get("fs.default.name")); - LOG.info("defaultFS: " + conf.get("fs.defaultFS")); - - super.init(conf); - } - - public void start() { - this.query = context.getQuery(); - rpc = YarnRPC.create(getConfig()); - this.scheduler = createSchedulerProxy(); - - - AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( - applicationAttemptId, 0, 0.0f, - new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()); - try { - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - context.setNumClusterNode(allocateResponse.getNumClusterNodes()); - } catch (YarnRemoteException e) { - e.printStackTrace(); - } - register(); - startAllocatorThread(); - super.start(); - } - - public void stop() { - if (stopped.getAndSet(true)) { - // return if already stopped - return; - } - allocatorThread.interrupt(); - try { - allocatorThread.join(); - } catch (InterruptedException ie) { - LOG.warn("InterruptedException while stopping", ie); - } - unregister(); - super.stop(); - } - - protected void register() { - //Register - try { - RegisterApplicationMasterRequest request = - recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); - request.setApplicationAttemptId(applicationAttemptId); - LOG.info("Tracking Addr: " + context.getRpcAddress()); - request.setHost(context.getRpcAddress().getHostName()); - request.setRpcPort(context.getRpcAddress().getPort()); - // TODO - to be changed to http server - //request.setTrackingUrl("http://" + NetUtils.getIpPortString(context.getRpcAddress())); - request.setTrackingUrl("http://localhost:1234"); - RegisterApplicationMasterResponse response = - scheduler.registerApplicationMaster(request); - minContainerCapability = response.getMinimumResourceCapability(); - maxContainerCapability = response.getMaximumResourceCapability(); - context.setMaxContainerCapability(maxContainerCapability.getMemory()); - context.setMinContainerCapability(minContainerCapability.getMemory()); - this.applicationACLs = response.getApplicationACLs(); - LOG.info("minContainerCapability: " + minContainerCapability.getMemory()); - LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory()); - } catch (Exception are) { - LOG.error("Exception while registering", are); - throw new YarnException(are); - } - } - - protected void unregister() { - try { - FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; - if (query.getState() == QueryState.QUERY_SUCCEEDED) { - finishState = FinalApplicationStatus.SUCCEEDED; - } else if (query.getState() == QueryState.QUERY_KILLED - || (query.getState() == QueryState.QUERY_RUNNING && isSignalled)) { - finishState = FinalApplicationStatus.KILLED; - } else if (query.getState() == QueryState.QUERY_FAILED - || query.getState() == QueryState.QUERY_ERROR) { - finishState = FinalApplicationStatus.FAILED; - } - StringBuffer sb = new StringBuffer(); -// for (String s : query.getDiagnostics()) { -// sb.append(s).append("\n"); -// } - LOG.info("Setting job diagnostics to " + sb.toString()); - - // TODO - to be implemented -// String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(), -// context.getApplicationId()); -// LOG.info("History url is " + historyUrl); - - FinishApplicationMasterRequest request = - recordFactory.newRecordInstance(FinishApplicationMasterRequest.class); - request.setAppAttemptId(this.applicationAttemptId); - request.setFinishApplicationStatus(finishState); - request.setDiagnostics(""); // TODO - tobe implemented - request.setTrackingUrl(""); - scheduler.finishApplicationMaster(request); - } catch(Exception are) { - LOG.error("Exception while unregistering ", are); - } - } - - protected Resource getMinContainerCapability() { - return minContainerCapability; - } - - protected Resource getMaxContainerCapability() { - return maxContainerCapability; - } - - public abstract void heartbeat() throws Exception; - - protected AMRMProtocol createSchedulerProxy() { - final Configuration conf = getConfig(); - final InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - - UserGroupInformation currentUser; - try { - currentUser = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new YarnException(e); - } - - if (UserGroupInformation.isSecurityEnabled()) { - String tokenURLEncodedStr = System.getenv().get( - ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); - Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>(); - - try { - token.decodeFromUrlString(tokenURLEncodedStr); - } catch (IOException e) { - throw new YarnException(e); - } - - SecurityUtil.setTokenService(token, serviceAddr); - if (LOG.isDebugEnabled()) { - LOG.debug("AppMasterToken is " + token); - } - currentUser.addToken(token); - } - - return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() { - @Override - public AMRMProtocol run() { - return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, - serviceAddr, conf); - } - }); - } - - protected void startAllocatorThread() { - allocatorThread = new Thread(new Runnable() { - @Override - public void run() { - while (!stopped.get() && !Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(rmPollInterval); - try { - heartbeat(); - } catch (YarnException e) { - LOG.error("Error communicating with RM: " + e.getMessage() , e); - return; - } catch (Exception e) { - LOG.error("ERROR IN CONTACTING RM. ", e); - // TODO: for other exceptions - } - } catch (InterruptedException e) { - if (!stopped.get()) { - LOG.warn("Allocated thread interrupted. Returning."); - } - return; - } - } - } - }); - allocatorThread.setName("RMCommunicator Allocator"); - allocatorThread.start(); - } - - public void setSignalled(boolean isSignalled) { - this.isSignalled = isSignalled; - LOG.info("RMCommunicator notified that iSignalled is: " - + isSignalled); - } - - protected ContainerManager getCMProxy(ContainerId containerID, - final String containerManagerBindAddr, ContainerToken containerToken) - throws IOException { - - final InetSocketAddress cmAddr = - NetUtils.createSocketAddr(containerManagerBindAddr); - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - - if (UserGroupInformation.isSecurityEnabled()) { - Token<ContainerTokenIdentifier> token = - ProtoUtils.convertFromProtoFormat(containerToken, cmAddr); - // the user in createRemoteUser in this context has to be ContainerID - user = UserGroupInformation.createRemoteUser(containerID.toString()); - user.addToken(token); - } - - ContainerManager proxy = user - .doAs(new PrivilegedAction<ContainerManager>() { - @Override - public ContainerManager run() { - return (ContainerManager) rpc.getProxy(ContainerManager.class, - cmAddr, getConfig()); - } - }); - return proxy; - } - - public static ContainerLaunchContext newContainerLaunchContext( - ContainerId containerID, String user, Resource assignedCapability, - Map<String, LocalResource> localResources, - Map<String, String> environment, List<String> commands, - Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens, - Map<ApplicationAccessType, String> acls) { - ContainerLaunchContext container = recordFactory - .newRecordInstance(ContainerLaunchContext.class); - container.setContainerId(containerID); - container.setUser(user); - container.setResource(assignedCapability); - container.setLocalResources(localResources); - container.setEnvironment(environment); - container.setCommands(commands); - container.setServiceData(serviceData); - container.setContainerTokens(containerTokens); - container.setApplicationACLs(acls); - return container; - } - - private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) - throws IOException { - LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class); - FileStatus rsrcStat = fs.getFileStatus(p); - rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs - .getDefaultFileSystem().resolvePath(rsrcStat.getPath()))); - rsrc.setSize(rsrcStat.getLen()); - rsrc.setTimestamp(rsrcStat.getModificationTime()); - rsrc.setType(type); - rsrc.setVisibility(LocalResourceVisibility.APPLICATION); - return rsrc; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java index 7bb7bf0..4c78c26 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java @@ -21,158 +21,126 @@ package tajo.master.rm; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.client.AMRMClientImpl; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.util.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; import tajo.SubQueryId; +import tajo.TajoProtos.QueryState; import tajo.master.QueryMaster.QueryContext; import tajo.master.SubQueryState; -import tajo.master.event.*; +import tajo.master.event.ContainerAllocationEvent; +import tajo.master.event.ContainerAllocatorEventType; +import tajo.master.event.SubQueryContainerAllocationEvent; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; -public class RMContainerAllocator extends RMCommunicator +public class RMContainerAllocator extends AMRMClientImpl implements EventHandler<ContainerAllocationEvent> { /** Class Logger */ private static final Log LOG = LogFactory.getLog(RMContainerAllocator. class.getName()); + private QueryContext context; private final EventHandler eventHandler; public RMContainerAllocator(QueryContext context) { - super(context); + super(context.getApplicationAttemptId()); + this.context = context; this.eventHandler = context.getDispatcher().getEventHandler(); } - private Map<Priority, SubQueryId> subQueryMap - = new HashMap<Priority, SubQueryId>(); - - @Override - public void heartbeat() throws Exception { - List<Container> allocatedContainers = getResources(); - Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>(); - if (allocatedContainers.size() > 0) { - for (Container container : allocatedContainers) { - SubQueryId subQueryId = subQueryMap.get(container.getPriority()); - if (!subQueryMap.containsKey(container.getPriority()) || - query.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) { - release.add(container.getId()); - synchronized (subQueryMap) { - subQueryMap.remove(container.getPriority()); - } - } else { - if (allocated.containsKey(subQueryId)) { - allocated.get(subQueryId).add(container); - } else { - allocated.put(subQueryId, Lists.newArrayList(container)); - } - } - } - - for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) { - eventHandler.handle( - new SubQueryContainerAllocationEvent(entry.getKey(), - entry.getValue())); - } - } + public void init(Configuration conf) { + super.init(conf); } - @Override - public void handle(ContainerAllocationEvent event) { - - if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) { - LOG.info(event); - assign(event); + public void start() { + super.start(); - } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) { - LOG.info(event); - } else { - LOG.info(event); + RegisterApplicationMasterResponse response; + try { + response = registerApplicationMaster("locahost", 10080, "http://localhost:1234"); + context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory()); + context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory()); + } catch (YarnRemoteException e) { + LOG.error(e); } - } - public void assign(ContainerAllocationEvent event) { - SubQueryId subQueryId = event.getSubQueryId(); - subQueryMap.put(event.getPriority(), event.getSubQueryId()); + startAllocatorThread(); + } - int memRequred; - int minContainerCapability; - int supportedMaxContainerCapability = - getMaxContainerCapability().getMemory(); + protected Thread allocatorThread; + private final AtomicBoolean stopped = new AtomicBoolean(false); + private int rmPollInterval = 1000;//millis + protected void startAllocatorThread() { + allocatorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + try { + try { + heartbeat(); + } catch (YarnException e) { + LOG.error("Error communicating with RM: " + e.getMessage() , e); + return; + } catch (Exception e) { + LOG.error("ERROR IN CONTACTING RM. ", e); + // TODO: for other exceptions + } + Thread.sleep(rmPollInterval); + } catch (InterruptedException e) { + if (!stopped.get()) { + LOG.warn("Allocated thread interrupted. Returning."); + } + return; + } + } + } + }); + allocatorThread.setName("RMContainerAllocator"); + allocatorThread.start(); + } - memRequred = event.getCapability().getMemory(); - minContainerCapability = getMinContainerCapability().getMemory(); - if (memRequred < minContainerCapability) { - memRequred = minContainerCapability; + public void stop() { + stopped.set(true); + super.stop(); + FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; + QueryState state = context.getQuery().getState(); + if (state == QueryState.QUERY_SUCCEEDED) { + finishState = FinalApplicationStatus.SUCCEEDED; + } else if (state == QueryState.QUERY_KILLED + || (state == QueryState.QUERY_RUNNING)) { + finishState = FinalApplicationStatus.KILLED; + } else if (state == QueryState.QUERY_FAILED + || state == QueryState.QUERY_ERROR) { + finishState = FinalApplicationStatus.FAILED; } - if (memRequred > getMaxContainerCapability().getMemory()) { - String diagMsg = "Task capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. mapResourceReqt: " + - memRequred + " maxContainerCapability:" + supportedMaxContainerCapability; - LOG.info(diagMsg); - eventHandler.handle(new QueryDiagnosticsUpdateEvent( - subQueryId.getQueryId(), diagMsg)); - eventHandler.handle(new QueryEvent(subQueryId.getQueryId(), - QueryEventType.KILL)); + try { + unregisterApplicationMaster(finishState, "", "http://localhost:1234"); + } catch (YarnRemoteException e) { + LOG.error(e); } - LOG.info("mapResourceReqt:"+memRequred); - /* - if (event.isLeafQuery() && event instanceof GrouppedContainerAllocatorEvent) { - GrouppedContainerAllocatorEvent allocatorEvent = - (GrouppedContainerAllocatorEvent) event; - List<ResourceRequest> requestList = new ArrayList<>(); - for (Entry<String, Integer> request : - allocatorEvent.getRequestMap().entrySet()) { - - ResourceRequest resReq = Records.newRecord(ResourceRequest.class); - // TODO - to consider the data locality - resReq.setHostName("*"); - resReq.setCapability(allocatorEvent.getCapability()); - resReq.setNumContainers(request.getValue()); - resReq.setPriority(allocatorEvent.getPriority()); - requestList.add(resReq); - } - - ask.addAll(new ArrayList<>(requestList)); - LOG.info(requestList.size()); - LOG.info(ask.size()); - } else {*/ - ResourceRequest resReq = Records.newRecord(ResourceRequest.class); - resReq.setHostName("*"); - resReq.setCapability(event.getCapability()); - resReq.setNumContainers(event.getRequiredNum()); - resReq.setPriority(event.getPriority()); - ask.add(resReq); - //} } - Set<ResourceRequest> ask = new HashSet<ResourceRequest>(); - Set<ContainerId> release = new HashSet<ContainerId>(); - Resource availableResources; - int lastClusterNmCount = 0; - int clusterNmCount = 0; - int lastResponseID = 1; + private final Map<Priority, SubQueryId> subQueryMap = + new HashMap<Priority, SubQueryId>(); - protected AMResponse makeRemoteRequest() throws YarnException, YarnRemoteException { - AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( - applicationAttemptId, lastResponseID, query.getProgress(), - new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(release)); - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); - lastResponseID = response.getResponseId(); - availableResources = response.getAvailableResources(); - lastClusterNmCount = clusterNmCount; - clusterNmCount = allocateResponse.getNumClusterNodes(); + public void heartbeat() throws Exception { + AMResponse response = allocate(context.getProgress()).getAMResponse(); + List<Container> allocatedContainers = response.getAllocatedContainers(); - //LOG.info("Response Id: " + response.getResponseId()); LOG.info("Available Resource: " + response.getAvailableResources()); LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size()); if (response.getAllocatedContainers().size() > 0) { @@ -186,79 +154,45 @@ public class RMContainerAllocator extends RMCommunicator } LOG.info("================================================================"); } - /* - LOG.info("Reboot: " + response.getReboot()); - LOG.info("Num of Updated Node: " + response.getUpdatedNodes()); - for (NodeReport nodeReport : response.getUpdatedNodes()) { - LOG.info("> Node Id: " + nodeReport.getNodeId()); - LOG.info("> Node State: " + nodeReport.getNodeState()); - LOG.info("> Rack Name: " + nodeReport.getRackName()); - LOG.info("> Used: " + nodeReport.getUsed()); - } - */ + Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>(); + if (allocatedContainers.size() > 0) { + for (Container container : allocatedContainers) { + SubQueryId subQueryId = subQueryMap.get(container.getPriority()); + if (!subQueryMap.containsKey(container.getPriority()) || + context.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) { + releaseAssignedContainer(container.getId()); + synchronized (subQueryMap) { + subQueryMap.remove(container.getPriority()); + } + } else { + if (allocated.containsKey(subQueryId)) { + allocated.get(subQueryId).add(container); + } else { + allocated.put(subQueryId, Lists.newArrayList(container)); + } + } + } - if (ask.size() > 0 || release.size() > 0) { - LOG.info("getResources() for " + applicationId + ":" + " ask=" - + ask.size() + " release= " + release.size() + " newContainers=" - + response.getAllocatedContainers().size() + " finishedContainers=" - + response.getCompletedContainersStatuses().size() - + " resourcelimit=" + availableResources + " knownNMs=" - + clusterNmCount); + for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) { + eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue())); + } } - - ask.clear(); - release.clear(); - return response; - } - - public Resource getAvailableResources() { - return availableResources; } + @Override + public void handle(ContainerAllocationEvent event) { - long retrystartTime; - long retryInterval = 3000; - private List<Container> getResources() throws Exception { - int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null - AMResponse response; - - /* - * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS - * milliseconds before aborting. During this interval, AM will still try - * to contact the RM. - */ - try { - response = makeRemoteRequest(); - // Reset retry count if no exception occurred. - retrystartTime = System.currentTimeMillis(); - } catch (Exception e) { - // This can happen when the connection to the RM has gone down. Keep - // re-trying until the retryInterval has expired. - if (System.currentTimeMillis() - retrystartTime >= retryInterval) { - LOG.error("Could not contact RM after " + retryInterval + " milliseconds."); - eventHandler.handle(new QueryEvent(query.getId(), - QueryEventType.INTERNAL_ERROR)); - throw new YarnException("Could not contact RM after " + - retryInterval + " milliseconds."); - } - // Throw this up to the caller, which may decide to ignore it and - // continue to attempt to contact the RM. - throw e; - } + if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) { + LOG.info(event); + subQueryMap.put(event.getPriority(), event.getSubQueryId()); + addContainerRequest(new ContainerRequest(event.getCapability(), null, null, + event.getPriority(), event.getRequiredNum())); - if (response.getReboot()) { - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. - eventHandler.handle(new QueryEvent(query.getId(), - QueryEventType.INTERNAL_ERROR)); - throw new YarnException("Resource Manager doesn't recognize AttemptId: " + - context.getApplicationId()); + } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) { + LOG.info(event); + } else { + LOG.info(event); } - - int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; - List<Container> newContainers = response.getAllocatedContainers(); - - return newContainers; } }
