Repository: tajo Updated Branches: refs/heads/master 4b2ab6107 -> 25bd5cb44
TAJO-1599: Implement NodeResourceManager and Status updater. (jinho) Closes #577 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/25bd5cb4 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/25bd5cb4 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/25bd5cb4 Branch: refs/heads/master Commit: 25bd5cb44a03ee425b02e2bc2553f7d0f8affff5 Parents: 4b2ab61 Author: Jinho Kim <[email protected]> Authored: Tue May 26 16:46:17 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue May 26 16:46:17 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 9 +- tajo-common/src/main/proto/tajo_protos.proto | 6 + .../tajo/master/rm/TajoResourceTracker.java | 9 + .../resource/DefaultResourceCalculator.java | 109 ++++++++ .../org/apache/tajo/resource/NodeResource.java | 188 +++++++++++++ .../org/apache/tajo/resource/NodeResources.java | 195 +++++++++++++ .../tajo/resource/ResourceCalculator.java | 169 ++++++++++++ .../apache/tajo/worker/NodeResourceManager.java | 148 ++++++++++ .../apache/tajo/worker/NodeStatusUpdater.java | 274 +++++++++++++++++++ .../tajo/worker/WorkerHeartbeatService.java | 1 + .../worker/event/NodeResourceAllocateEvent.java | 46 ++++ .../event/NodeResourceDeallocateEvent.java | 40 +++ .../worker/event/NodeResourceManagerEvent.java | 34 +++ .../tajo/worker/event/NodeStatusEvent.java | 40 +++ .../main/proto/ResourceTrackerProtocol.proto | 27 ++ .../src/main/proto/TajoWorkerProtocol.proto | 16 ++ .../org/apache/tajo/resource/TestResources.java | 48 ++++ .../tajo/worker/MockNodeStatusUpdater.java | 105 +++++++ .../tajo/worker/TestNodeResourceManager.java | 235 ++++++++++++++++ .../tajo/worker/TestNodeStatusUpdater.java | 115 ++++++++ .../java/org/apache/tajo/storage/DiskUtil.java | 4 +- 22 files changed, 1816 insertions(+), 4 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0d71d2f..c79a185 100644 --- a/CHANGES +++ b/CHANGES @@ -310,6 +310,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1599: Implement NodeResourceManager and Status updater. (jinho) + TAJO-1613: Rename StorageManager to Tablespace. (hyunsik) TAJO-1359: Add nested field projector and language extension to project http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 59b1f43..e20658b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -171,9 +171,14 @@ public class TajoConf extends Configuration { WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()), // Tajo Worker Resources - WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1, Validators.min("1")), + WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", + Runtime.getRuntime().availableProcessors(), Validators.min("1")), WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")), + @Deprecated WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), + WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")), + // Per-disk parallelism; NodeResourceManager multiplies this by the disk count to compute the worker's disk resource. 
+ WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2, + Validators.min("1")), WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), @@ -186,7 +191,7 @@ public class TajoConf extends Configuration { WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours - WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000), // 120 sec + WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000), // 10 sec // Resource Manager RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager", http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/proto/tajo_protos.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto index b6cd9ef..8474f54 100644 --- a/tajo-common/src/main/proto/tajo_protos.proto +++ b/tajo-common/src/main/proto/tajo_protos.proto @@ -62,4 +62,10 @@ message WorkerConnectionInfoProto { optional int32 queryMasterPort = 5; required int32 clientPort = 6; required int32 httpInfoPort = 7; +} + +message NodeResourceProto { + // memory is expressed in megabytes (see the NodeResource javadoc). + optional int32 memory = 1; + optional int32 virtual_cores = 2; + optional int32 disks = 3; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 4f3b66a..af28886 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -20,10 +20,12 @@ package 
org.apache.tajo.master.rm; import
com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; @@ -182,6 +184,13 @@ public class TajoResourceTracker extends AbstractService implements TajoResource } } + @Override + public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request, + RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) { + //TODO implement with ResourceManager for scheduler + throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage())); + } + private Worker createWorkerResource(NodeHeartbeat request) { WorkerResource workerResource = new WorkerResource(); http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java new file mode 100644 index 0000000..58b8a26 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java @@ -0,0 +1,109 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership.
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tajo.resource; + + +/** + * {@link ResourceCalculator} that orders resources with {@link NodeResource#compareTo} and counts + * available containers across memory, disks and virtual cores. The divide/normalize/round + * operations consider memory only and build their results with {@link NodeResources#createResource(int)}. + */ +public class DefaultResourceCalculator extends ResourceCalculator { + + @Override + public int compare(NodeResource unused, NodeResource lhs, NodeResource rhs) { + return lhs.compareTo(rhs); + } + + /** + * Returns how many copies of {@code required} fit into {@code available}, limited by every + * dimension the request actually uses. A dimension with {@code required == 0} places no limit + * (and is skipped to avoid an integer division by zero); if no dimension is used the result is + * {@link Integer#MAX_VALUE}. 
+ */ + @Override + public int computeAvailableContainers(NodeResource available, NodeResource required) { + int containers = Integer.MAX_VALUE; + if (required.getMemory() > 0) { + containers = Math.min(containers, available.getMemory() / required.getMemory()); + } + if (required.getDisks() > 0) { + containers = Math.min(containers, available.getDisks() / required.getDisks()); + } + if (required.getVirtualCores() > 0) { + containers = Math.min(containers, available.getVirtualCores() / required.getVirtualCores()); + } + return containers; + } + + @Override + public float divide(NodeResource unused, + NodeResource numerator, NodeResource denominator) { + return ratio(numerator, denominator); + } + + /** A resource is an invalid divisor when its memory is zero, since {@link #ratio} divides by memory. */ + @Override + public boolean isInvalidDivisor(NodeResource r) { + return r.getMemory() == 0; + } + + @Override + public float ratio(NodeResource a, NodeResource b) { + return (float)a.getMemory() / b.getMemory(); + } + + @Override + public NodeResource divideAndCeil(NodeResource numerator, int denominator) { + return NodeResources.createResource( + divideAndCeil(numerator.getMemory(), denominator)); + } + + @Override + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource, NodeResource stepFactor) { + int normalizedMemory = Math.min( + roundUp( + Math.max(r.getMemory(), minimumResource.getMemory()), + stepFactor.getMemory()),
maximumResource.getMemory()); + return NodeResources.createResource(normalizedMemory); + } + + @Override + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + @Override + public NodeResource roundUp(NodeResource r, NodeResource stepFactor) { + return NodeResources.createResource( + roundUp(r.getMemory(), stepFactor.getMemory()) + ); + } + + @Override + public NodeResource roundDown(NodeResource r, NodeResource stepFactor) { + return NodeResources.createResource( + roundDown(r.getMemory(), stepFactor.getMemory())); + } + + @Override + public NodeResource multiplyAndNormalizeUp(NodeResource r, double by, + NodeResource stepFactor) { + return NodeResources.createResource( + roundUp((int) (r.getMemory() * by + 0.5), stepFactor.getMemory()) + ); + } + + @Override + public NodeResource multiplyAndNormalizeDown(NodeResource r, double by, + NodeResource stepFactor) { + return NodeResources.createResource( + roundDown( + (int) (r.getMemory() * by), + stepFactor.getMemory() + ) + ); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java new file mode 100644 index 0000000..f51fc07 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.resource; + +import com.google.common.base.Objects; +import io.netty.util.internal.PlatformDependent; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.common.ProtoObject; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * <p><code>NodeResource</code> models a set of computer resources in the + * cluster.</p> + * <p/> + * <p>Currently it models <em>memory</em> and <em>disk</em> and <em>CPU</em>.</p> + * <p/> + * <p>The unit for memory is megabytes. The unit for disks is the number of disks. + * CPU is modeled with virtual cores (vcores), a unit for expressing parallelism. + * A node's capacity should be configured with virtual cores equal to its number of physical cores.
+ * A task should be requested with the number of cores it can saturate.</p> + * <p/> + */ + +public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>, Comparable<NodeResource> { + + private volatile int memory; + private volatile int disks; + private volatile int vCores; + + private static AtomicIntegerFieldUpdater MEMORY_UPDATER; + private static AtomicIntegerFieldUpdater DISKS_UPDATER; + private static AtomicIntegerFieldUpdater VCORES_UPDATER; + + static { + MEMORY_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "memory"); + if (MEMORY_UPDATER == null) { + MEMORY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "memory"); + DISKS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "disks"); + VCORES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "vCores"); + } else { + DISKS_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "disks"); + VCORES_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "vCores"); + } + } + + public NodeResource(TajoProtos.NodeResourceProto proto) { + setMemory(proto.getMemory()); + setDisks(proto.getDisks()); + setVirtualCores(proto.getVirtualCores()); + } + + private NodeResource() { + + } + + public static NodeResource createResource(int memory, int disks, int vCores) { + return new NodeResource().setMemory(memory).setDisks(disks).setVirtualCores(vCores); + } + + /** + * Get <em>memory</em> of the resource. + * + * @return <em>memory</em> of the resource + */ + public int getMemory() { + return memory; + } + + /** + * Set <em>memory</em> of the resource. + * + * @param memory <em>memory</em> of the resource + */ + @SuppressWarnings("unchecked") + public NodeResource setMemory(int memory) { + MEMORY_UPDATER.lazySet(this, memory); + return this; + } + + + /** + * Get <em>number of disks</em> of the resource.
+ * + * @return <em>number of disks</em> of the resource + */ + public int getDisks() { + return disks; + } + + /** + * Set <em>number of disks </em> of the resource. + * + * @param disks <em>number of disks</em> of the resource + */ + @SuppressWarnings("unchecked") + public NodeResource setDisks(int disks) { + DISKS_UPDATER.lazySet(this, disks); + return this; + } + + /** + * Get <em>number of virtual cpu cores</em> of the resource. + * Virtual cores are a unit for expressing CPU parallelism. A node's capacity + * should be configured with virtual cores equal to its number of physical cores. + * + * @return <em>num of virtual cpu cores</em> of the resource + */ + public int getVirtualCores() { + return vCores; + } + + + /** + * Set <em>number of virtual cpu cores</em> of the resource. + * + * @param vCores <em>number of virtual cpu cores</em> of the resource + */ + @SuppressWarnings("unchecked") + public NodeResource setVirtualCores(int vCores) { + VCORES_UPDATER.lazySet(this, vCores); + return this; + } + + @Override + public TajoProtos.NodeResourceProto getProto() { + TajoProtos.NodeResourceProto.Builder builder = TajoProtos.NodeResourceProto.newBuilder(); + builder.setMemory(memory) + .setDisks(disks) + .setVirtualCores(vCores); + return builder.build(); + } + + @Override + public int hashCode() { + return Objects.hashCode(getMemory(), getDisks(), getVirtualCores()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof NodeResource)) + return false; + NodeResource other = (NodeResource) obj; + if (getMemory() != other.getMemory() || + getDisks() != other.getDisks() || + getVirtualCores() != other.getVirtualCores()) { + return false; + } + return true; + } + + /** + * Orders by memory, then disks, then virtual cores (consistent with {@link #equals}). Uses + * {@link Integer#compare} rather than subtraction so that large or negative component values + * cannot overflow and flip the sign of the result. 
+ */ + @Override + public int compareTo(NodeResource other) { + int diff = Integer.compare(this.getMemory(), other.getMemory()); + if (diff == 0) { + diff = Integer.compare(this.getDisks(), other.getDisks()); + } + if (diff == 0) { + diff =
Integer.compare(this.getVirtualCores(), other.getVirtualCores()); + } + return diff; + } + + @Override + public String toString() { + return "<memory:" + getMemory() + ", disks:" + getDisks() + ", vCores:" + getVirtualCores() + ">"; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java new file mode 100644 index 0000000..01e9dcf --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java @@ -0,0 +1,195 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tajo.resource; + + +public class NodeResources { + + public static NodeResource createResource(int memory) { + return createResource(memory, 0); + } + + public static NodeResource createResource(int memory, int disks) { + return NodeResource.createResource(memory, disks, (memory > 0) ?
1 : 0); + } + + public static NodeResource createResource(int memory, int disks, int vCores) { + return NodeResource.createResource(memory, disks, vCores); + } + + public static NodeResource clone(NodeResource res) { + return NodeResource.createResource(res.getMemory(), res.getDisks(), res.getVirtualCores()); + } + + /** Copies every component of rhs into lhs (mutating lhs) and returns lhs. */ + public static NodeResource update(NodeResource lhs, NodeResource rhs) { + return lhs.setMemory(rhs.getMemory()).setDisks(rhs.getDisks()).setVirtualCores(rhs.getVirtualCores()); + } + + /** Adds rhs to lhs component-wise in place (mutating lhs) and returns lhs. */ + public static NodeResource addTo(NodeResource lhs, NodeResource rhs) { + lhs.setMemory(lhs.getMemory() + rhs.getMemory()) + .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores()) + .setDisks(lhs.getDisks() + rhs.getDisks()); + return lhs; + } + + /** Returns lhs + rhs as a new resource; lhs is cloned first, so neither argument is modified. */ + public static NodeResource add(NodeResource lhs, NodeResource rhs) { + return addTo(clone(lhs), rhs); + } + + /** Subtracts rhs from lhs component-wise in place (mutating lhs) and returns lhs; results are not clamped and may go negative. */ + public static NodeResource subtractFrom(NodeResource lhs, NodeResource rhs) { + lhs.setMemory(lhs.getMemory() - rhs.getMemory()) + .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores()) + .setDisks(lhs.getDisks() - rhs.getDisks()); + return lhs; + } + + /** Returns lhs - rhs as a new resource; neither argument is modified. */ + public static NodeResource subtract(NodeResource lhs, NodeResource rhs) { + return subtractFrom(clone(lhs), rhs); + } + + /** Scales every component of lhs in place, truncating via an (int) cast, and returns lhs. 
*/ + public static NodeResource multiplyTo(NodeResource lhs, double by) { + lhs.setMemory((int) (lhs.getMemory() * by)) + .setVirtualCores((int) (lhs.getVirtualCores() * by)) + .setDisks((int) (lhs.getDisks() * by)); + return lhs; + } + + public static NodeResource multiply(NodeResource lhs, double by) { + return multiplyTo(clone(lhs), by); + } + + public static NodeResource multiplyAndNormalizeUp( + ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) { + return calculator.multiplyAndNormalizeUp(lhs, by, factor); + } + + public static NodeResource multiplyAndNormalizeDown( + ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) { + return calculator.multiplyAndNormalizeDown(lhs, by,
factor); + } + + public static NodeResource multiplyAndRoundDown(NodeResource lhs, double by) { + NodeResource out = clone(lhs); + out.setMemory((int)(lhs.getMemory() * by)); + out.setDisks((int)(lhs.getDisks() * by)); + out.setVirtualCores((int)(lhs.getVirtualCores() * by)); + return out; + } + + public static NodeResource normalize( + ResourceCalculator calculator, NodeResource lhs, NodeResource min, + NodeResource max, NodeResource increment) { + return calculator.normalize(lhs, min, max, increment); + } + + public static NodeResource roundUp( + ResourceCalculator calculator, NodeResource lhs, NodeResource factor) { + return calculator.roundUp(lhs, factor); + } + + public static NodeResource roundDown( + ResourceCalculator calculator, NodeResource lhs, NodeResource factor) { + return calculator.roundDown(lhs, factor); + } + + public static boolean isInvalidDivisor( + ResourceCalculator resourceCalculator, NodeResource divisor) { + return resourceCalculator.isInvalidDivisor(divisor); + } + + public static float ratio( + ResourceCalculator resourceCalculator, NodeResource lhs, NodeResource rhs) { + return resourceCalculator.ratio(lhs, rhs); + } + + public static float divide( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, NodeResource lhs, NodeResource rhs) { + return resourceCalculator.divide(clusterResource, lhs, rhs); + } + + public static NodeResource divideAndCeil( + ResourceCalculator resourceCalculator, NodeResource lhs, int rhs) { + return resourceCalculator.divideAndCeil(lhs, rhs); + } + + public static boolean equals(NodeResource lhs, NodeResource rhs) { + return lhs.equals(rhs); + } + + public static boolean lessThan( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return (resourceCalculator.compare(clusterResource, lhs, rhs) < 0); + } + + public static boolean lessThanOrEqual( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, +
NodeResource lhs, NodeResource rhs) { + return (resourceCalculator.compare(clusterResource, lhs, rhs) <= 0); + } + + public static boolean greaterThan( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) > 0; + } + + public static boolean greaterThanOrEqual( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0; + } + + /** Returns the smaller of lhs and rhs as ordered by the calculator; lhs is returned on ties. */ + public static NodeResource min( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) <= 0 ? lhs : rhs; + } + + /** Returns the larger of lhs and rhs as ordered by the calculator; lhs is returned on ties. */ + public static NodeResource max( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ?
lhs : rhs; + } + + /** Returns true when each component of smaller is less than or equal to the matching component of bigger. */ + public static boolean fitsIn(NodeResource smaller, NodeResource bigger) { + return smaller.getMemory() <= bigger.getMemory() && + smaller.getDisks() <= bigger.getDisks() && + smaller.getVirtualCores() <= bigger.getVirtualCores(); + } + + public static NodeResource componentwiseMin(NodeResource lhs, NodeResource rhs) { + return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), + Math.min(lhs.getDisks(), rhs.getDisks()), + Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); + } + + public static NodeResource componentwiseMax(NodeResource lhs, NodeResource rhs) { + return createResource(Math.max(lhs.getMemory(), rhs.getMemory()), + Math.max(lhs.getDisks(), rhs.getDisks()), + Math.max(lhs.getVirtualCores(), rhs.getVirtualCores())); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java new file mode 100644 index 0000000..b08228f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java @@ -0,0 +1,169 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES 
OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tajo.resource; + + +/** + * A set of {@link NodeResource} comparison and manipulation interfaces. + */ + +public abstract class ResourceCalculator { + + public abstract int + compare(NodeResource clusterResource, NodeResource lhs, NodeResource rhs); + + /** + * Ceiling of {@code a / b} for non-negative {@code a} and positive {@code b}; returns 0 when {@code b} is 0. + */ + public static int divideAndCeil(int a, int b) { + if (b == 0) { + return 0; + } + return (a + (b - 1)) / b; + } + + /** + * Rounds {@code a} up to a multiple of {@code b}; returns 0 when {@code b} is 0. + */ + public static int roundUp(int a, int b) { + return divideAndCeil(a, b) * b; + } + + /** + * Rounds {@code a} down to a multiple of {@code b}; returns 0 when {@code b} is 0, matching + * {@link #roundUp(int, int)} instead of throwing an ArithmeticException. + */ + public static int roundDown(int a, int b) { + if (b == 0) { + return 0; + } + return (a / b) * b; + } + + /** + * Compute the number of containers which can be allocated given + * <code>available</code> and <code>required</code> resources. + * + * @param available available resources + * @param required required resources + * @return number of containers which can be allocated + */ + public abstract int computeAvailableContainers( + NodeResource available, NodeResource required); + /** + * Multiply resource <code>r</code> by factor <code>by</code> + * and normalize up using step-factor <code>stepFactor</code>. + * + * @param r resource to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize up + * @return resulting normalized resource + */ + public abstract NodeResource multiplyAndNormalizeUp( + NodeResource r, double by, NodeResource stepFactor); + + /** + * Multiply resource <code>r</code> by factor <code>by</code> + * and normalize down using step-factor <code>stepFactor</code>.
+ * + * @param r resource to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize down + * @return resulting normalized resource + */ + public abstract NodeResource multiplyAndNormalizeDown( + NodeResource r, double by, NodeResource stepFactor); + + /** + * Normalize resource <code>r</code> given the base + * <code>minimumResource</code> and verify against max allowed + * <code>maximumResource</code> + * + * @param r resource + * @param minimumResource minimum value, also used as the step-factor + * @param maximumResource the upper bound of the resource to be allocated + * @return normalized resource + */ + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + /** + * Normalize resource <code>r</code> given the base + * <code>minimumResource</code> and verify against max allowed + * <code>maximumResource</code> using a step factor for the normalization. + * + * @param r resource + * @param minimumResource minimum value + * @param maximumResource the upper bound of the resource to be allocated + * @param stepFactor the increment for resources to be allocated + * @return normalized resource + */ + public abstract NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource, + NodeResource stepFactor); + + + /** + * Round-up resource <code>r</code> given factor <code>stepFactor</code>. + * + * @param r resource + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract NodeResource roundUp(NodeResource r, NodeResource stepFactor); + + /** + * Round-down resource <code>r</code> given factor <code>stepFactor</code>.
+ * + * @param r resource + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract NodeResource roundDown(NodeResource r, NodeResource stepFactor); + + /** + * Divide resource <code>numerator</code> by resource <code>denominator</code> + * using specified policy (domination, average, fairness etc.); hence overall + * <code>clusterResource</code> is provided for context. + * + * @param clusterResource cluster resources + * @param numerator numerator + * @param denominator denominator + * @return <code>numerator</code>/<code>denominator</code> + * using specific policy + */ + public abstract float divide( + NodeResource clusterResource, NodeResource numerator, NodeResource denominator); + + /** + * Determine if a resource is not suitable for use as a divisor + * (will result in divide by 0, etc) + * + * @param r resource + * @return true if divisor is invalid (should not be used), false else + */ + public abstract boolean isInvalidDivisor(NodeResource r); + + /** + * Ratio of resource <code>a</code> to resource <code>b</code>. + * + * @param a resource + * @param b resource + * @return ratio of resource <code>a</code> to resource <code>b</code> + */ + public abstract float ratio(NodeResource a, NodeResource b); + + /** + * Divide-and-ceil <code>numerator</code> by <code>denominator</code>.
+ * + * @param numerator numerator resource + * @param denominator denominator + * @return resultant resource + */ + public abstract NodeResource divideAndCeil(NodeResource numerator, int denominator); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java new file mode 100644 index 0000000..20eec6b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java @@ -0,0 +1,148 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+*/ + +package org.apache.tajo.worker; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.storage.DiskUtil; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; +import org.apache.tajo.worker.event.NodeResourceManagerEvent; +import org.apache.tajo.worker.event.NodeStatusEvent; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; + +public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> { // Tracks this worker's total and available resources and grants/releases them on ALLOCATE/DEALLOCATE events. + private static final Log LOG = LogFactory.getLog(NodeResourceManager.class); + + private final Dispatcher dispatcher; + private NodeResource totalResource; + private NodeResource availableResource; // mutated in place by allocate()/release() + private AtomicInteger allocatedSize; // successful allocations minus deallocations + private TajoConf tajoConf; + + public NodeResourceManager(Dispatcher dispatcher){ + super(NodeResourceManager.class.getName()); + this.dispatcher = dispatcher; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + this.tajoConf = (TajoConf)conf; + this.totalResource = createWorkerResource(tajoConf); + this.availableResource = NodeResources.clone(totalResource); + this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this); + this.allocatedSize = new AtomicInteger(); + super.serviceInit(conf); + LOG.info("Initialized
NodeResourceManager for " + totalResource); + } + + @Override + public void handle(NodeResourceManagerEvent event) { + + if (event instanceof NodeResourceAllocateEvent) { + NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event; + BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder(); + for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { + NodeResource resource = new NodeResource(request.getResource()); + if (allocate(resource)) { + allocatedSize.incrementAndGet(); + //TODO send task event to taskExecutor + } else { + response.addCancellationTask(request); // does not fit: handed back to the caller as cancelled + } + } + allocateEvent.getCallback().run(response.build()); + + } else if (event instanceof NodeResourceDeallocateEvent) { + allocatedSize.decrementAndGet(); + NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; + release(deallocateEvent.getResource()); + + // send a snapshot of the current resource to ResourceTracker: availableResource keeps changing as later events are handled, while NodeStatusUpdater reads the queued event on its own thread + getDispatcher().getEventHandler().handle( + new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, NodeResources.clone(getAvailableResource()))); + } + } + + protected Dispatcher getDispatcher() { + return dispatcher; + } + + protected NodeResource getTotalResource() { + return totalResource; + } + + protected NodeResource getAvailableResource() { + return availableResource; + } + + public int getAllocatedSize() { + return allocatedSize.get(); + } + + private boolean allocate(NodeResource resource) { // subtracts from availableResource only when the whole request fits + //TODO consider the jvm free memory + if (NodeResources.fitsIn(resource, availableResource)) { + NodeResources.subtractFrom(availableResource, resource); + return true; + } + return false; + } + + private void release(NodeResource resource) { + NodeResources.addTo(availableResource, resource); + } + + private NodeResource createWorkerResource(TajoConf conf) { + int memoryMb; + + if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + memoryMb =
conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); + } else { // outside tests, never advertise more memory than the JVM max heap + memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), + conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); + } + + int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); + int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM); + + int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); + if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { + disks = dataNodeStorageSize; + } + + int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM); + return NodeResource.createResource(memoryMb, disks * diskParallels, vCores); // disk slots = disks x per-disk parallelism + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java new file mode 100644 index 0000000..84ac419 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.rpc.*; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.worker.event.NodeStatusEvent; + +import java.net.ConnectException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; + +/** + * Periodically sends heartbeats (batched resource reports, pings, and the full node status on registration) to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ */ +public class NodeStatusUpdater extends AbstractService implements EventHandler<NodeStatusEvent> { + + private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class); + + private TajoConf tajoConf; + private StatusUpdaterThread updaterThread; + private volatile boolean isStopped; + private volatile long heartBeatInterval; + private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue; + private final WorkerConnectionInfo connectionInfo; + private final NodeResourceManager nodeResourceManager; + private AsyncRpcClient rmClient; + private ServiceTracker serviceTracker; + private TajoResourceTrackerProtocolService.Interface resourceTracker; + private int queueingLimit; // max resource reports batched into one heartbeat (set in serviceStart) + + public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) { + super(NodeStatusUpdater.class.getSimpleName()); + this.connectionInfo = connectionInfo; + this.nodeResourceManager = resourceManager; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + this.tajoConf = (TajoConf) conf; + this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue(); + this.serviceTracker = ServiceTrackerFactory.get(tajoConf); + this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this); + this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL); + this.updaterThread = new StatusUpdaterThread(); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + // batch up to (total vcores / 2) queued resource reports per heartbeat; drain() also returns once heartBeatInterval elapses + this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2; + + updaterThread.start(); + super.serviceStart(); + LOG.info("NodeStatusUpdater started."); + } + + @Override + public void serviceStop() throws Exception { + this.isStopped = true; + + synchronized (updaterThread) { +
updaterThread.notifyAll(); updaterThread.interrupt(); // notifyAll() only wakes the error back-off wait; interrupt also wakes drain() and a pending heartbeat response + } + super.serviceStop(); + LOG.info("NodeStatusUpdater stopped."); + } + + @Override + public void handle(NodeStatusEvent event) { + switch (event.getType()) { + case REPORT_RESOURCE: + heartBeatRequestQueue.add(event); //batch report to ResourceTracker + break; + case FLUSH_REPORTS: + heartBeatRequestQueue.add(event); //flush report to ResourceTracker + break; + } + } + + public int getQueueSize() { + return heartBeatRequestQueue.size(); + } + + public int getQueueingLimit() { + return queueingLimit; + } + + private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) { + NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); + requestProto.setAvailableResource(resource.getProto()); + requestProto.setWorkerId(connectionInfo.getId()); + return requestProto.build(); + } + + private NodeHeartbeatRequestProto createHeartBeatReport() { + NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); + requestProto.setWorkerId(connectionInfo.getId()); + return requestProto.build(); + } + + private NodeHeartbeatRequestProto createNodeStatusReport() { + NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); + requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto()); + requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto()); // NOTE(review): reads the live object that NodeResourceManager mutates while handling events; presumably should be a snapshot - confirm + requestProto.setWorkerId(connectionInfo.getId()); + requestProto.setConnectionInfo(connectionInfo.getProto()); + + //TODO set node status to requestProto.setStatus() + return requestProto.build(); + } + + protected TajoResourceTrackerProtocolService.Interface newStub() + throws NoSuchMethodException, ConnectException, ClassNotFoundException { + RpcClientManager.cleanup(rmClient); + + RpcClientManager rpcManager = RpcClientManager.getInstance(); + rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(), + TajoResourceTrackerProtocol.class,
true, rpcManager.getRetries(), + rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false); + return rmClient.getStub(); + } + + protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto) + throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException { + if (resourceTracker == null) { + resourceTracker = newStub(); + } + + NodeHeartbeatResponseProto response = null; + try { + CallFuture<NodeHeartbeatResponseProto> callBack = new CallFuture<NodeHeartbeatResponseProto>(); + + resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack); + response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); Thread.currentThread().interrupt(); // restore the flag so run() observes it and exits + } catch (TimeoutException te) { + LOG.warn("Heartbeat response is being delayed.", te); + } catch (ExecutionException ee) { + LOG.warn("TajoMaster failure: " + ee.getMessage()); + resourceTracker = null; + throw ee; + } + return response; + } + + class StatusUpdaterThread extends Thread { + + public StatusUpdaterThread() { + super("NodeStatusUpdater"); + } + + private int drain(Collection<NodeStatusEvent> buffer, int numElements, // waits up to timeout for numElements events; a polled FLUSH_REPORTS ends the wait early + long timeout, TimeUnit unit) throws InterruptedException { + + long deadline = System.nanoTime() + unit.toNanos(timeout); + int added = 0; + while (added < numElements) { + added += heartBeatRequestQueue.drainTo(buffer, numElements - added); // NOTE(review): a FLUSH_REPORTS taken by drainTo does not end the wait early + if (added < numElements) { // not enough elements immediately available; will have to wait + NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); + if (e == null) { + break; // we already waited enough, and there are no more elements in sight + } + buffer.add(e); + added++; + + if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) { + break; + } + } + } + return added; + } + + /* Sends this node's resource and status to the master as periodic heartbeats.
*/ + @Override + public void run() { + NodeHeartbeatResponseProto lastResponse = null; + while (!isStopped && !Thread.interrupted()) { + + try { + if (lastResponse != null) { + if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) { + // Membership changed + lastResponse = sendHeartbeat(createNodeStatusReport()); + } else { + // NORMAL, plus ABORT_QUERY and SHUTDOWN which are not handled yet (TODO abort failure queries): + // every command except MEMBERSHIP takes the batched-report path, so the loop always waits up to + // heartBeatInterval instead of busy-spinning on a stale command. + List<NodeStatusEvent> events = Lists.newArrayList(); + try { + /* batch update to ResourceTracker */ + drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + break; + } + + if (!events.isEmpty()) { + // send last available resource; + lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource())); + } else { + // send ping; + lastResponse = sendHeartbeat(createHeartBeatReport()); + } + } + } else { + // Node registration on startup + lastResponse = sendHeartbeat(createNodeStatusReport()); + } + } catch (NoSuchMethodException nsme) { + LOG.fatal(nsme.getMessage(), nsme); + Runtime.getRuntime().halt(1); + } catch (ClassNotFoundException cnfe) { + LOG.fatal(cnfe.getMessage(), cnfe); + Runtime.getRuntime().halt(1); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + if (!isStopped) { + synchronized (updaterThread) { + try { + updaterThread.wait(heartBeatInterval); + } catch (InterruptedException ie) { + // woken early (e.g. by serviceStop()); the loop condition re-checks isStopped + } + } + } + } + } + + LOG.info("Heartbeat Thread stopped."); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index
bd70d59..050e2b5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -47,6 +47,7 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; /** * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc. */ +@Deprecated public class WorkerHeartbeatService extends AbstractService { /** class logger */ private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java new file mode 100644 index 0000000..2f411e8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker.event; + + +import com.google.protobuf.RpcCallback; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto; +import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto; + +public class NodeResourceAllocateEvent extends NodeResourceManagerEvent { // ALLOCATE: a batch of task requests plus the callback that receives the requests that could not be allocated + + private final BatchAllocationRequestProto request; + private final RpcCallback<BatchAllocationResponseProto> callback; + + public NodeResourceAllocateEvent(BatchAllocationRequestProto request, + RpcCallback<BatchAllocationResponseProto> callback) { + super(EventType.ALLOCATE); + this.callback = callback; + this.request = request; + } + + public BatchAllocationRequestProto getRequest() { + return request; + } + + public RpcCallback<BatchAllocationResponseProto> getCallback() { + return callback; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java new file mode 100644 index 0000000..a298d77 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.TajoProtos; +import org.apache.tajo.resource.NodeResource; + +public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { // DEALLOCATE: returns a resource to the node's available pool + + private final NodeResource resource; + + public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { + this(new NodeResource(proto)); + } + + public NodeResourceDeallocateEvent(NodeResource resource) { + super(EventType.DEALLOCATE); + this.resource = resource; + } + + public NodeResource getResource() { + return resource; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java new file mode 100644 index 0000000..bcb3448 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.ExecutionBlockId; // NOTE(review): unused in this file +import org.apache.tajo.resource.NodeResource; // NOTE(review): unused in this file + +public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> { // base event dispatched to NodeResourceManager + public enum EventType { + ALLOCATE, // carried by NodeResourceAllocateEvent + DEALLOCATE // carried by NodeResourceDeallocateEvent + } + + public NodeResourceManagerEvent(EventType eventType) { + super(eventType); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java new file mode 100644 index 0000000..58ab74a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.resource.NodeResource; + +public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> { + private final NodeResource resource; // resource reported to the ResourceTracker + + public enum EventType { + REPORT_RESOURCE, // queued and batched by NodeStatusUpdater + FLUSH_REPORTS // ends NodeStatusUpdater's batching wait early when it is polled + } + + public NodeStatusEvent(EventType eventType, NodeResource resource) { + super(eventType); + this.resource = resource; + } + + public NodeResource getResource() { + return resource; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/ResourceTrackerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index 40aeab7..dffd8c9 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -25,15 +25,42 @@ option java_generate_equals_and_hash = true; import "QueryCoordinatorProtocol.proto"; import "ContainerProtocol.proto"; import "tajo_protos.proto"; +import "TajoIdProtos.proto"; package hadoop.yarn; +// deprecated message NodeHeartbeat { required WorkerConnectionInfoProto connectionInfo = 1; optional ServerStatusProto serverStatus = 2; optional string statusMessage = 3; } +message NodeHeartbeatRequestProto { + required int32 workerId = 1; + optional NodeResourceProto totalResource = 2; // set in registration/membership reports + optional NodeResourceProto availableResource = 3; // set in registration/membership and resource reports + optional
WorkerConnectionInfoProto connectionInfo = 4; // set only in registration/membership reports + optional NodeStatusProto status = 5; // not populated yet (TODO in NodeStatusUpdater) +} + +message NodeHeartbeatResponseProto { + required ResponseCommand command = 1 [default = NORMAL]; + repeated QueryIdProto queryId = 2; // presumably the queries affected by ABORT_QUERY - confirm +} + +enum ResponseCommand { + NORMAL = 1; //ping + MEMBERSHIP = 2; // request membership to worker node + ABORT_QUERY = 3; //query master failure + SHUTDOWN = 4; // black list +} + +//TODO add node health information +message NodeStatusProto { +} + service TajoResourceTrackerProtocolService { rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse); + rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index bf9bbde..2324596 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -116,6 +116,7 @@ message ExecutionBlockReport { repeated IntermediateEntryProto intermediateEntries = 5; } +// deprecated message TaskResponseProto { required string id = 1; required QueryState status = 2; @@ -161,6 +162,7 @@ message QueryExecutionRequestProto { optional StringProto logicalPlanJson = 6; } +// deprecated message GetTaskRequestProto { required int32 workerId = 1; required TajoContainerIdProto containerId = 2; @@ -198,6 +200,20 @@ message ExecutionBlockListProto { repeated ExecutionBlockIdProto executionBlockId = 1; } +message TaskAllocationRequestProto { + required TaskRequestProto taskRequest = 1; + required NodeResourceProto resource = 2; // resource to reserve on the worker for this task +} + +message BatchAllocationRequestProto { + required ExecutionBlockIdProto executionBlockId = 1; + repeated TaskAllocationRequestProto taskRequest = 2; +} + +message BatchAllocationResponseProto { +
repeated TaskAllocationRequestProto cancellationTask = 2; // requests that did not fit into the worker's available resource +} + service TajoWorkerProtocolService { rpc ping (TaskAttemptIdProto) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java new file mode 100644 index 0000000..eb0d732 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.resource; + +import org.junit.Test; + +import static org.apache.tajo.resource.NodeResources.componentwiseMin; +import static org.apache.tajo.resource.NodeResources.createResource; +import static org.apache.tajo.resource.NodeResources.fitsIn; +import static org.junit.Assert.*; + +public class TestResources { + @Test + public void testFitsIn() { // a resource fits only if every component is <= the corresponding component of the capacity + assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1))); + assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1))); + assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1))); + assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1))); + assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1))); + assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1))); + } + + @Test + public void testComponentwiseMin() { // each component of the result is the smaller of the two inputs' components + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 1), createResource(2, 2))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(2, 2), createResource(1, 1))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 2), createResource(2, 1))); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java new file mode 100644 index 0000000..2d7d0be --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Maps; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; + +import java.net.ConnectException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; + +public class MockNodeStatusUpdater extends NodeStatusUpdater { // NodeStatusUpdater wired to an in-process fake ResourceTracker + + private final CountDownLatch barrier; // counted down once per nodeHeartbeat call + private final Map<Integer, NodeResource> membership = Maps.newHashMap(); // workerId -> total resource sent at registration + private final Map<Integer, NodeResource> resources = Maps.newHashMap(); // workerId -> last reported available resource + private final MockResourceTracker resourceTracker; + + public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo, + NodeResourceManager resourceManager) { + super(connectionInfo, resourceManager); + this.barrier = barrier; + this.resourceTracker = new MockResourceTracker(); + } + + @Override + protected TajoResourceTrackerProtocolService.Interface newStub() + throws NoSuchMethodException, ConnectException, ClassNotFoundException { + + return resourceTracker; + } + + protected MockResourceTracker getResourceTracker() { + return resourceTracker; + } + + class MockResourceTracker implements
TajoResourceTrackerProtocolService.Interface { + private NodeHeartbeatRequestProto lastRequest; + + protected Map<Integer, NodeResource> getTotalResource() { + return membership; + } + + protected Map<Integer, NodeResource> getAvailableResource() { + return resources; + } + + protected NodeHeartbeatRequestProto getLastRequest() { + return lastRequest; + } + + @Override + public void heartbeat(RpcController controller, NodeHeartbeat request, + RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) { + // legacy heartbeat() is never called by NodeStatusUpdater; intentionally a no-op + } + + @Override + public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequestProto request, + RpcCallback<NodeHeartbeatResponseProto> done) { + + NodeHeartbeatResponseProto.Builder response = NodeHeartbeatResponseProto.newBuilder(); + if (membership.containsKey(request.getWorkerId())) { + if (request.hasAvailableResource()) { + NodeResource resource = resources.get(request.getWorkerId()); + NodeResources.update(resource, new NodeResource(request.getAvailableResource())); + } + done.run(response.setCommand(ResponseCommand.NORMAL).build()); + } else { + if (request.hasConnectionInfo()) { + membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource())); + resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource())); + done.run(response.setCommand(ResponseCommand.NORMAL).build()); + } else { + done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build()); // unknown worker without connection info: ask it to register + } + } + lastRequest = request; + barrier.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java new file mode 100644 index 0000000..7407acc --- /dev/null +++
b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; +import org.junit.*; + +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +public class TestNodeResourceManager { + + private NodeResourceManager resourceManager; + private MockNodeStatusUpdater statusUpdater; + private AsyncDispatcher dispatcher; + private int taskMemory; + private 
TajoConf conf; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + + taskMemory = 512; + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, + taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES)); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1); + + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + + resourceManager = new NodeResourceManager(dispatcher); + resourceManager.init(conf); + resourceManager.start(); + + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager); + statusUpdater.init(conf); + statusUpdater.start(); + } + + @After + public void tearDown() { + resourceManager.stop(); + statusUpdater.stop(); + dispatcher.stop(); + } + + @Test + public void testNodeResourceAllocateEvent() throws Exception { + int requestSize = 4; + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + BatchAllocationResponseProto responseProto = callFuture.get(); + assertNotEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource()); + assertEquals(0, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); + } + + + @Test + public void testNodeResourceCancellation() throws Exception { + int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); + int overSize = 10; + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + BatchAllocationResponseProto responseProto = callFuture.get(); + + assertEquals(overSize, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); + } + + @Test + public void testNodeResourceDeallocateEvent() throws Exception { + int requestSize = 4; + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + BatchAllocationResponseProto responseProto = callFuture.get(); + 
assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + assertEquals(0, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); + + //deallocate + for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) { + // direct invoke handler for testing + resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource())); + } + assertEquals(0, resourceManager.getAllocatedSize()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); + } + + @Test(timeout = 30000) + public void testParallelRequest() throws Exception { + final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2; + final int taskSize = 100000; + final AtomicInteger totalComplete = new AtomicInteger(); + final AtomicInteger totalCanceled = new AtomicInteger(); + + final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize); + + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + List<Future> futureList = Lists.newArrayList(); + + long startTime = System.currentTimeMillis(); + for (int i = 0; i < parallelCount; i++) { + futureList.add(executor.submit(new Runnable() { + @Override + public void run() { + int complete = 0; + while (true) { + TaskAllocationRequestProto task = totalTasks.poll(); + if (task == null) break; + + + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.addTaskRequest(task); + requestProto.setExecutionBlockId(ebId.getProto()); + + CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>(); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + try { + 
BatchAllocationResponseProto proto = callFuture.get(); + if (proto.getCancellationTaskCount() > 0) { + totalTasks.addAll(proto.getCancellationTaskList()); + totalCanceled.addAndGet(proto.getCancellationTaskCount()); + } else { + complete++; + dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource())); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + System.out.println(Thread.currentThread().getName() + " complete requests: " + complete); + totalComplete.addAndGet(complete); + } + }) + ); + } + + for (Future future : futureList) { + future.get(); + } + + System.out.println(parallelCount + " Thread, completed requests: " + totalComplete.get() + ", canceled requests:" + + totalCanceled.get() + ", " + +(System.currentTimeMillis() - startTime) + " ms elapsed"); + executor.shutdown(); + assertEquals(taskSize, totalComplete.get()); + } + + protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) { + Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>(); + for (int i = 0; i < size; i++) { + + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0); + + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); + builder.setId(taskAttemptId.getProto()); + builder.setShouldDie(true); + builder.setOutputTable(""); + builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); + builder.setClusteredOutput(false); + + + requestProtoList.add(TaskAllocationRequestProto.newBuilder() + .setResource(NodeResources.createResource(memory).getProto()) + .setTaskRequest(builder.build()).build()); + } + return requestProtoList; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java new file mode 100644 index 0000000..fb3c77e --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeStatusEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import static org.junit.Assert.*; + +public class TestNodeStatusUpdater { + + private NodeResourceManager resourceManager; + private MockNodeStatusUpdater statusUpdater; + private AsyncDispatcher dispatcher; + private TajoConf conf; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + + conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000); + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + + resourceManager = new NodeResourceManager(dispatcher); + resourceManager.init(conf); + resourceManager.start(); + } + + @After + public void tearDown() { + resourceManager.stop(); + if (statusUpdater != null) statusUpdater.stop(); + dispatcher.stop(); + } + + @Test(timeout = 20000) + public void testNodeMembership() throws Exception { + CountDownLatch barrier = new CountDownLatch(1); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId())); + assertEquals(resourceManager.getTotalResource(), + resourceTracker.getTotalResource().get(worker.getId())); + + assertEquals(resourceManager.getAvailableResource(), + 
resourceTracker.getAvailableResource().get(worker.getId())); + } + + @Test(timeout = 20000) + public void testPing() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + TajoResourceTrackerProtocol.NodeHeartbeatRequestProto lastRequest = resourceTracker.getLastRequest(); + assertTrue(lastRequest.hasWorkerId()); + assertFalse(lastRequest.hasAvailableResource()); + assertFalse(lastRequest.hasTotalResource()); + assertFalse(lastRequest.hasConnectionInfo()); + } + + @Test(timeout = 20000) + public void testResourceReport() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); + statusUpdater.init(conf); + statusUpdater.start(); + + for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) { + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, + resourceManager.getAvailableResource())); + } + barrier.await(); + assertEquals(0, statusUpdater.getQueueSize()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java index 0bcd5ec..19e08e8 100644 --- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java @@ -189,10 +189,10 @@ public class DiskUtil { } public static int getDataNodeStorageSize(){ - return getStorageDirs().size(); + return getDataNodeStorageDirs().size(); } - public static List<URI> getStorageDirs(){ + public static List<URI> getDataNodeStorageDirs(){ Configuration conf = new HdfsConfiguration(); Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); return Util.stringCollectionAsURIs(dirNames);
