http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java deleted file mode 100644 index 0f161ff..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java +++ /dev/null @@ -1,443 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; -import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.Task.IntermediateEntry; -import org.apache.tajo.master.querymaster.Task.PullHost; -import org.apache.tajo.master.container.TajoContainerId; - -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; - -public class TaskAttempt implements EventHandler<TaskAttemptEvent> { - - private static final Log LOG = LogFactory.getLog(TaskAttempt.class); - - private final static int EXPIRE_TIME = 15000; - - private final TaskAttemptId id; - private final Task task; - final EventHandler eventHandler; - - private TajoContainerId containerId; - private WorkerConnectionInfo workerConnectionInfo; - private int expire; - - private final Lock readLock; - private final Lock writeLock; - - private final List<String> diagnostics = new ArrayList<String>(); - - private final TaskAttemptScheduleContext scheduleContext; - - private float progress; - private CatalogProtos.TableStatsProto inputStats; - private CatalogProtos.TableStatsProto resultStats; - - protected static final StateMachineFactory - <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - stateMachineFactory = new StateMachineFactory - <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - (TaskAttemptState.TA_NEW) - - // Transitions from TA_NEW state - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, - TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition()) - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, - TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition()) - .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_KILL, - new TaskKilledCompleteTransition()) - - // Transitions from TA_UNASSIGNED state - .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED, - TaskAttemptEventType.TA_ASSIGNED, - new LaunchTransition()) - .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillUnassignedTaskTransition()) - - // Transitions from TA_ASSIGNED state - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED, - TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, - EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED), - TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new SucceededTransition()) - .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - - // Transitions from TA_RUNNING state - .addTransition(TaskAttemptState.TA_RUNNING, - EnumSet.of(TaskAttemptState.TA_RUNNING), - TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_KILL, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new SucceededTransition()) - .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_LOCAL_KILLED, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, - TaskAttemptEventType.TA_ASSIGNED, - new KillTaskTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_SCHEDULE_CANCELED, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_DONE, - new TaskKilledCompleteTransition()) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR) - .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, - EnumSet.of( - TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - TaskAttemptEventType.TA_UPDATE)) - - // Transitions from TA_SUCCEEDED state - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_UPDATE) - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition()) - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED, - TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - // Ignore-able transitions - .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, - TaskAttemptEventType.TA_KILL) - - // Transitions from TA_KILLED state - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE) - // Ignore-able transitions - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - EnumSet.of( - TaskAttemptEventType.TA_UPDATE)) - .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, - EnumSet.of( - TaskAttemptEventType.TA_LOCAL_KILLED, - TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_ASSIGNED, - TaskAttemptEventType.TA_DONE), - new TaskKilledCompleteTransition()) - .installTopology(); - - private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> - stateMachine; - - - public TaskAttempt(final TaskAttemptScheduleContext scheduleContext, - final TaskAttemptId id, final Task task, - final EventHandler eventHandler) { - this.scheduleContext = scheduleContext; - this.id = id; - this.expire = TaskAttempt.EXPIRE_TIME; - this.task = task; - this.eventHandler = eventHandler; - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - - stateMachine = stateMachineFactory.make(this); - } - - public TaskAttemptState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - public TaskAttemptId getId() { - return this.id; - } - - public boolean isLeafTask() { - return this.task.isLeafTask(); - } - - public Task getTask() { - return this.task; - } - - public WorkerConnectionInfo getWorkerConnectionInfo() { - return this.workerConnectionInfo; - } - - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - - public synchronized void setExpireTime(int expire) { - this.expire = expire; - } - - public synchronized void updateExpireTime(int period) { - this.setExpireTime(this.expire - period); - } - - public synchronized void resetExpireTime() { - this.setExpireTime(TaskAttempt.EXPIRE_TIME); - } - - public int getLeftTime() { - return this.expire; - } - - public float getProgress() { - return progress; - } - - public TableStats getInputStats() { - if (inputStats == null) { - return null; - } - - return new TableStats(inputStats); - } - - public TableStats getResultStats() { - if (resultStats == null) { - return null; - } - return new TableStats(resultStats); - } - - private void fillTaskStatistics(TaskCompletionReport report) { - this.progress = 1.0f; - - List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); - - if (report.getShuffleFileOutputsCount() > 0) { - this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList()); - - PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort()); - for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { - IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(), - getId().getId(), p.getPartId(), host, p.getVolume()); - partitions.add(entry); - } - } - this.getTask().setIntermediateData(partitions); - - if (report.hasInputStats()) { - this.inputStats = report.getInputStats(); - } - if (report.hasResultStats()) { - this.resultStats = report.getResultStats(); - this.getTask().setStats(new TableStats(resultStats)); - } - } - - private static class TaskAttemptScheduleTransition implements - SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - - @Override - public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { - taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( - EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), - taskAttempt.scheduleContext, taskAttempt)); - } - } - - private static class KillUnassignedTaskTransition implements - SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - - @Override - public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { - taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( - EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(), - taskAttempt.scheduleContext, taskAttempt)); - } - } - - private static class LaunchTransition - implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - - @Override - public void transition(TaskAttempt taskAttempt, - TaskAttemptEvent event) { - TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; - taskAttempt.containerId = castEvent.getContainerId(); - taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); - taskAttempt.eventHandler.handle( - new TaskTAttemptEvent(taskAttempt.getId(), - TaskEventType.T_ATTEMPT_LAUNCHED)); - } - } - - private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - - @Override - public void transition(TaskAttempt taskAttempt, - TaskAttemptEvent event) { - taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(), - TaskEventType.T_ATTEMPT_KILLED)); - LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask"); - } - } - - private static class StatusUpdateTransition - implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> { - - @Override - public TaskAttemptState transition(TaskAttempt taskAttempt, - TaskAttemptEvent event) { - TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event; - - taskAttempt.progress = updateEvent.getStatus().getProgress(); - taskAttempt.inputStats = updateEvent.getStatus().getInputStats(); - taskAttempt.resultStats = updateEvent.getStatus().getResultStats(); - - return TaskAttemptState.TA_RUNNING; - } - } - - private void addDiagnosticInfo(String diag) { - if (diag != null && !diag.equals("")) { - diagnostics.add(diag); - } - } - - private static class AlreadyAssignedTransition - implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ - - @Override - public void transition(TaskAttempt taskAttempt, - TaskAttemptEvent taskAttemptEvent) { - } - } - - private static class AlreadyDoneTransition - implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ - - @Override - public void transition(TaskAttempt taskAttempt, - TaskAttemptEvent taskAttemptEvent) { - } - } - - private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - @Override - public void transition(TaskAttempt taskAttempt, - TaskAttemptEvent event) { - TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); - - try { - taskAttempt.fillTaskStatistics(report); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); - } catch (Throwable t) { - taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); - taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); - } - } - } - - private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { - - @Override - public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { - taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId, - LocalTaskEventType.KILL)); - } - } - - private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ - @Override - public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { - TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); - taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); - LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() - + " >> " + errorEvent.errorMessage()); - } - } - - @Override - public void handle(TaskAttemptEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType()); - } - try { - writeLock.lock(); - TaskAttemptState oldState = getState(); - try { - stateMachine.doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() - , e); - eventHandler.handle( - new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), - "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); - eventHandler.handle( - new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), - StageEventType.SQ_INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if (LOG.isDebugEnabled()) { - if (oldState != getState()) { - LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to " - + getState()); - } - } - } - - finally { - writeLock.unlock(); - } - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index f1a9224..c4200d5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -36,10 +36,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.querymaster.QueryInProgress; +import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.util.ApplicationIdUtils; -import org.apache.tajo.util.StringUtils; import java.io.IOException; import java.util.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index 9c2b71b..b237cc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -23,7 +23,7 @@ import org.apache.hadoop.service.Service; import org.apache.tajo.QueryId; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.master.querymaster.QueryInProgress; +import org.apache.tajo.querymaster.QueryInProgress; import java.io.IOException; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java new file mode 100644 index 0000000..3dd3389 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import com.google.common.base.Objects; +import org.apache.tajo.QueryId; + +public class QuerySchedulingInfo { + private QueryId queryId; + private Integer priority; + private Long startTime; + + public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) { + this.queryId = queryId; + this.priority = priority; + this.startTime = startTime; + } + + public QueryId getQueryId() { + return queryId; + } + + public Integer getPriority() { + return priority; + } + + public Long getStartTime() { + return startTime; + } + + public String getName() { + return queryId.getId(); + } + + @Override + public int hashCode() { + return Objects.hashCode(startTime, getName(), priority); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java new file mode 100644 index 0000000..02203a9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.tajo.QueryId; +import org.apache.tajo.querymaster.QueryInProgress; + +import java.util.List; + +public interface Scheduler { + + public Mode getMode(); + + public String getName(); + + public boolean addQuery(QueryInProgress resource); + + public boolean removeQuery(QueryId queryId); + + public List<QueryInProgress> getRunningQueries(); + + public enum Mode { + FIFO + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java new file mode 100644 index 0000000..7fd07b5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import java.util.Comparator; + +/** + * Utility class containing scheduling algorithms used in the scheduler. + */ + +public class SchedulingAlgorithms { + /** + * Compare Schedulables in order of priority and then submission time, as in + * the default FIFO scheduler in Tajo. + */ + public static class FifoComparator implements Comparator<QuerySchedulingInfo> { + @Override + public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) { + int res = q1.getPriority().compareTo(q2.getPriority()); + if (res == 0) { + res = (int) Math.signum(q1.getStartTime() - q2.getStartTime()); + } + if (res == 0) { + // In the rare case where jobs were submitted at the exact same time, + // compare them by name (which will be the QueryId) to get a deterministic ordering + res = q1.getName().compareTo(q2.getName()); + } + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java new file mode 100644 index 0000000..bd8ca28 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.master.QueryJobManager; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SimpleFifoScheduler implements Scheduler { + private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName()); + private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>(); + private final Thread queryProcessor; + private AtomicBoolean stopped = new AtomicBoolean(); + private QueryJobManager manager; + private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); + + public SimpleFifoScheduler(QueryJobManager manager) { + this.manager = manager; + this.queryProcessor = new Thread(new QueryProcessor()); + this.queryProcessor.setName("Query Processor"); + } + + @Override + public Mode getMode() { + return Mode.FIFO; + } + + @Override + public String getName() { + return manager.getName(); + } + + @Override + public boolean addQuery(QueryInProgress queryInProgress) { + int qSize = pool.size(); + if (qSize != 0 && qSize % 100 == 0) { + LOG.info("Size of Fifo queue is " + qSize); + } + + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); + boolean result = pool.add(querySchedulingInfo); + if (getRunningQueries().size() == 0) wakeupProcessor(); + return result; + } + + @Override + public boolean removeQuery(QueryId queryId) { + return pool.remove(getQueryByQueryId(queryId)); + } + + public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) { + for (QuerySchedulingInfo querySchedulingInfo : pool) { + if (querySchedulingInfo.getQueryId().equals(queryId)) { + return querySchedulingInfo; + } + } + return null; + } + + @Override + public List<QueryInProgress> getRunningQueries() { + return new ArrayList<QueryInProgress>(manager.getRunningQueries()); + } + + public void start() { + queryProcessor.start(); + } + + public void stop() { + if (stopped.getAndSet(true)) { + return; + } + pool.clear(); + synchronized (queryProcessor) { + queryProcessor.interrupt(); + } + } + + private QuerySchedulingInfo pollScheduledQuery() { + if (pool.size() > 1) { + Collections.sort(pool, COMPARATOR); + } + return pool.poll(); + } + + private void wakeupProcessor() { + synchronized (queryProcessor) { + queryProcessor.notifyAll(); + } + } + + private final class QueryProcessor implements Runnable { + @Override + public void run() { + + QuerySchedulingInfo query; + + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + query = null; + if (getRunningQueries().size() == 0) { + query = pollScheduledQuery(); + } + + if (query != null) { + try { + manager.startQueryJob(query.getQueryId()); + } catch (Throwable t) { + LOG.fatal("Exception during query startup:", t); + manager.stopQuery(query.getQueryId()); + } + } + + synchronized (queryProcessor) { + try { + queryProcessor.wait(500); + } catch (InterruptedException e) { + if (stopped.get()) { + break; + } + LOG.warn("Exception during shutdown: ", e); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java deleted file mode 100644 index 3f48ca5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -public class InvalidSessionException extends Exception { - public InvalidSessionException(String sessionId) { - super("Invalid session id \"" + sessionId + "\""); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java deleted file mode 100644 index 686d860..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -public class NoSuchSessionVariableException extends Exception { - public NoSuchSessionVariableException(String varname) { - super("No such session variable \"" + varname + "\""); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java deleted file mode 100644 index 5f44ecb..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -import com.google.common.collect.ImmutableMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; -import org.apache.tajo.SessionVars; -import org.apache.tajo.master.NonForwardQueryResultScanner; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.common.ProtoObject; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; - -public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { - private static final Log LOG = LogFactory.getLog(Session.class); - - private final String sessionId; - private final String userName; - private String currentDatabase; - private final Map<String, String> sessionVariables; - private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>(); - - // transient status - private volatile long lastAccessTime; - - public Session(String sessionId, String userName, String databaseName) { - this.sessionId = sessionId; - this.userName = userName; - this.currentDatabase = databaseName; - this.lastAccessTime = System.currentTimeMillis(); - - this.sessionVariables = new HashMap<String, String>(); - sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId); - sessionVariables.put(SessionVars.USERNAME.keyname(), userName); - selectDatabase(databaseName); - } - - public Session(SessionProto proto) { - sessionId = proto.getSessionId(); - userName = proto.getUsername(); - currentDatabase = proto.getCurrentDatabase(); - lastAccessTime = proto.getLastAccessTime(); - KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables()); - sessionVariables = keyValueSet.getAllKeyValus(); - } - - public String getSessionId() { - return sessionId; - } - - public String getUserName() { - return userName; - } - - public void updateLastAccessTime() { - lastAccessTime = System.currentTimeMillis(); - } - - public long getLastAccessTime() { - return lastAccessTime; - } - - public void setVariable(String name, String value) { - synchronized (sessionVariables) { - sessionVariables.put(SessionVars.handleDeprecatedName(name), value); - } - } - - public String getVariable(String name) throws NoSuchSessionVariableException { - synchronized (sessionVariables) { - if (sessionVariables.containsKey(name)) { - return sessionVariables.get(SessionVars.handleDeprecatedName(name)); - } else { - throw new NoSuchSessionVariableException(name); - } - } - } - - public void removeVariable(String name) { - synchronized (sessionVariables) { - sessionVariables.remove(SessionVars.handleDeprecatedName(name)); - } - } - - public synchronized Map<String, String> getAllVariables() { - synchronized (sessionVariables) { - sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId); - sessionVariables.put(SessionVars.USERNAME.keyname(), userName); - sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime)); - sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase); - return ImmutableMap.copyOf(sessionVariables); - } - } - - public synchronized void selectDatabase(String databaseName) { - this.currentDatabase = databaseName; - } - - public synchronized String getCurrentDatabase() { - return currentDatabase; - } - - @Override - public SessionProto getProto() { - SessionProto.Builder builder = SessionProto.newBuilder(); - builder.setSessionId(getSessionId()); - builder.setUsername(getUserName()); - builder.setCurrentDatabase(getCurrentDatabase()); - builder.setLastAccessTime(lastAccessTime); - KeyValueSet variables = new KeyValueSet(); - - synchronized (sessionVariables) { - variables.putAll(this.sessionVariables); - builder.setVariables(variables.getProto()); - return builder.build(); - } - } - - public String toString() { - return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime(); - } - - public Session clone() throws CloneNotSupportedException { - Session newSession = (Session) super.clone(); - newSession.sessionVariables.putAll(getAllVariables()); - return newSession; - } - - public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) { - synchronized (nonForwardQueryMap) { - return nonForwardQueryMap.get(queryId); - } - } - - public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) { - synchronized (nonForwardQueryMap) { - nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner); - } - } - - public void closeNonForwardQueryResultScanner(QueryId queryId) { - NonForwardQueryResultScanner resultScanner; - synchronized (nonForwardQueryMap) { - resultScanner = nonForwardQueryMap.remove(queryId); - } - - if (resultScanner != null) { - try { - resultScanner.close(); - } catch (Exception e) { - LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e); - } - } - } - - public void close() { - try { - synchronized (nonForwardQueryMap) { - for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) { - try { - eachQueryScanner.close(); - } catch (Exception e) { - LOG.error("Error while closing NonForwardQueryResultScanner: " + - eachQueryScanner.getSessionId() + ", " + e.getMessage(), e); - } - } - - nonForwardQueryMap.clear(); - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - throw new RuntimeException(t.getMessage(), t); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java deleted file mode 100644 index 46f49a2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -public interface SessionConstants { - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java deleted file mode 100644 index dce3ba6..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -import org.apache.hadoop.yarn.event.AbstractEvent; - -public class SessionEvent extends AbstractEvent<SessionEventType> { - private final String sessionId; - - public SessionEvent(String sessionId, SessionEventType sessionEventType) { - super(sessionEventType); - this.sessionId = sessionId; - } - - public String getSessionId() { - return sessionId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java deleted file mode 100644 index 64c6fc6..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -public enum SessionEventType { - EXPIRE, - PING -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java deleted file mode 100644 index 912f769..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.conf.TajoConf; - -public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> { - - private EventHandler dispatcher; - - public SessionLivelinessMonitor(Dispatcher d) { - super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock()); - this.dispatcher = d.getEventHandler(); - } - - public void serviceInit(Configuration conf) throws Exception { - Preconditions.checkArgument(conf instanceof TajoConf); - TajoConf systemConf = (TajoConf) conf; - - // seconds - int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME); - setExpireInterval(expireIntvl); - setMonitorInterval(expireIntvl / 3); - super.serviceInit(conf); - } - - @Override - protected void expire(String id) { - dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java deleted file mode 100644 index d701d03..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.session; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -public class SessionManager extends CompositeService implements EventHandler<SessionEvent> { - private static final Log LOG = LogFactory.getLog(SessionManager.class); - - public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); - private final Dispatcher dispatcher; - private SessionLivelinessMonitor sessionLivelinessMonitor; - - - public SessionManager(Dispatcher dispatcher) { - super(SessionManager.class.getSimpleName()); - this.dispatcher = dispatcher; - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher); - addIfService(sessionLivelinessMonitor); - super.serviceInit(conf); - } - - @Override - public void serviceStop() throws Exception { - super.serviceStop(); - } - - private void assertSessionExistence(String sessionId) throws InvalidSessionException { - if (!sessions.containsKey(sessionId)) { - throw new InvalidSessionException(sessionId); - } - } - - public String createSession(String username, String baseDatabaseName) throws InvalidSessionException { - String sessionId; - Session oldSession; - - sessionId = UUID.randomUUID().toString(); - Session newSession = new Session(sessionId, username, baseDatabaseName); - oldSession = sessions.putIfAbsent(sessionId, newSession); - if (oldSession != null) { - throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId()); - } - LOG.info("Session " + sessionId + " is created." ); - return sessionId; - } - - public Session removeSession(String sessionId) { - if (sessions.containsKey(sessionId)) { - LOG.info("Session " + sessionId + " is removed."); - Session session = sessions.remove(sessionId); - session.close(); - return session; - } else { - LOG.error("No such session id: " + sessionId); - return null; - } - } - - public Session getSession(String sessionId) throws InvalidSessionException { - assertSessionExistence(sessionId); - touch(sessionId); - return sessions.get(sessionId); - } - - public void setVariable(String sessionId, String name, String value) throws InvalidSessionException { - assertSessionExistence(sessionId); - touch(sessionId); - sessions.get(sessionId).setVariable(name, value); - } - - public String getVariable(String sessionId, String name) - throws InvalidSessionException, NoSuchSessionVariableException { - assertSessionExistence(sessionId); - touch(sessionId); - return sessions.get(sessionId).getVariable(name); - } - - public void removeVariable(String sessionId, String name) throws InvalidSessionException { - assertSessionExistence(sessionId); - touch(sessionId); - sessions.get(sessionId).removeVariable(name); - } - - public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException { - assertSessionExistence(sessionId); - touch(sessionId); - return sessions.get(sessionId).getAllVariables(); - } - - public void touch(String sessionId) throws InvalidSessionException { - assertSessionExistence(sessionId); - sessions.get(sessionId).updateLastAccessTime(); - sessionLivelinessMonitor.receivedPing(sessionId); - } - - @Override - public void handle(SessionEvent event) { - LOG.info("Processing " + event.getSessionId() + " of type " + event.getType()); - - try { - assertSessionExistence(event.getSessionId()); - touch(event.getSessionId()); - } catch (InvalidSessionException e) { - LOG.error(e); - } - - if (event.getType() == SessionEventType.EXPIRE) { - Session session = removeSession(event.getSessionId()); - if (session != null) { - LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java new file mode 100644 index 0000000..82ebe29 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import org.apache.tajo.master.TajoMaster; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; + +public class CatalogMetricsGaugeSet implements MetricSet { + TajoMaster.MasterContext tajoMasterContext; + public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) { + this.tajoMasterContext = tajoMasterContext; + } + + @Override + public Map<String, Metric> getMetrics() { + Map<String, Metric> metricsMap = new HashMap<String, Metric>(); + metricsMap.put("numTables", new Gauge<Integer>() { + @Override + public Integer getValue() { + return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size(); + } + }); + + metricsMap.put("numFunctions", new Gauge<Integer>() { + @Override + public Integer getValue() { + return tajoMasterContext.getCatalog().getFunctions().size(); + } + }); + + return metricsMap; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java new file mode 100644 index 0000000..229a80a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerState; + +import java.util.HashMap; +import java.util.Map; + +public class WorkerResourceMetricsGaugeSet implements MetricSet { + TajoMaster.MasterContext tajoMasterContext; + public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) { + this.tajoMasterContext = tajoMasterContext; + } + + @Override + public Map<String, Metric> getMetrics() { + Map<String, Metric> metricsMap = new HashMap<String, Metric>(); + metricsMap.put("totalWorkers", new Gauge<Integer>() { + @Override + public Integer getValue() { + return tajoMasterContext.getResourceManager().getWorkers().size(); + } + }); + + metricsMap.put("liveWorkers", new Gauge<Integer>() { + @Override + public Integer getValue() { + return getNumWorkers(WorkerState.RUNNING); + } + }); + + metricsMap.put("deadWorkers", new Gauge<Integer>() { + @Override + public Integer getValue() { + return getNumWorkers(WorkerState.LOST); + } + }); + + return metricsMap; + } + + protected int getNumWorkers(WorkerState status) { + int numWorkers = 0; + for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) { + if(eachWorker.getState() == status) { + numWorkers++; + } + } + + return numWorkers; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java new file mode 100644 index 0000000..e45f274 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.master.event.TaskRequestEvent; +import org.apache.tajo.master.event.TaskSchedulerEvent; + + +public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> { + + protected int hostLocalAssigned; + protected int rackLocalAssigned; + protected int totalAssigned; + + /** + * Construct the service. + * + * @param name service name + */ + public AbstractTaskScheduler(String name) { + super(name); + } + + public int getHostLocalAssigned() { + return hostLocalAssigned; + } + + public int getRackLocalAssigned() { + return rackLocalAssigned; + } + + public int getTotalAssigned() { + return totalAssigned; + } + + public abstract void handleTaskRequestEvent(TaskRequestEvent event); + public abstract int remainingScheduledObjectNum(); +}
