Updated Branches: refs/heads/master 6628017a5 -> fc4743686
TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik) Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fc474368 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fc474368 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fc474368 Branch: refs/heads/master Commit: fc4743686553a5bccfb0195c93d9e1078f4f22ff Parents: 6628017 Author: hyunsik <[email protected]> Authored: Wed May 8 16:40:48 2013 +0900 Committer: hyunsik <[email protected]> Committed: Wed May 8 16:40:48 2013 +0900 ---------------------------------------------------------------------- CHANGES.txt | 4 ++- .../src/main/java/tajo/master/QueryMaster.java | 7 ++++- .../src/main/java/tajo/master/SubQuery.java | 11 +++----- .../java/tajo/master/rm/RMContainerAllocator.java | 19 ++++++++++++++- 4 files changed, 31 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1a6a6db..cba4f8f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,7 +9,7 @@ Release 0.2.0 - unreleased IMPROVEMENTS TAJO-37: Remove obsolete classes WorkerEventDispatcher, WorkerEvent and - WorkerEventType. (sunny.1324 via hyunsik) + WorkerEventType. (sunny.1324 via hyunsik) TAJO-50: Cleanup SubQuery. (hyunsik) @@ -37,6 +37,8 @@ Release 0.2.0 - unreleased BUG FIXES + TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik) + TAJO-47: RowFile has the duplicated initialization problem and unflipped ByteBuffer problem. (jihoon) http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java index 8891861..3f3db05 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java @@ -227,6 +227,7 @@ public class QueryMaster extends CompositeService implements EventHandler { public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>(); int minCapability; int maxCapability; + int numCluster; public QueryContext(QueryConf conf) { this.conf = conf; @@ -297,7 +298,11 @@ public class QueryMaster extends CompositeService implements EventHandler { } public int getNumClusterNode() { - return rmAllocator.getClusterNodeCount(); + return numCluster; + } + + public void setNumClusterNodes(int num) { + numCluster = num; } public CatalogService getCatalog() { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java index 398e1ae..986ecaf 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.state.*; import org.apache.hadoop.yarn.util.Records; import tajo.QueryIdFactory; @@ -614,13 +613,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { ExecutionBlock execBlock = subQuery.getBlock(); QueryUnit [] tasks = subQuery.getQueryUnits(); - int numRequest = Math.min(tasks.length, - subQuery.context.getNumClusterNode() * 4); + int numClusterNodes = subQuery.getContext().getNumClusterNode(); + int numRequest = Math.min(tasks.length, numClusterNodes * 4); - final Resource resource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - Resource.class); - if (tasks.length <= subQuery.context.getNumClusterNode()) { + final Resource resource = Records.newRecord(Resource.class); + if (tasks.length <= numClusterNodes) { resource.setMemory(subQuery.context.getMaxContainerCapability()); } else { resource.setMemory(2000); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java index 0a1ad42..9d5ae6c 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.AMRMClientImpl; @@ -62,6 +63,7 @@ public class RMContainerAllocator extends AMRMClientImpl super.init(conf); } + private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second public void start() { super.start(); @@ -70,6 +72,19 @@ public class RMContainerAllocator extends AMRMClientImpl response = registerApplicationMaster("locahost", 10080, "http://localhost:1234"); context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory()); context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory()); + + // If the number of cluster nodes is ZERO, it waits for available nodes. + AllocateResponse allocateResponse = allocate(0.0f); + while(allocateResponse.getNumClusterNodes() < 1) { + try { + Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES); + LOG.info("Waiting for Available Cluster Nodes"); + allocateResponse = allocate(0); + } catch (InterruptedException e) { + LOG.error(e); + } + } + context.setNumClusterNodes(allocateResponse.getNumClusterNodes()); } catch (YarnRemoteException e) { LOG.error(e); } @@ -135,9 +150,11 @@ public class RMContainerAllocator extends AMRMClientImpl new HashMap<Priority, SubQueryId>(); public void heartbeat() throws Exception { - AMResponse response = allocate(context.getProgress()).getAMResponse(); + AllocateResponse allocateResponse = allocate(context.getProgress()); + AMResponse response = allocateResponse.getAMResponse(); List<Container> allocatedContainers = response.getAllocatedContainers(); + LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes()); LOG.info("Available Resource: " + response.getAvailableResources()); LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size()); if (response.getAllocatedContainers().size() > 0) {
