http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java deleted file mode 100644 index 90a4eb5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ /dev/null @@ -1,605 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.rm; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.protobuf.RpcCallback; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.QueryId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; -import org.apache.tajo.master.QueryInProgress; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.rpc.CancelableRpcCallback; -import org.apache.tajo.rpc.RpcUtils; -import org.apache.tajo.util.ApplicationIdUtils; -import org.apache.tajo.util.BasicFuture; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * It manages all resources of tajo workers. 
- */ -public class TajoWorkerResourceManager extends CompositeService implements WorkerResourceManager { - /** class logger */ - private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class); - - static AtomicInteger containerIdSeq = new AtomicInteger(0); - - private TajoMaster.MasterContext masterContext; - - private TajoRMContext rmContext; - - private String queryIdSeed; - - private WorkerResourceAllocationThread workerResourceAllocator; - - /** - * Worker Liveliness monitor - */ - private WorkerLivelinessMonitor workerLivelinessMonitor; - - private final BlockingQueue<WorkerResourceRequest> requestQueue = - new LinkedBlockingDeque<WorkerResourceRequest>(); - private final RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>> summaryRequest = - new RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>>(); - - private AtomicBoolean stopped = new AtomicBoolean(false); - - private TajoConf systemConf; - - private ConcurrentMap<ContainerProtocol.TajoContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps - .newConcurrentMap(); - - /** It receives status messages from workers and their resources. 
*/ - private TajoResourceTracker resourceTracker; - - public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) { - super(TajoWorkerResourceManager.class.getSimpleName()); - this.masterContext = masterContext; - } - - public TajoWorkerResourceManager(TajoConf systemConf) { - super(TajoWorkerResourceManager.class.getSimpleName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - Preconditions.checkArgument(conf instanceof TajoConf); - this.systemConf = (TajoConf) conf; - - AsyncDispatcher dispatcher = new AsyncDispatcher(); - addIfService(dispatcher); - - rmContext = new TajoRMContext(dispatcher); - - this.queryIdSeed = String.valueOf(System.currentTimeMillis()); - - workerResourceAllocator = new WorkerResourceAllocationThread(); - workerResourceAllocator.start(); - - this.workerLivelinessMonitor = new WorkerLivelinessMonitor(this.rmContext.getDispatcher()); - addIfService(this.workerLivelinessMonitor); - - // Register event handler for Workers - rmContext.getDispatcher().register(WorkerEventType.class, new WorkerEventDispatcher(rmContext)); - - resourceTracker = new TajoResourceTracker(this, workerLivelinessMonitor); - addIfService(resourceTracker); - - super.serviceInit(systemConf); - } - - @InterfaceAudience.Private - public static final class WorkerEventDispatcher implements EventHandler<WorkerEvent> { - - private final TajoRMContext rmContext; - - public WorkerEventDispatcher(TajoRMContext rmContext) { - this.rmContext = rmContext; - } - - @Override - public void handle(WorkerEvent event) { - int workerId = event.getWorkerId(); - Worker node = this.rmContext.getWorkers().get(workerId); - if (node != null) { - try { - node.handle(event); - } catch (Throwable t) { - LOG.error("Error in handling event type " + event.getType() + " for node " + workerId, t); - } - } - } - } - - @Override - public Map<Integer, Worker> getWorkers() { - return ImmutableMap.copyOf(rmContext.getWorkers()); - } - - @Override - public 
Map<Integer, Worker> getInactiveWorkers() { - return ImmutableMap.copyOf(rmContext.getInactiveWorkers()); - } - - public Collection<Integer> getQueryMasters() { - return Collections.unmodifiableSet(rmContext.getQueryMasterWorker()); - } - - @Override - public void serviceStop() throws Exception { - if(stopped.get()) { - return; - } - stopped.set(true); - if(workerResourceAllocator != null) { - workerResourceAllocator.interrupt(); - } - - super.serviceStop(); - } - - /** - * - * @return The prefix of queryId. It is generated when a TajoMaster starts up. - */ - @Override - public String getSeedQueryId() throws IOException { - return queryIdSeed; - } - - @VisibleForTesting - TajoResourceTracker getResourceTracker() { - return resourceTracker; - } - - private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) { - float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar( - TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT); - int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB); - - WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder(); - builder.setQueryId(queryId.getProto()); - builder.setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB); - builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB); - builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot); - builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot); - builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY); - builder.setNumContainers(1); - return builder.build(); - } - - @Override - public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) { - - // 3 seconds, by default - long timeout = masterContext.getConf().getTimeVar( - TajoConf.ConfVars.TAJO_QUERYMASTER_ALLOCATION_TIMEOUT, TimeUnit.MILLISECONDS); - - // Create a resource request for a query master - WorkerResourceAllocationRequest qmResourceRequest = 
createQMResourceRequest(queryInProgress.getQueryId()); - - // call future for async call - final CancelableRpcCallback<WorkerResourceAllocationResponse> callFuture = - new CancelableRpcCallback<WorkerResourceAllocationResponse>() { - @Override - protected void cancel(WorkerResourceAllocationResponse canceled) { - if (canceled != null && !canceled.getWorkerAllocatedResourceList().isEmpty()) { - LOG.info("Canceling resources allocated"); - WorkerAllocatedResource resource = canceled.getWorkerAllocatedResource(0); - releaseWorkerResource(resource.getContainerId()); - } - } - }; - allocateWorkerResources(qmResourceRequest, callFuture); - - WorkerResourceAllocationResponse response = null; - try { - response = callFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - response = callFuture.cancel(); // try cancel - if (response == null) { - // canceled successfuly - LOG.warn("Got exception waiting resources for query master " + queryInProgress.getQueryId(), t); - return null; - } - } - - if (response == null || response.getWorkerAllocatedResourceList().size() == 0) { - return null; - } - - WorkerAllocatedResource resource = response.getWorkerAllocatedResource(0); - registerQueryMaster(queryInProgress.getQueryId(), resource.getContainerId()); - return resource; - } - - private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) { - rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId); - } - - @Override - public void allocateWorkerResources(WorkerResourceAllocationRequest request, - RpcCallback<WorkerResourceAllocationResponse> callBack) { - try { - //TODO checking queue size - requestQueue.put(new WorkerResourceRequest(new QueryId(request.getQueryId()), false, request, callBack)); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } - } - - static class WorkerResourceRequest { - boolean queryMasterRequest; - QueryId queryId; - WorkerResourceAllocationRequest request; - 
RpcCallback<WorkerResourceAllocationResponse> callBack; - WorkerResourceRequest( - QueryId queryId, - boolean queryMasterRequest, WorkerResourceAllocationRequest request, - RpcCallback<WorkerResourceAllocationResponse> callBack) { - this.queryId = queryId; - this.queryMasterRequest = queryMasterRequest; - this.request = request; - this.callBack = callBack; - } - } - - static class AllocatedWorkerResource { - Worker worker; - int allocatedMemoryMB; - float allocatedDiskSlots; - } - - private static final long QUEUE_POLLING_TIME = 100; - - class WorkerResourceAllocationThread extends Thread { - @Override - public void run() { - LOG.info("WorkerResourceAllocationThread start"); - while(!stopped.get()) { - BasicFuture<ClusterResourceSummary> future = summaryRequest.expire(); - if (future != null) { - future.done(makeClusterResourceSummary()); - } - try { - WorkerResourceRequest resourceRequest = requestQueue.poll( - QUEUE_POLLING_TIME, TimeUnit.MILLISECONDS); - if (resourceRequest == null) { - continue; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("allocateWorkerResources:" + - (new QueryId(resourceRequest.request.getQueryId())) + - ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() + - "~" + resourceRequest.request.getMaxMemoryMBPerContainer() + - ", requiredContainers:" + resourceRequest.request.getNumContainers() + - ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() + - "~" + resourceRequest.request.getMaxDiskSlotPerContainer() + - ", queryMasterRequest=" + resourceRequest.queryMasterRequest + - ", liveWorkers=" + rmContext.getWorkers().size()); - } - - // TajoWorkerResourceManager can't return allocated disk slots occasionally. - // Because the rest resource request can remains after QueryMaster stops. - // Thus we need to find whether QueryId stopped or not. 
- if (!rmContext.getStoppedQueryIds().contains(resourceRequest.queryId)) { - List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest); - - if(allocatedWorkerResources.size() > 0) { - List<WorkerAllocatedResource> allocatedResources = - new ArrayList<WorkerAllocatedResource>(); - - for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) { - NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(), - allocatedResource.worker.getConnectionInfo().getPeerRpcPort()); - - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); - containerId.setId(containerIdSeq.incrementAndGet()); - - ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto(); - allocatedResources.add(WorkerAllocatedResource.newBuilder() - .setContainerId(containerIdProto) - .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto()) - .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) - .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) - .build()); - - - allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource); - } - - resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder() - .setQueryId(resourceRequest.request.getQueryId()) - .addAllWorkerAllocatedResource(allocatedResources) - .build() - ); - - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("========================================="); - LOG.debug("Available Workers"); - for(Worker worker: rmContext.getWorkers().values()) { - LOG.debug(worker.toString()); - } - LOG.debug("========================================="); - } - requestQueue.put(resourceRequest); - Thread.sleep(QUEUE_POLLING_TIME); - } - } - } catch(InterruptedException ie) { - LOG.error(ie); - } catch (Throwable t) { - LOG.error(t, t); - } - } - } - } - - private static final long MAX_WAIT_TIME = 
10000; - - public ClusterResourceSummary getClusterResourceSummary() { - BasicFuture<ClusterResourceSummary> future = - summaryRequest.check(new BasicFuture<ClusterResourceSummary>()); - try { - return future.get(MAX_WAIT_TIME, TimeUnit.MILLISECONDS); - } catch (Exception e) { - LOG.warn("Failed to get cluster summary by exception", e); - } - return null; - } - - private ClusterResourceSummary makeClusterResourceSummary() { - - int totalDiskSlots = 0; - int totalCpuCoreSlots = 0; - int totalMemoryMB = 0; - - int totalAvailableDiskSlots = 0; - int totalAvailableCpuCoreSlots = 0; - int totalAvailableMemoryMB = 0; - - for(Worker worker: rmContext.getWorkers().values()) { - - WorkerResource resource = worker.getResource(); - - totalMemoryMB += resource.getMemoryMB(); - totalAvailableMemoryMB += resource.getAvailableMemoryMB(); - - totalDiskSlots += resource.getDiskSlots(); - totalAvailableDiskSlots += resource.getAvailableDiskSlots(); - - totalCpuCoreSlots += resource.getCpuCoreSlots(); - totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots(); - } - - return ClusterResourceSummary.newBuilder() - .setNumWorkers(rmContext.getWorkers().size()) - .setTotalCpuCoreSlots(totalCpuCoreSlots) - .setTotalDiskSlots(totalDiskSlots) - .setTotalMemoryMB(totalMemoryMB) - .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots) - .setTotalAvailableDiskSlots(totalAvailableDiskSlots) - .setTotalAvailableMemoryMB(totalAvailableMemoryMB) - .build(); - } - - private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) { - List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>(); - - int allocatedResources = 0; - - ResourceRequestPriority resourceRequestPriority - = resourceRequest.request.getResourceRequestPriority(); - - List<Worker> randomWorkers = new ArrayList<Worker>(rmContext.getWorkers().values()); - Collections.shuffle(randomWorkers); - - if(resourceRequestPriority == ResourceRequestPriority.MEMORY) { - - int 
numContainers = resourceRequest.request.getNumContainers(); - int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer(); - int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer(); - float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(), - resourceRequest.request.getMinDiskSlotPerContainer()); - - int liveWorkerSize = randomWorkers.size(); - Set<Integer> insufficientWorkers = new HashSet<Integer>(); - boolean stop = false; - boolean checkMax = true; - while(!stop) { - if(allocatedResources >= numContainers) { - break; - } - - if(insufficientWorkers.size() >= liveWorkerSize) { - if(!checkMax) { - break; - } - insufficientWorkers.clear(); - checkMax = false; - } - int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB; - - for(Worker worker: randomWorkers) { - if(allocatedResources >= numContainers) { - stop = true; - break; - } - - if(insufficientWorkers.size() >= liveWorkerSize) { - break; - } - - WorkerResource workerResource = worker.getResource(); - if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) { - int workerMemory; - if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) { - workerMemory = maxMemoryMB; - } else { - workerMemory = workerResource.getAvailableMemoryMB(); - } - AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); - allocatedWorkerResource.worker = worker; - allocatedWorkerResource.allocatedMemoryMB = workerMemory; - if(workerResource.getAvailableDiskSlots() >= diskSlot) { - allocatedWorkerResource.allocatedDiskSlots = diskSlot; - } else { - allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots(); - } - - workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, - allocatedWorkerResource.allocatedMemoryMB); - - selectedWorkers.add(allocatedWorkerResource); - - allocatedResources++; - } else { - insufficientWorkers.add(worker.getWorkerId()); - } - } - } - } else { - int numContainers = 
resourceRequest.request.getNumContainers(); - float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer(); - float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer(); - int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(), - resourceRequest.request.getMinMemoryMBPerContainer()); - - int liveWorkerSize = randomWorkers.size(); - Set<Integer> insufficientWorkers = new HashSet<Integer>(); - boolean stop = false; - boolean checkMax = true; - while(!stop) { - if(allocatedResources >= numContainers) { - break; - } - - if(insufficientWorkers.size() >= liveWorkerSize) { - if(!checkMax) { - break; - } - insufficientWorkers.clear(); - checkMax = false; - } - float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots; - - for(Worker worker: randomWorkers) { - if(allocatedResources >= numContainers) { - stop = true; - break; - } - - if(insufficientWorkers.size() >= liveWorkerSize) { - break; - } - - WorkerResource workerResource = worker.getResource(); - if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) { - float workerDiskSlots; - if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) { - workerDiskSlots = maxDiskSlots; - } else { - workerDiskSlots = workerResource.getAvailableDiskSlots(); - } - AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); - allocatedWorkerResource.worker = worker; - allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots; - - if(workerResource.getAvailableMemoryMB() >= memoryMB) { - allocatedWorkerResource.allocatedMemoryMB = memoryMB; - } else { - allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB(); - } - workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, - allocatedWorkerResource.allocatedMemoryMB); - - selectedWorkers.add(allocatedWorkerResource); - - allocatedResources++; - } else { - insufficientWorkers.add(worker.getWorkerId()); - } - } - } - } - return selectedWorkers; - } - 
- /** - * Release allocated resource. - * - * @param containerId ContainerIdProto to be released - */ - @Override - public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) { - AllocatedWorkerResource allocated = allocatedResourceMap.remove(containerId); - if(allocated != null) { - LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB); - allocated.worker.getResource().releaseResource( allocated.allocatedDiskSlots, allocated.allocatedMemoryMB); - } else { - LOG.warn("No AllocatedWorkerResource data for [" + containerId + "]"); - return; - } - } - - @Override - public boolean isQueryMasterStopped(QueryId queryId) { - return !rmContext.getQueryMasterContainer().containsKey(queryId); - } - - @Override - public void releaseQueryMaster(QueryId queryId) { - if(!rmContext.getQueryMasterContainer().containsKey(queryId)) { - LOG.warn("No QueryMaster resource info for " + queryId); - return; - } else { - ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId); - releaseWorkerResource(containerId); - rmContext.getStoppedQueryIds().add(queryId); - LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString())); - } - } - - public TajoRMContext getRMContext() { - return rmContext; - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java deleted file mode 100644 index 6535688..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; - -import java.util.EnumSet; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * It contains resource and various information for a worker. 
- */ -public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { - /** class logger */ - private static final Log LOG = LogFactory.getLog(Worker.class); - - private final ReentrantReadWriteLock.ReadLock readLock; - private final ReentrantReadWriteLock.WriteLock writeLock; - - /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */ - private final TajoRMContext rmContext; - - /** last heartbeat time */ - private long lastHeartbeatTime; - - /** Resource capability */ - private final WorkerResource resource; - - /** Worker connection information */ - private WorkerConnectionInfo connectionInfo; - - private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition(); - private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition(); - - private static final StateMachineFactory<Worker, - WorkerState, - WorkerEventType, - WorkerEvent> stateMachineFactory - = new StateMachineFactory<Worker, - WorkerState, - WorkerEventType, - WorkerEvent>(WorkerState.NEW) - - // Transition from NEW - .addTransition(WorkerState.NEW, WorkerState.RUNNING, - WorkerEventType.STARTED, - new AddNodeTransition()) - - // Transition from RUNNING - .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY), - WorkerEventType.STATE_UPDATE, - STATUS_UPDATE_TRANSITION) - .addTransition(WorkerState.RUNNING, WorkerState.LOST, - WorkerEventType.EXPIRE, - new DeactivateNodeTransition(WorkerState.LOST)) - .addTransition(WorkerState.RUNNING, WorkerState.RUNNING, - WorkerEventType.RECONNECTED, - RECONNECT_NODE_TRANSITION) - - // Transitions from UNHEALTHY state - .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY), - WorkerEventType.STATE_UPDATE, - STATUS_UPDATE_TRANSITION) - .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST, - WorkerEventType.EXPIRE, - new DeactivateNodeTransition(WorkerState.LOST)) - 
.addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY, - WorkerEventType.RECONNECTED, - RECONNECT_NODE_TRANSITION); - - private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine = - stateMachineFactory.make(this, WorkerState.NEW); - - public Worker(TajoRMContext rmContext, WorkerResource resource, WorkerConnectionInfo connectionInfo) { - this.rmContext = rmContext; - - this.connectionInfo = connectionInfo; - this.lastHeartbeatTime = System.currentTimeMillis(); - this.resource = resource; - - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.readLock = lock.readLock(); - this.writeLock = lock.writeLock(); - } - - public int getWorkerId() { - return connectionInfo.getId(); - } - - public WorkerConnectionInfo getConnectionInfo() { - return connectionInfo; - } - - public void setLastHeartbeatTime(long lastheartbeatReportTime) { - this.writeLock.lock(); - - try { - this.lastHeartbeatTime = lastheartbeatReportTime; - } finally { - this.writeLock.unlock(); - } - } - - public long getLastHeartbeatTime() { - this.readLock.lock(); - - try { - return this.lastHeartbeatTime; - } finally { - this.readLock.unlock(); - } - } - - /** - * - * @return the current state of worker - */ - public WorkerState getState() { - this.readLock.lock(); - - try { - return this.stateMachine.getCurrentState(); - } finally { - this.readLock.unlock(); - } - } - - /** - * - * @return the current resource capability of worker - */ - public WorkerResource getResource() { - return this.resource; - } - - @Override - public int compareTo(Worker o) { - if(o == null) { - return 1; - } - return connectionInfo.compareTo(o.connectionInfo); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Worker worker = (Worker) o; - - if (lastHeartbeatTime != worker.lastHeartbeatTime) return false; - if (connectionInfo != null ? 
!connectionInfo.equals(worker.connectionInfo) : worker.connectionInfo != null) - return false; - if (readLock != null ? !readLock.equals(worker.readLock) : worker.readLock != null) return false; - if (resource != null ? !resource.equals(worker.resource) : worker.resource != null) return false; - if (rmContext != null ? !rmContext.equals(worker.rmContext) : worker.rmContext != null) return false; - if (stateMachine != null ? !stateMachine.equals(worker.stateMachine) : worker.stateMachine != null) return false; - if (writeLock != null ? !writeLock.equals(worker.writeLock) : worker.writeLock != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = readLock != null ? readLock.hashCode() : 0; - result = 31 * result + (writeLock != null ? writeLock.hashCode() : 0); - result = 31 * result + (rmContext != null ? rmContext.hashCode() : 0); - result = 31 * result + (int) (lastHeartbeatTime ^ (lastHeartbeatTime >>> 32)); - result = 31 * result + (resource != null ? resource.hashCode() : 0); - result = 31 * result + (connectionInfo != null ? connectionInfo.hashCode() : 0); - result = 31 * result + (stateMachine != null ? 
stateMachine.hashCode() : 0); - return result; - } - - public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> { - @Override - public void transition(Worker worker, WorkerEvent workerEvent) { - - worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId()); - LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster"); - } - } - - public static class StatusUpdateTransition implements - MultipleArcTransition<Worker, WorkerEvent, WorkerState> { - - @Override - public WorkerState transition(Worker worker, WorkerEvent event) { - if (!(event instanceof WorkerStatusEvent)) { - throw new IllegalArgumentException("event should be a WorkerStatusEvent type."); - } - WorkerStatusEvent statusEvent = (WorkerStatusEvent) event; - worker.updateStatus(statusEvent); - - return WorkerState.RUNNING; - } - } - - private void updateStatus(WorkerStatusEvent statusEvent) { - this.writeLock.lock(); - - try { - lastHeartbeatTime = System.currentTimeMillis(); - resource.setNumRunningTasks(statusEvent.getRunningTaskNum()); - resource.setMaxHeap(statusEvent.maxHeap()); - resource.setFreeHeap(statusEvent.getFreeHeap()); - resource.setTotalHeap(statusEvent.getTotalHeap()); - } finally { - this.writeLock.unlock(); - } - } - - public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> { - private final WorkerState finalState; - - public DeactivateNodeTransition(WorkerState finalState) { - this.finalState = finalState; - } - - @Override - public void transition(Worker worker, WorkerEvent workerEvent) { - - worker.rmContext.getWorkers().remove(worker.getWorkerId()); - LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " + finalState); - worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), worker); - } - } - - public static class ReconnectNodeTransition implements SingleArcTransition<Worker, WorkerEvent> { - - @Override - public void transition(Worker worker, 
WorkerEvent workerEvent) { - if (!(workerEvent instanceof WorkerReconnectEvent)) { - throw new IllegalArgumentException("workerEvent should be a WorkerReconnectEvent type."); - } - WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent; - - Worker newWorker = castedEvent.getWorker(); - worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker); - worker.rmContext.getDispatcher().getEventHandler().handle( - new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED)); - } - } - - @Override - public void handle(WorkerEvent event) { - LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType()); - try { - writeLock.lock(); - WorkerState oldState = getState(); - try { - stateMachine.doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() - , e); - LOG.error("Invalid event " + event.getType() + " on Worker " + getWorkerId()); - } - if (oldState != getState()) { - LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + getState()); - } - } - - finally { - writeLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java deleted file mode 100644 index c208990..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import org.apache.hadoop.yarn.event.AbstractEvent; - -/** - * WorkerEvent describes all kinds of events which sent to {@link Worker}. - */ -public class WorkerEvent extends AbstractEvent<WorkerEventType> { - private final int workerId; - - public WorkerEvent(int workerId, WorkerEventType workerEventType) { - super(workerEventType); - this.workerId = workerId; - } - - public int getWorkerId() { - return workerId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java deleted file mode 100644 index 0c97654..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -public enum WorkerEventType { - - /** Source : {@link TajoResourceTracker}, Destination: {@link Worker} */ - STARTED, - STATE_UPDATE, - RECONNECTED, - - /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker} */ - EXPIRE -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java deleted file mode 100644 index 2751886..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.conf.TajoConf; - -/** - * It periodically checks the latest heartbeat time of {@link Worker}. - * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}. - */ -public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> { - - private EventHandler dispatcher; - - public WorkerLivelinessMonitor(Dispatcher d) { - super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock()); - this.dispatcher = d.getEventHandler(); - } - - public void serviceInit(Configuration conf) throws Exception { - Preconditions.checkArgument(conf instanceof TajoConf); - TajoConf systemConf = (TajoConf) conf; - // milliseconds - int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT); - setExpireInterval(expireIntvl); - setMonitorInterval(expireIntvl/3); - super.serviceInit(conf); - } - - @Override - protected void expire(Integer id) { - dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java deleted file mode 100644 index 3828b6a..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java +++ /dev/null @@ 
-1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -/** - * {@link TajoResourceTracker} produces this event, and it's destination is {@link Worker}. - * This event occurs only when an inactive worker sends a ping again. - */ -public class WorkerReconnectEvent extends WorkerEvent { - private final Worker worker; - public WorkerReconnectEvent(int workerId, Worker worker) { - super(workerId, WorkerEventType.RECONNECTED); - this.worker = worker; - } - - public Worker getWorker() { - return worker; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java deleted file mode 100644 index 3d5e062..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import com.google.protobuf.RpcCallback; -import org.apache.hadoop.service.Service; -import org.apache.tajo.QueryId; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest; -import org.apache.tajo.master.QueryInProgress; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -/** - * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers - * and release the allocated containers. - */ -public interface WorkerResourceManager extends Service { - - /** - * Request a resource container for a QueryMaster. - * - * @param queryInProgress QueryInProgress - * @return A allocated container resource - */ - @Deprecated - public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress); - - /** - * Request one or more resource containers. You can set the number of containers and resource capabilities, such as - * memory, CPU cores, and disk slots. This is an asynchronous call. 
You should use a callback to get allocated - * resource containers. Each container is identified {@link org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto}. - * - * @param request Request description - * @param rpcCallBack Callback function - */ - public void allocateWorkerResources(WorkerResourceAllocationRequest request, - RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack); - - /** - * Release a container - * - * @param containerId ContainerIdProto to be released - */ - public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId); - - public String getSeedQueryId() throws IOException; - - /** - * Check if a query master is stopped. - * - * @param queryId QueryId to be checked - * @return True if QueryMaster is stopped - */ - public boolean isQueryMasterStopped(QueryId queryId); - - /** - * Stop a query master - * - * @param queryId QueryId to be stopped - */ - public void releaseQueryMaster(QueryId queryId); - - /** - * - * @return a Map instance containing active workers - */ - public Map<Integer, Worker> getWorkers(); - - /** - * - * @return a Map instance containing inactive workers - */ - public Map<Integer, Worker> getInactiveWorkers(); - - public void stop(); - - /** - * - * @return The overall summary of cluster resources - */ - public ClusterResourceSummary getClusterResourceSummary(); - - /** - * - * @return WorkerIds on which QueryMasters are running - */ - public Collection<Integer> getQueryMasters(); - - /** - * - * @return RMContext - */ - public TajoRMContext getRMContext(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java deleted file mode 100644 index a941008..0000000 --- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -/** - * It presents the states of {@link Worker}. 
- */ -public enum WorkerState { - /** New worker */ - NEW, - - /** Running worker */ - RUNNING, - - /** Worker is unhealthy */ - UNHEALTHY, - - /** worker is out of service */ - DECOMMISSIONED, - - /** worker has not sent a heartbeat for some configured time threshold */ - LOST; - - @SuppressWarnings("unused") - public boolean isUnusable() { - return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java deleted file mode 100644 index f1ab401..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -/** - * {@link TajoResourceTracker} produces this event, and its destination is - * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}. 
- */ -public class WorkerStatusEvent extends WorkerEvent { - private final int runningTaskNum; - private final long maxHeap; - private final long freeHeap; - private final long totalHeap; - - public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) { - super(workerId, WorkerEventType.STATE_UPDATE); - this.runningTaskNum = runningTaskNum; - this.maxHeap = maxHeap; - this.freeHeap = freeHeap; - this.totalHeap = totalHeap; - } - - public int getRunningTaskNum() { - return runningTaskNum; - } - - public long maxHeap() { - return maxHeap; - } - - public long getFreeHeap() { - return freeHeap; - } - - public long getTotalHeap() { - return totalHeap; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java new file mode 100644 index 0000000..1380417 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.hadoop.service.AbstractService; +import org.apache.tajo.QueryId; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.resource.ResourceCalculator; + +/** + * please refer to {@TajoResourceScheduler} for detailed information. + */ +public abstract class AbstractQueryScheduler extends AbstractService implements TajoResourceScheduler { + + protected final NodeResource clusterResource; + protected final NodeResource minResource; + protected final NodeResource maxResource; + protected final NodeResource qmMinResource; + + public AbstractQueryScheduler(String name) { + super(name); + this.minResource = NodeResources.createResource(0); + this.qmMinResource = NodeResources.createResource(0); + this.maxResource = NodeResources.createResource(0); + this.clusterResource = NodeResources.createResource(0); + } + + @Override + public NodeResource getClusterResource() { + return clusterResource; + } + + @Override + public NodeResource getMinimumResourceCapability() { + return minResource; + } + + @Override + public NodeResource getMaximumResourceCapability() { + return maxResource; + } + + @Override + public NodeResource getQMMinimumResourceCapability() { + return qmMinResource; + } + + public abstract int getRunningQuery(); + + public abstract ResourceCalculator getResourceCalculator(); + + public abstract void submitQuery(QuerySchedulingInfo schedulingInfo); + + public abstract void stopQuery(QueryId queryId); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java index 3dd3389..a1fe743 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java @@ -20,13 +20,27 @@ package org.apache.tajo.master.scheduler; import com.google.common.base.Objects; import org.apache.tajo.QueryId; +import org.apache.tajo.util.NumberUtil; -public class QuerySchedulingInfo { +/** + * A QuerySchedulingInfo represents an scheduling information. + * It provides a common interface for queue and priority + */ + +public class QuerySchedulingInfo implements Comparable<QuerySchedulingInfo> { + /** Name of queue */ + private String queue; + /** Query owner */ + private String user; private QueryId queryId; - private Integer priority; - private Long startTime; + /** Query priority for queries in same queue */ + private int priority; + /** Start time for query in same queue */ + private long startTime; - public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) { + public QuerySchedulingInfo(String queue, String user, QueryId queryId, int priority, long startTime) { + this.queue = queue; + this.user = user; this.queryId = queryId; this.priority = priority; this.startTime = startTime; @@ -36,11 +50,15 @@ public class QuerySchedulingInfo { return queryId; } - public Integer getPriority() { + public String getUser() { + return user; + } + + public int getPriority() { return priority; } - public Long getStartTime() { + public long getStartTime() { return startTime; } @@ -48,8 +66,36 @@ public class QuerySchedulingInfo { return queryId.getId(); } + public String getQueue() { + return queue; + } + + + @Override + public int compareTo(QuerySchedulingInfo o) { + int ret = NumberUtil.compare(priority, o.priority); + if(ret == 0) { + ret = NumberUtil.compare(startTime, o.startTime); + } + return ret; + } + + @Override + public boolean 
equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + QuerySchedulingInfo other = (QuerySchedulingInfo) obj; + if (!this.getQueryId().equals(other.getQueryId())) + return false; + return true; + } + @Override public int hashCode() { - return Objects.hashCode(startTime, getName(), priority); + return Objects.hashCode(queryId, queue, user, priority, startTime); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java new file mode 100644 index 0000000..acf793c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import java.lang.String;import java.util.List; + +/** + * QueueInfo is a report of the runtime information of the queue. 
+ * <p> + * It includes information such as: + * <ul> + * <li>Queue name.</li> + * <li>Capacity of the queue.</li> + * <li>Maximum capacity of the queue.</li> + * <li>Current capacity of the queue.</li> + * <li>Child queues.</li> + * <li>Running applications.</li> + * <li>{@link QueueState} of the queue.</li> + * </ul> + * + */ + +public abstract class QueueInfo { + /** + * Get the <em>name</em> of the queue. + * @return <em>name</em> of the queue + */ + public abstract String getQueueName(); + + public abstract void setQueueName(String queueName); + + /** + * Get the <em>configured capacity</em> of the queue. + * @return <em>configured capacity</em> of the queue + */ + public abstract float getCapacity(); + + public abstract void setCapacity(float capacity); + + /** + * Get the <em>maximum capacity</em> of the queue. + * @return <em>maximum capacity</em> of the queue + */ + + public abstract float getMaximumCapacity(); + + public abstract void setMaximumCapacity(float maximumCapacity); + + /** + * Get the <em>maximum query capacity</em> of the queue. + * @return <em>maximum query capacity</em> of the queue + */ + + public abstract float getMaximumQueryCapacity(); + + public abstract void setMaximumQueryCapacity(float maximumQueryCapacity); + + /** + * Get the <em>current capacity</em> of the queue. + * @return <em>current capacity</em> of the queue + */ + + public abstract float getCurrentCapacity(); + + public abstract void setCurrentCapacity(float currentCapacity); + + /** + * Get the <em>child queues</em> of the queue. + * @return <em>child queues</em> of the queue + */ + + public abstract List<QueueInfo> getChildQueues(); + + public abstract void setChildQueues(List<QueueInfo> childQueues); + + + /** + * Get the <code>QueueState</code> of the queue. 
+ * @return <code>QueueState</code> of the queue + */ + public abstract QueueState getQueueState(); + + public abstract void setQueueState(QueueState queueState); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java new file mode 100644 index 0000000..d7acbfe --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.QueueInfo; + +/** + * State of a Queue. 
+ * <p> + * A queue is in one of: + * <ul> + * <li>{@link #RUNNING} - normal state.</li> + * <li>{@link #STOPPED} - not accepting new application submissions.</li> + * </ul> + * + * @see QueueInfo + * @see ApplicationClientProtocol#getQueueInfo(org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest) + */ +@Public +@Stable +public enum QueueState { + /** + * Stopped - Not accepting submissions of new applications. + */ + STOPPED, + + /** + * Running - normal operation. + */ + RUNNING +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java index 7fd07b5..efe4561 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java @@ -18,6 +18,8 @@ package org.apache.tajo.master.scheduler; +import org.apache.tajo.util.NumberUtil; + import java.util.Comparator; /** @@ -32,9 +34,9 @@ public class SchedulingAlgorithms { public static class FifoComparator implements Comparator<QuerySchedulingInfo> { @Override public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) { - int res = q1.getPriority().compareTo(q2.getPriority()); + int res = NumberUtil.compare(q1.getPriority(), q2.getPriority()); if (res == 0) { - res = (int) Math.signum(q1.getStartTime() - q2.getStartTime()); + res = NumberUtil.compare(q1.getStartTime(), q2.getStartTime()); } if (res == 0) { // In the rare case where jobs were submitted at the exact same time, http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java deleted file mode 100644 index 6cb98eb..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.scheduler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; -import org.apache.tajo.master.QueryInProgress; -import org.apache.tajo.master.QueryManager; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -public class SimpleFifoScheduler implements Scheduler { - private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName()); - private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>(); - private final Thread queryProcessor; - private AtomicBoolean stopped = new AtomicBoolean(); - private QueryManager manager; - private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); - - public SimpleFifoScheduler(QueryManager manager) { - this.manager = manager; - this.queryProcessor = new Thread(new QueryProcessor()); - this.queryProcessor.setName("Query Processor"); - } - - @Override - public Mode getMode() { - return Mode.FIFO; - } - - @Override - public String getName() { - return manager.getName(); - } - - @Override - public boolean addQuery(QueryInProgress queryInProgress) { - int qSize = pool.size(); - if (qSize != 0 && qSize % 100 == 0) { - LOG.info("Size of Fifo queue is " + qSize); - } - - QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, - queryInProgress.getQueryInfo().getStartTime()); - boolean result = pool.add(querySchedulingInfo); - if (getRunningQueries().size() == 0) wakeupProcessor(); - return result; - } - - @Override - public boolean removeQuery(QueryId queryId) { - return pool.remove(getQueryByQueryId(queryId)); - } - - public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) { - for (QuerySchedulingInfo querySchedulingInfo : pool) { - if (querySchedulingInfo.getQueryId().equals(queryId)) { - return querySchedulingInfo; - } - } - return null; - } - - @Override - public 
List<QueryInProgress> getRunningQueries() { - return new ArrayList<QueryInProgress>(manager.getRunningQueries()); - } - - public void start() { - queryProcessor.start(); - } - - public void stop() { - if (stopped.getAndSet(true)) { - return; - } - pool.clear(); - synchronized (queryProcessor) { - queryProcessor.interrupt(); - } - } - - private QuerySchedulingInfo pollScheduledQuery() { - if (pool.size() > 1) { - Collections.sort(pool, COMPARATOR); - } - return pool.poll(); - } - - private void wakeupProcessor() { - synchronized (queryProcessor) { - queryProcessor.notifyAll(); - } - } - - private final class QueryProcessor implements Runnable { - @Override - public void run() { - - QuerySchedulingInfo query; - - while (!stopped.get() && !Thread.currentThread().isInterrupted()) { - query = null; - if (getRunningQueries().size() == 0) { - query = pollScheduledQuery(); - } - - if (query != null) { - try { - manager.startQueryJob(query.getQueryId()); - } catch (Throwable t) { - LOG.fatal("Exception during query startup:", t); - manager.stopQuery(query.getQueryId()); - } - } - - synchronized (queryProcessor) { - try { - queryProcessor.wait(500); - } catch (InterruptedException e) { - if (stopped.get()) { - break; - } - LOG.warn("Exception during shutdown: ", e); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java new file mode 100644 index 0000000..e41ac95 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.QueryId; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.rm.TajoRMContext; +import org.apache.tajo.master.rm.NodeStatus; +import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEvent; +import org.apache.tajo.resource.DefaultResourceCalculator; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.resource.ResourceCalculator; +import org.apache.tajo.util.TUtil; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +import static org.apache.tajo.ResourceProtos.*; + +/** + * SimpleScheduler can execute query and stages 
simultaneously. + * Each query and the stage competes to get the resource + */ +public class SimpleScheduler extends AbstractQueryScheduler { + + private static final Log LOG = LogFactory.getLog(SimpleScheduler.class); + private static final float MAXIMUM_RUNNING_QM_RATE = 0.5f; + private static final Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); + + private volatile boolean isStopped = false; + private final TajoMaster.MasterContext masterContext; + + private final TajoRMContext rmContext; + private final BlockingQueue<QuerySchedulingInfo> queryQueue; + private final Map<QueryId, QuerySchedulingInfo> pendingQueryMap = Maps.newHashMap(); + + private final Map<QueryId, Integer> assignedQueryMasterMap = Maps.newHashMap(); + private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + + private final Thread queryProcessor; + private TajoConf tajoConf; + + @VisibleForTesting + public SimpleScheduler(TajoMaster.MasterContext context, TajoRMContext rmContext) { + super(SimpleScheduler.class.getName()); + this.masterContext = context; + this.rmContext = rmContext; + //Copy default array capacity from PriorityBlockingQueue. 
+ this.queryQueue = new PriorityBlockingQueue<QuerySchedulingInfo>(11, COMPARATOR); + this.queryProcessor = new Thread(new QueryProcessor()); + } + + public SimpleScheduler(TajoMaster.MasterContext context) { + this(context, context.getResourceManager().getRMContext()); + } + + private void initScheduler(TajoConf conf) { + this.minResource.setMemory(conf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY)).setVirtualCores(1); + this.qmMinResource.setMemory(conf.getIntVar(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY)).setVirtualCores(1); + updateResource(); + this.queryProcessor.setName("Query Processor"); + } + + private void updateResource() { + NodeResource resource = NodeResources.createResource(0); + NodeResource totalResource = NodeResources.createResource(0); + for (NodeStatus nodeStatus : getRMContext().getNodes().values()) { + NodeResources.addTo(resource, nodeStatus.getAvailableResource()); + NodeResources.addTo(totalResource, nodeStatus.getTotalResourceCapability()); + + } + + NodeResources.update(maxResource, totalResource); + NodeResources.update(clusterResource, resource); + + if(LOG.isDebugEnabled()) { + LOG.debug("Cluster Resource. 
available : " + getClusterResource() + + " maximum: " + getMaximumResourceCapability()); + } + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + initScheduler(tajoConf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.queryProcessor.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + this.isStopped = true; + super.serviceStop(); + } + + @Override + public int getRunningQuery() { + return assignedQueryMasterMap.size(); + } + + @Override + public ResourceCalculator getResourceCalculator() { + return resourceCalculator; + } + + private NodeResourceRequest createQMResourceRequest(QueryInfo queryInfo) { + NodeResource qmResource = getQMMinimumResourceCapability(); + + int containers = 1; + Set<Integer> assignedQMNodes = Sets.newHashSet(assignedQueryMasterMap.values()); + List<Integer> idleNode = Lists.newArrayList(); + + for (NodeStatus nodeStatus : getRMContext().getNodes().values()) { + + //find idle node for QM + if (!assignedQMNodes.contains(nodeStatus.getWorkerId())) { + idleNode.add(nodeStatus.getWorkerId()); + } + + if (idleNode.size() > containers * 3) break; + } + + NodeResourceRequest.Builder builder = NodeResourceRequest.newBuilder(); + + builder.setQueryId(queryInfo.getQueryId().getProto()) + .setCapacity(qmResource.getProto()) + .setType(ResourceType.QUERYMASTER) + .setPriority(1) + .setNumContainers(containers) + .setRunningTasks(1) + .addAllCandidateNodes(idleNode) + .setUserId(queryInfo.getQueryContext().getUser()); + //TODO .setQueue(queryInfo.getQueue()); + return builder.build(); + } + + + @Override + public int getNumClusterNodes() { + return rmContext.getNodes().size(); + } + + @Override + public List<AllocationResourceProto> + reserve(QueryId queryId, NodeResourceRequest request) { + + List<AllocationResourceProto> reservedResources; + NodeResource 
capacity = new NodeResource(request.getCapacity()); + if (!NodeResources.fitsIn(capacity, getClusterResource())) { + return Lists.newArrayList(); + } + + LinkedList<Integer> workers = new LinkedList<Integer>(); + + if (request.getCandidateNodesCount() > 0) { + workers.addAll(request.getCandidateNodesList()); + Collections.shuffle(workers); + } + + int requiredContainers = request.getNumContainers(); + // reserve resource from candidate workers for locality + reservedResources = reserveClusterResource(workers, capacity, requiredContainers); + + // reserve resource in random workers + if (reservedResources.size() < requiredContainers) { + LinkedList<Integer> randomNodes = new LinkedList<Integer>(getRMContext().getNodes().keySet()); + Collections.shuffle(randomNodes); + + reservedResources.addAll(reserveClusterResource( + randomNodes, capacity, requiredContainers - reservedResources.size())); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Request: " + request.getCapacity() + ", containerNum:" + request.getNumContainers() + + "Current cluster resource: " + getClusterResource()); + } + return reservedResources; + } + + private List<AllocationResourceProto> reserveClusterResource(List<Integer> workers, + NodeResource capacity, int requiredNum) { + + List<AllocationResourceProto> reservedResources = Lists.newArrayList(); + AllocationResourceProto.Builder resourceBuilder = AllocationResourceProto.newBuilder(); + int allocatedResources = 0; + + while (workers.size() > 0) { + Iterator<Integer> iter = workers.iterator(); + while (iter.hasNext()) { + + int workerId = iter.next(); + NodeStatus nodeStatus = getRMContext().getNodes().get(workerId); + if (nodeStatus == null) { + iter.remove(); + LOG.warn("Can't find the node. 
id :" + workerId); + continue; + } else { + if (NodeResources.fitsIn(capacity, nodeStatus.getAvailableResource())) { + NodeResources.subtractFrom(getClusterResource(), capacity); + NodeResources.subtractFrom(nodeStatus.getAvailableResource(), capacity); + allocatedResources++; + resourceBuilder.setResource(capacity.getProto()); + resourceBuilder.setWorkerId(workerId); + reservedResources.add(resourceBuilder.build()); + } else { + // remove unavailable nodeStatus; + iter.remove(); + } + } + + if (allocatedResources >= requiredNum) { + return reservedResources; + } + } + } + return reservedResources; + } + + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case RESOURCE_RESERVE: + //TODO should consider request priority + reserveResource(TUtil.checkTypeAndGet(event, ResourceReserveSchedulerEvent.class)); + break; + case RESOURCE_UPDATE: + updateResource(); + break; + default: + break; + + } + } + + /** + * This is an asynchronous call. You should use a callback to get reserved resource containers. 
+ */ + protected void reserveResource(ResourceReserveSchedulerEvent schedulerEvent) { + List<AllocationResourceProto> resources = + reserve(new QueryId(schedulerEvent.getRequest().getQueryId()), schedulerEvent.getRequest()); + + NodeResourceResponse.Builder response = NodeResourceResponse.newBuilder(); + response.setQueryId(schedulerEvent.getRequest().getQueryId()); + schedulerEvent.getCallBack().run(response.addAllResource(resources).build()); + } + + /** + * Submit a query to scheduler + */ + public void submitQuery(QuerySchedulingInfo schedulingInfo) { + queryQueue.add(schedulingInfo); + pendingQueryMap.put(schedulingInfo.getQueryId(), schedulingInfo); + } + + protected boolean startQuery(QueryId queryId, AllocationResourceProto allocation) { + return masterContext.getQueryJobManager().startQueryJob(queryId, allocation); + } + + public void stopQuery(QueryId queryId) { + if(pendingQueryMap.containsKey(queryId)){ + queryQueue.remove(pendingQueryMap.remove(queryId)); + } + assignedQueryMasterMap.remove(queryId); + } + + public BlockingQueue<QuerySchedulingInfo> getQueryQueue() { + return queryQueue; + } + + private NodeStatus getWorker(int workerId) { + return rmContext.getNodes().get(workerId); + } + + protected TajoRMContext getRMContext() { + return rmContext; + } + + public WorkerConnectionInfo getQueryMaster(QueryId queryId) { + if (assignedQueryMasterMap.containsKey(queryId)) { + return rmContext.getNodes().get(assignedQueryMasterMap.get(queryId)).getConnectionInfo(); + } + return null; + } + + protected QueryInfo getQueryInfo(QueryId queryId) { + return masterContext.getQueryJobManager().getQueryInProgress(queryId).getQueryInfo(); + } + + private final class QueryProcessor implements Runnable { + @Override + public void run() { + + QuerySchedulingInfo query; + + while (!isStopped && !Thread.currentThread().isInterrupted()) { + try { + query = queryQueue.take(); + } catch (InterruptedException e) { + LOG.warn(e.getMessage(), e); + break; + } + //TODO get by 
assigned queue + int maxAvailable = getResourceCalculator().computeAvailableContainers( + getMaximumResourceCapability(), getQMMinimumResourceCapability()); + + // check maximum parallel running QM. allow 50% parallel running + if (assignedQueryMasterMap.size() >= Math.floor(maxAvailable * MAXIMUM_RUNNING_QM_RATE)) { + queryQueue.add(query); + synchronized (this) { + try { + this.wait(1000); + } catch (InterruptedException e) { + if(!isStopped) { + LOG.fatal(e.getMessage(), e); + return; + } + } + } + } else { + QueryInfo queryInfo = getQueryInfo(query.getQueryId()); + List<AllocationResourceProto> allocation = reserve(query.getQueryId(), createQMResourceRequest(queryInfo)); + + if(allocation.size() == 0) { + queryQueue.add(query); + LOG.info("No Available Resources for QueryMaster :" + queryInfo.getQueryId() + "," + queryInfo); + + synchronized (this) { + try { + this.wait(100); + } catch (InterruptedException e) { + LOG.fatal(e); + } + } + } else { + try { + //if QM resource can't be allocated to a node, it should retry + boolean started = startQuery(query.getQueryId(), allocation.get(0)); + if(!started) { + queryQueue.put(query); + } else { + assignedQueryMasterMap.put(query.getQueryId(), allocation.get(0).getWorkerId()); + } + } catch (Throwable t) { + LOG.fatal("Exception during query startup:", t); + masterContext.getQueryJobManager().stopQuery(query.getQueryId()); + } + } + } + LOG.info("Running Queries: " + assignedQueryMasterMap.size()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java new file mode 100644 index 0000000..c7c37c4 --- /dev/null +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.QueryId; +import org.apache.tajo.ResourceProtos.NodeResourceRequest; +import org.apache.tajo.ResourceProtos.AllocationResourceProto; +import org.apache.tajo.master.scheduler.event.SchedulerEvent; +import org.apache.tajo.resource.NodeResource; + +import java.util.List; + +/** + * This interface is used by the scheduler for allocating resources. + */ +public interface TajoResourceScheduler extends EventHandler<SchedulerEvent> { + + /** + * Get the whole resource capacity of the cluster. + * @return the whole resource capacity of the cluster. + */ + + NodeResource getClusterResource(); + + /** + * Get minimum allocatable {@link NodeResource}. + * @return minimum allocatable resource + */ + NodeResource getMinimumResourceCapability(); + + /** + * Get minimum allocatable {@link NodeResource} of QueryMaster. 
+ * @return minimum allocatable resource + */ + NodeResource getQMMinimumResourceCapability(); + + /** + * Get maximum allocatable {@link NodeResource}. + * @return maximum allocatable resource + */ + NodeResource getMaximumResourceCapability(); + + /** + * Get the number of nodes available in the cluster. + * @return the number of available nodes. + */ + int getNumClusterNodes(); + + /** + * Reserve resources. The cluster resource is updated by TajoResourceTracker. + * Request one or more resource containers. You can set the number of containers and resource capabilities, + * such as memory, CPU cores, and disk slots. + * @return the list of reserved resources. + */ + List<AllocationResourceProto> + reserve(QueryId queryId, NodeResourceRequest ask); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java new file mode 100644 index 0000000..47ee53b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler.event; + +import com.google.protobuf.RpcCallback; +import static org.apache.tajo.ResourceProtos.NodeResourceRequest; +import static org.apache.tajo.ResourceProtos.NodeResourceResponse; + +public class ResourceReserveSchedulerEvent extends SchedulerEvent { + + private NodeResourceRequest request; + + private RpcCallback<NodeResourceResponse> callBack; + + public ResourceReserveSchedulerEvent(NodeResourceRequest request, + RpcCallback<NodeResourceResponse> callback) { + super(SchedulerEventType.RESOURCE_RESERVE); + this.request = request; + this.callBack = callback; + } + + public NodeResourceRequest getRequest() { + return request; + } + + public RpcCallback<NodeResourceResponse> getCallBack() { + return callBack; + } +}
