TAJO-1385: Remove locking on RMContext. Signed-off-by: Jihoon Son <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/db912712 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/db912712 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/db912712 Branch: refs/heads/index_support Commit: db912712d338fa61472a0b78f304d1ce8cff3ae0 Parents: 67a3117 Author: navis.ryu <[email protected]> Authored: Fri Apr 10 22:38:15 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Fri Apr 10 22:38:15 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/util/BasicFuture.java | 104 +++++++ .../tajo/master/rm/TajoResourceTracker.java | 49 +--- .../master/rm/TajoWorkerResourceManager.java | 279 +++++++++++-------- .../java/org/apache/tajo/master/rm/Worker.java | 26 +- .../tajo/master/rm/WorkerResourceManager.java | 8 +- .../main/java/org/apache/tajo/rpc/RpcUtils.java | 9 +- 7 files changed, 305 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 706cb2f..9fe35f5 100644 --- a/CHANGES +++ b/CHANGES @@ -14,6 +14,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1385: Remove locking on RMContext. (Contributed by navis, + Committed by jihoon) + TAJO-1499: Check the bind status when EvalNode::eval() is called. (jihoon) TAJO-1400: Add TajoStatement::setMaxRows method support. http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java b/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java new file mode 100644 index 0000000..e857817 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class BasicFuture<T> implements Future<T> { + + private T result; + private Exception exception; + private boolean finished; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public synchronized boolean isDone() { + return finished; + } + + @Override + public synchronized T get() throws InterruptedException, ExecutionException { + while (!finished) { + wait(); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } + + @Override + public synchronized T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + long remain = unit.toMillis(timeout); + long prev = System.currentTimeMillis(); + while (!finished && remain > 0) { + wait(remain); + long current = System.currentTimeMillis(); + remain -= current - prev; + prev = current; + } + if (!finished) { + throw new TimeoutException("Timed-out"); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } + + public synchronized boolean done(T result) { + try { + if (finished) { + return false; + } + this.result = result; + this.finished = true; + return true; + } finally { + notifyAll(); + } + } + + public synchronized boolean failed(Exception ex) { + try { + if (finished) { + return false; + } + this.exception = ex; + this.finished = true; + return true; + } finally { + notifyAll(); + } + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 334dbf6..ba021fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; @@ -57,6 +56,8 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTracke public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface { /** Class logger */ private Log LOG = LogFactory.getLog(TajoResourceTracker.class); + + private final WorkerResourceManager manager; /** the context of TajoWorkerResourceManager */ private final TajoRMContext rmContext; /** Liveliness monitor which checks ping expiry times of workers */ @@ -67,9 +68,10 @@ public class TajoResourceTracker extends AbstractService implements TajoResource /** The bind address of RPC server of worker resource tracker */ private InetSocketAddress bindAddress; - public TajoResourceTracker(TajoRMContext rmContext, WorkerLivelinessMonitor workerLivelinessMonitor) { + public TajoResourceTracker(WorkerResourceManager manager, WorkerLivelinessMonitor workerLivelinessMonitor) { super(TajoResourceTracker.class.getSimpleName()); - this.rmContext = rmContext; + this.manager = manager; + this.rmContext = manager.getRMContext(); this.workerLivelinessMonitor = workerLivelinessMonitor; } @@ -175,7 +177,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource } } finally { - builder.setClusterResourceSummary(getClusterResourceSummary()); + builder.setClusterResourceSummary(manager.getClusterResourceSummary()); done.run(builder.build()); } } @@ -199,43 +201,4 @@ public class TajoResourceTracker extends AbstractService implements TajoResource return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo())); } - - public ClusterResourceSummary getClusterResourceSummary() { - int totalDiskSlots = 0; - int totalCpuCoreSlots = 0; - int totalMemoryMB = 0; - - int totalAvailableDiskSlots = 0; - int totalAvailableCpuCoreSlots = 0; - int totalAvailableMemoryMB = 0; - - synchronized(rmContext) { - for(int eachWorker: rmContext.getWorkers().keySet()) { - Worker worker = rmContext.getWorkers().get(eachWorker); - - if(worker != null) { - WorkerResource resource = worker.getResource(); - - totalMemoryMB += resource.getMemoryMB(); - totalAvailableMemoryMB += resource.getAvailableMemoryMB(); - - totalDiskSlots += resource.getDiskSlots(); - totalAvailableDiskSlots += resource.getAvailableDiskSlots(); - - totalCpuCoreSlots += resource.getCpuCoreSlots(); - totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots(); - } - } - } - - return ClusterResourceSummary.newBuilder() - .setNumWorkers(rmContext.getWorkers().size()) - .setTotalCpuCoreSlots(totalCpuCoreSlots) - .setTotalDiskSlots(totalDiskSlots) - .setTotalMemoryMB(totalMemoryMB) - .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots) - .setTotalAvailableDiskSlots(totalAvailableDiskSlots) - .setTotalAvailableMemoryMB(totalAvailableMemoryMB) - .build(); - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 091da2f..90a4eb5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -38,14 +38,13 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.rpc.CancelableRpcCallback; +import org.apache.tajo.rpc.RpcUtils; import org.apache.tajo.util.ApplicationIdUtils; +import org.apache.tajo.util.BasicFuture; import java.io.IOException; import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -72,7 +71,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke */ private WorkerLivelinessMonitor workerLivelinessMonitor; - private BlockingQueue<WorkerResourceRequest> requestQueue; + private final BlockingQueue<WorkerResourceRequest> requestQueue = + new LinkedBlockingDeque<WorkerResourceRequest>(); + private final RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>> summaryRequest = + new RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>>(); private AtomicBoolean stopped = new AtomicBoolean(false); @@ -105,8 +107,6 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke this.queryIdSeed = String.valueOf(System.currentTimeMillis()); - requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>(); - workerResourceAllocator = new WorkerResourceAllocationThread(); workerResourceAllocator.start(); @@ -116,7 +116,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke // Register event handler for Workers rmContext.getDispatcher().register(WorkerEventType.class, new WorkerEventDispatcher(rmContext)); - resourceTracker = new TajoResourceTracker(rmContext, workerLivelinessMonitor); + resourceTracker = new TajoResourceTracker(this, workerLivelinessMonitor); addIfService(resourceTracker); super.serviceInit(systemConf); @@ -160,11 +160,6 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } @Override - public ClusterResourceSummary getClusterResourceSummary() { - return resourceTracker.getClusterResourceSummary(); - } - - @Override public void serviceStop() throws Exception { if(stopped.get()) { return; @@ -289,13 +284,23 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke float allocatedDiskSlots; } + private static final long QUEUE_POLLING_TIME = 100; + class WorkerResourceAllocationThread extends Thread { @Override public void run() { LOG.info("WorkerResourceAllocationThread start"); while(!stopped.get()) { + BasicFuture<ClusterResourceSummary> future = summaryRequest.expire(); + if (future != null) { + future.done(makeClusterResourceSummary()); + } try { - WorkerResourceRequest resourceRequest = requestQueue.take(); + WorkerResourceRequest resourceRequest = requestQueue.poll( + QUEUE_POLLING_TIME, TimeUnit.MILLISECONDS); + if (resourceRequest == null) { + continue; + } if (LOG.isDebugEnabled()) { LOG.debug("allocateWorkerResources:" + @@ -351,13 +356,13 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke if(LOG.isDebugEnabled()) { LOG.debug("========================================="); LOG.debug("Available Workers"); - for(int liveWorker: rmContext.getWorkers().keySet()) { - LOG.debug(rmContext.getWorkers().get(liveWorker).toString()); + for(Worker worker: rmContext.getWorkers().values()) { + LOG.debug(worker.toString()); } LOG.debug("========================================="); } requestQueue.put(resourceRequest); - Thread.sleep(100); + Thread.sleep(QUEUE_POLLING_TIME); } } } catch(InterruptedException ie) { @@ -369,6 +374,54 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } } + private static final long MAX_WAIT_TIME = 10000; + + public ClusterResourceSummary getClusterResourceSummary() { + BasicFuture<ClusterResourceSummary> future = + summaryRequest.check(new BasicFuture<ClusterResourceSummary>()); + try { + return future.get(MAX_WAIT_TIME, TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.warn("Failed to get cluster summary by exception", e); + } + return null; + } + + private ClusterResourceSummary makeClusterResourceSummary() { + + int totalDiskSlots = 0; + int totalCpuCoreSlots = 0; + int totalMemoryMB = 0; + + int totalAvailableDiskSlots = 0; + int totalAvailableCpuCoreSlots = 0; + int totalAvailableMemoryMB = 0; + + for(Worker worker: rmContext.getWorkers().values()) { + + WorkerResource resource = worker.getResource(); + + totalMemoryMB += resource.getMemoryMB(); + totalAvailableMemoryMB += resource.getAvailableMemoryMB(); + + totalDiskSlots += resource.getDiskSlots(); + totalAvailableDiskSlots += resource.getAvailableDiskSlots(); + + totalCpuCoreSlots += resource.getCpuCoreSlots(); + totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots(); + } + + return ClusterResourceSummary.newBuilder() + .setNumWorkers(rmContext.getWorkers().size()) + .setTotalCpuCoreSlots(totalCpuCoreSlots) + .setTotalDiskSlots(totalDiskSlots) + .setTotalMemoryMB(totalMemoryMB) + .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots) + .setTotalAvailableDiskSlots(totalAvailableDiskSlots) + .setTotalAvailableMemoryMB(totalAvailableMemoryMB) + .build(); + } + private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) { List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>(); @@ -377,141 +430,133 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke ResourceRequestPriority resourceRequestPriority = resourceRequest.request.getResourceRequestPriority(); + List<Worker> randomWorkers = new ArrayList<Worker>(rmContext.getWorkers().values()); + Collections.shuffle(randomWorkers); + if(resourceRequestPriority == ResourceRequestPriority.MEMORY) { - synchronized(rmContext) { - List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet()); - Collections.shuffle(randomWorkers); - - int numContainers = resourceRequest.request.getNumContainers(); - int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer(); - int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer(); - float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(), + + int numContainers = resourceRequest.request.getNumContainers(); + int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer(); + int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer(); + float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(), resourceRequest.request.getMinDiskSlotPerContainer()); - int liveWorkerSize = randomWorkers.size(); - Set<Integer> insufficientWorkers = new HashSet<Integer>(); - boolean stop = false; - boolean checkMax = true; - while(!stop) { + int liveWorkerSize = randomWorkers.size(); + Set<Integer> insufficientWorkers = new HashSet<Integer>(); + boolean stop = false; + boolean checkMax = true; + while(!stop) { + if(allocatedResources >= numContainers) { + break; + } + + if(insufficientWorkers.size() >= liveWorkerSize) { + if(!checkMax) { + break; + } + insufficientWorkers.clear(); + checkMax = false; + } + int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB; + + for(Worker worker: randomWorkers) { if(allocatedResources >= numContainers) { + stop = true; break; } if(insufficientWorkers.size() >= liveWorkerSize) { - if(!checkMax) { - break; - } - insufficientWorkers.clear(); - checkMax = false; + break; } - int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB; - for(int eachWorker: randomWorkers) { - if(allocatedResources >= numContainers) { - stop = true; - break; + WorkerResource workerResource = worker.getResource(); + if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) { + int workerMemory; + if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) { + workerMemory = maxMemoryMB; + } else { + workerMemory = workerResource.getAvailableMemoryMB(); } - - if(insufficientWorkers.size() >= liveWorkerSize) { - break; + AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); + allocatedWorkerResource.worker = worker; + allocatedWorkerResource.allocatedMemoryMB = workerMemory; + if(workerResource.getAvailableDiskSlots() >= diskSlot) { + allocatedWorkerResource.allocatedDiskSlots = diskSlot; + } else { + allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots(); } - Worker worker = rmContext.getWorkers().get(eachWorker); - WorkerResource workerResource = worker.getResource(); - if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) { - int workerMemory; - if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) { - workerMemory = maxMemoryMB; - } else { - workerMemory = workerResource.getAvailableMemoryMB(); - } - AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); - allocatedWorkerResource.worker = worker; - allocatedWorkerResource.allocatedMemoryMB = workerMemory; - if(workerResource.getAvailableDiskSlots() >= diskSlot) { - allocatedWorkerResource.allocatedDiskSlots = diskSlot; - } else { - allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots(); - } - - workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, + workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, allocatedWorkerResource.allocatedMemoryMB); - selectedWorkers.add(allocatedWorkerResource); + selectedWorkers.add(allocatedWorkerResource); - allocatedResources++; - } else { - insufficientWorkers.add(eachWorker); - } + allocatedResources++; + } else { + insufficientWorkers.add(worker.getWorkerId()); } } } } else { - synchronized(rmContext) { - List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet()); - Collections.shuffle(randomWorkers); - - int numContainers = resourceRequest.request.getNumContainers(); - float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer(); - float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer(); - int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(), + int numContainers = resourceRequest.request.getNumContainers(); + float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer(); + float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer(); + int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(), resourceRequest.request.getMinMemoryMBPerContainer()); - int liveWorkerSize = randomWorkers.size(); - Set<Integer> insufficientWorkers = new HashSet<Integer>(); - boolean stop = false; - boolean checkMax = true; - while(!stop) { + int liveWorkerSize = randomWorkers.size(); + Set<Integer> insufficientWorkers = new HashSet<Integer>(); + boolean stop = false; + boolean checkMax = true; + while(!stop) { + if(allocatedResources >= numContainers) { + break; + } + + if(insufficientWorkers.size() >= liveWorkerSize) { + if(!checkMax) { + break; + } + insufficientWorkers.clear(); + checkMax = false; + } + float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots; + + for(Worker worker: randomWorkers) { if(allocatedResources >= numContainers) { + stop = true; break; } if(insufficientWorkers.size() >= liveWorkerSize) { - if(!checkMax) { - break; - } - insufficientWorkers.clear(); - checkMax = false; + break; } - float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots; - for(int eachWorker: randomWorkers) { - if(allocatedResources >= numContainers) { - stop = true; - break; + WorkerResource workerResource = worker.getResource(); + if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) { + float workerDiskSlots; + if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) { + workerDiskSlots = maxDiskSlots; + } else { + workerDiskSlots = workerResource.getAvailableDiskSlots(); } + AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); + allocatedWorkerResource.worker = worker; + allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots; - if(insufficientWorkers.size() >= liveWorkerSize) { - break; + if(workerResource.getAvailableMemoryMB() >= memoryMB) { + allocatedWorkerResource.allocatedMemoryMB = memoryMB; + } else { + allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB(); } - - Worker worker = rmContext.getWorkers().get(eachWorker); - WorkerResource workerResource = worker.getResource(); - if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) { - float workerDiskSlots; - if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) { - workerDiskSlots = maxDiskSlots; - } else { - workerDiskSlots = workerResource.getAvailableDiskSlots(); - } - AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource(); - allocatedWorkerResource.worker = worker; - allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots; - - if(workerResource.getAvailableMemoryMB() >= memoryMB) { - allocatedWorkerResource.allocatedMemoryMB = memoryMB; - } else { - allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB(); - } - workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, + workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots, allocatedWorkerResource.allocatedMemoryMB); - selectedWorkers.add(allocatedWorkerResource); + selectedWorkers.add(allocatedWorkerResource); - allocatedResources++; - } else { - insufficientWorkers.add(eachWorker); - } + allocatedResources++; + } else { + insufficientWorkers.add(worker.getWorkerId()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java index a2ab598..6535688 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java @@ -44,7 +44,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { private long lastHeartbeatTime; /** Resource capability */ - private WorkerResource resource; + private final WorkerResource resource; /** Worker connection information */ private WorkerConnectionInfo connectionInfo; @@ -210,20 +210,26 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { throw new IllegalArgumentException("event should be a WorkerStatusEvent type."); } WorkerStatusEvent statusEvent = (WorkerStatusEvent) event; - - // TODO - the synchronization scope using rmContext is too coarsen. - synchronized (worker.rmContext) { - worker.setLastHeartbeatTime(System.currentTimeMillis()); - worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum()); - worker.getResource().setMaxHeap(statusEvent.maxHeap()); - worker.getResource().setFreeHeap(statusEvent.getFreeHeap()); - worker.getResource().setTotalHeap(statusEvent.getTotalHeap()); - } + worker.updateStatus(statusEvent); return WorkerState.RUNNING; } } + private void updateStatus(WorkerStatusEvent statusEvent) { + this.writeLock.lock(); + + try { + lastHeartbeatTime = System.currentTimeMillis(); + resource.setNumRunningTasks(statusEvent.getRunningTaskNum()); + resource.setMaxHeap(statusEvent.maxHeap()); + resource.setFreeHeap(statusEvent.getFreeHeap()); + resource.setTotalHeap(statusEvent.getTotalHeap()); + } finally { + this.writeLock.unlock(); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> { private final WorkerState finalState; http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index 662b699..3d5e062 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -105,5 +105,11 @@ public interface WorkerResourceManager extends Service { * * @return WorkerIds on which QueryMasters are running */ - Collection<Integer> getQueryMasters(); + public Collection<Integer> getQueryMasters(); + + /** + * + * @return RMContext + */ + public TajoRMContext getRMContext(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java index 152d426..99905b9 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java @@ -101,11 +101,16 @@ public class RpcUtils { } } + // non-blocking lock which passes only a ticket before cleared or removed public static class Scrutineer<T> { private final AtomicReference<T> reference = new AtomicReference<T>(); - T check(T ticket) { + public T expire() { + return reference.getAndSet(null); + } + + public T check(T ticket) { T granted = reference.get(); for (;granted == null; granted = reference.get()) { if (reference.compareAndSet(null, ticket)) { @@ -115,7 +120,7 @@ public class RpcUtils { return granted; } - boolean clear(T granted) { + public boolean clear(T granted) { return reference.compareAndSet(granted, null); } }
