[
https://issues.apache.org/jira/browse/TAJO-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14631325#comment-14631325
]
ASF GitHub Bot commented on TAJO-1397:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/608#discussion_r34887805
--- Diff: tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.util.TUtil;
+
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * It contains resource and various information for a node.
+ */
+public class NodeStatus implements EventHandler<NodeEvent>,
Comparable<NodeStatus> {
+ /** class logger */
+ private static final Log LOG = LogFactory.getLog(NodeStatus.class);
+
+ /** context of {@link TajoResourceManager} */
+ private final TajoRMContext rmContext;
+
+ /** last heartbeat time */
+ private volatile long lastHeartbeatTime;
+
+ private volatile int numRunningTasks;
+
+ private volatile int numRunningQueryMaster;
+
+ private static AtomicLongFieldUpdater HEARTBEAT_TIME_UPDATER;
+ private static AtomicIntegerFieldUpdater RUNNING_TASK_UPDATER;
+ private static AtomicIntegerFieldUpdater RUNNING_QM_UPDATER;
+
+ static {
+ HEARTBEAT_TIME_UPDATER =
PlatformDependent.newAtomicLongFieldUpdater(NodeStatus.class,
"lastHeartbeatTime");
+ if (HEARTBEAT_TIME_UPDATER == null) {
+ HEARTBEAT_TIME_UPDATER =
AtomicLongFieldUpdater.newUpdater(NodeStatus.class, "lastHeartbeatTime");
+ RUNNING_TASK_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningTasks");
+ RUNNING_QM_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningQueryMaster");
+ } else {
+ RUNNING_TASK_UPDATER =
PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class,
"numRunningTasks");
+ RUNNING_QM_UPDATER =
PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class,
"numRunningQueryMaster");
+ }
+ }
+
+ /** Available resources on the node. */
+ private final NodeResource availableResource;
+
+ /** Total resources on the node. */
+ private final NodeResource totalResourceCapability;
+
+ /** Node connection information */
+ private WorkerConnectionInfo connectionInfo;
+
+ private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION =
new ReconnectNodeTransition();
+ private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION =
new StatusUpdateTransition();
+
+ private static final StateMachineFactory<NodeStatus,
+ NodeState,
+ NodeEventType,
+ NodeEvent> stateMachineFactory
+ = new StateMachineFactory<NodeStatus,
+ NodeState,
+ NodeEventType,
+ NodeEvent>(NodeState.NEW)
+
+ // Transition from NEW
+ .addTransition(NodeState.NEW, NodeState.RUNNING,
+ NodeEventType.STARTED,
+ new AddNodeTransition())
+
+ // Transition from RUNNING
+ .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING,
NodeState.UNHEALTHY),
+ NodeEventType.STATE_UPDATE,
+ STATUS_UPDATE_TRANSITION)
+ .addTransition(NodeState.RUNNING, NodeState.LOST,
+ NodeEventType.EXPIRE,
+ new DeactivateNodeTransition(NodeState.LOST))
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ NodeEventType.RECONNECTED,
+ RECONNECT_NODE_TRANSITION)
+
+ // Transitions from UNHEALTHY state
+ .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.RUNNING,
NodeState.UNHEALTHY),
+ NodeEventType.STATE_UPDATE,
+ STATUS_UPDATE_TRANSITION)
+ .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
+ NodeEventType.EXPIRE,
+ new DeactivateNodeTransition(NodeState.LOST))
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ NodeEventType.RECONNECTED,
+ RECONNECT_NODE_TRANSITION);
+
+ private final StateMachine<NodeState, NodeEventType, NodeEvent>
stateMachine =
+ stateMachineFactory.make(this, NodeState.NEW);
+
+ public NodeStatus(TajoRMContext rmContext, NodeResource
totalResourceCapability, WorkerConnectionInfo connectionInfo) {
+ this.rmContext = rmContext;
+
+ this.connectionInfo = connectionInfo;
+ this.lastHeartbeatTime = System.currentTimeMillis();
+ this.totalResourceCapability = totalResourceCapability;
+ this.availableResource = NodeResources.clone(totalResourceCapability);
+ }
+
+ public int getWorkerId() {
+ return connectionInfo.getId();
+ }
+
+ public WorkerConnectionInfo getConnectionInfo() {
+ return connectionInfo;
+ }
+
+ public void setLastHeartbeatTime(long lastheartbeatReportTime) {
+ HEARTBEAT_TIME_UPDATER.lazySet(this, lastheartbeatReportTime);
+ }
+
+ public void setNumRunningQueryMaster(int numRunningQueryMaster) {
+ RUNNING_QM_UPDATER.lazySet(this, numRunningQueryMaster);
+ }
+
+ public int getNumRunningQueryMaster() {
+ return numRunningQueryMaster;
+ }
+
+ public void setNumRunningTasks(int numRunningTasks) {
+ RUNNING_TASK_UPDATER.lazySet(this, numRunningTasks);
--- End diff --
``RUNNING_TASK_UPDATER`` is used to set the number of running tasks, but the
getter method ``getNumRunningTasks()`` just reads the member variable
``numRunningTasks`` directly. This seems to be invalid.
> Resource allocation should be fine grained.
> -------------------------------------------
>
> Key: TAJO-1397
> URL: https://issues.apache.org/jira/browse/TAJO-1397
> Project: Tajo
> Issue Type: Improvement
> Components: QueryMaster, resource manager, Worker
> Reporter: Hyunsik Choi
> Assignee: Jinho Kim
> Fix For: 0.11.0
>
> Attachments: ResoruceSequence.jpg, TAJO-1397.patch,
> TAJO-1397_2.patch, TAJO-1397_3.patch, TAJO-1397_4.patch,
> old_resource_circuit.png, resource_circuit.png
>
>
> See the comment:
> https://issues.apache.org/jira/browse/TAJO-540?focusedCommentId=14359478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14359478
> From the discussion in TAJO-540
> {quote}
> In general, a query (or job) scheduler aims at maximum resource
> utilization. For multi-tenancy, we also need to consider fairness among
> multiple users (or queries). BTW, maximum resource utilization and
> fairness usually conflict with each other. To mitigate this
> problem, many schedulers seem to use a preemption approach.
> In this regard, our resource and scheduler system has the following problems:
> * A query exclusively holds the resources allocated to it at the start until
> the query completes or fails.
> * There is no mechanism to deallocate resources during query processing.
> * Preemption is also not allowed.
> To achieve multi-tenancy, we should change our resource circulation.
> In particular, resource allocation must be fine-grained instead of per-query.
> So, I'll create a JIRA issue to change the resource circulation. In my
> opinion, we have to resolve that issue first. If we achieve this, implementing
> a multi-tenant scheduler would be much easier than now. It would be a good
> starting point for this issue.
> {quote}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)