http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java new file mode 100644 index 0000000..86c49b4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; +import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.querymaster.Task.PullHost; +import org.apache.tajo.master.container.TajoContainerId; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; + +public class TaskAttempt implements EventHandler<TaskAttemptEvent> { + + private static final Log LOG = LogFactory.getLog(TaskAttempt.class); + + private final static int EXPIRE_TIME = 15000; + + private final TaskAttemptId id; + private final Task task; + final EventHandler eventHandler; + + private TajoContainerId containerId; + private WorkerConnectionInfo workerConnectionInfo; + private int expire; + + private final Lock readLock; + private final Lock writeLock; + + private final List<String> diagnostics = new ArrayList<String>(); + + private final TaskAttemptScheduleContext scheduleContext; + + private float progress; + private CatalogProtos.TableStatsProto inputStats; + private CatalogProtos.TableStatsProto resultStats; + + protected static final StateMachineFactory + <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + stateMachineFactory = new StateMachineFactory + <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + (TaskAttemptState.TA_NEW) + + // Transitions from TA_NEW state + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, + TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition()) + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, + TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition()) + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_KILL, + new TaskKilledCompleteTransition()) + + // Transitions from TA_UNASSIGNED state + .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED, + TaskAttemptEventType.TA_ASSIGNED, + new LaunchTransition()) + .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillUnassignedTaskTransition()) + + // Transitions from TA_ASSIGNED state + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED, + TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, + EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED), + TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new SucceededTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + + // Transitions from TA_RUNNING state + .addTransition(TaskAttemptState.TA_RUNNING, + EnumSet.of(TaskAttemptState.TA_RUNNING), + TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new SucceededTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_LOCAL_KILLED, + new TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_ASSIGNED, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_SCHEDULE_CANCELED, + new TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_DONE, + new TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, + EnumSet.of( + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + TaskAttemptEventType.TA_UPDATE)) + + // Transitions from TA_SUCCEEDED state + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_UPDATE) + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition()) + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + // Ignore-able transitions + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_KILL) + + // Transitions from TA_KILLED state + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE) + // Ignore-able transitions + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_UPDATE)) + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_LOCAL_KILLED, + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_ASSIGNED, + TaskAttemptEventType.TA_DONE), + new TaskKilledCompleteTransition()) + .installTopology(); + + private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + stateMachine; + + + public TaskAttempt(final TaskAttemptScheduleContext scheduleContext, + final TaskAttemptId id, final Task task, + final EventHandler eventHandler) { + this.scheduleContext = scheduleContext; + this.id = id; + this.expire = TaskAttempt.EXPIRE_TIME; + this.task = task; + this.eventHandler = eventHandler; + + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + + stateMachine = stateMachineFactory.make(this); + } + + public TaskAttemptState getState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + public TaskAttemptId getId() { + return this.id; + } + + public boolean isLeafTask() { + return this.task.isLeafTask(); + } + + public Task getTask() { + return this.task; + } + + public WorkerConnectionInfo getWorkerConnectionInfo() { + return this.workerConnectionInfo; + } + + public void setContainerId(TajoContainerId containerId) { + this.containerId = containerId; + } + + public synchronized void setExpireTime(int expire) { + this.expire = expire; + } + + public synchronized void updateExpireTime(int period) { + this.setExpireTime(this.expire - period); + } + + public synchronized void resetExpireTime() { + this.setExpireTime(TaskAttempt.EXPIRE_TIME); + } + + public int getLeftTime() { + return this.expire; + } + + public float getProgress() { + return progress; + } + + public TableStats getInputStats() { + if (inputStats == null) { + return null; + } + + return new TableStats(inputStats); + } + + public TableStats getResultStats() { + if (resultStats == null) { + return null; + } + return new TableStats(resultStats); + } + + private void fillTaskStatistics(TaskCompletionReport report) { + this.progress = 1.0f; + + List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); + + if (report.getShuffleFileOutputsCount() > 0) { + this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList()); + + PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort()); + for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { + IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(), + getId().getId(), p.getPartId(), host, p.getVolume()); + partitions.add(entry); + } + } + this.getTask().setIntermediateData(partitions); + + if (report.hasInputStats()) { + this.inputStats = report.getInputStats(); + } + if (report.hasResultStats()) { + this.resultStats = report.getResultStats(); + this.getTask().setStats(new TableStats(resultStats)); + } + } + + private static class TaskAttemptScheduleTransition implements + SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { + taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), + taskAttempt.scheduleContext, taskAttempt)); + } + } + + private static class KillUnassignedTaskTransition implements + SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { + taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(), + taskAttempt.scheduleContext, taskAttempt)); + } + } + + private static class LaunchTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; + taskAttempt.containerId = castEvent.getContainerId(); + taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); + taskAttempt.eventHandler.handle( + new TaskTAttemptEvent(taskAttempt.getId(), + TaskEventType.T_ATTEMPT_LAUNCHED)); + } + } + + private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(), + TaskEventType.T_ATTEMPT_KILLED)); + LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask"); + } + } + + private static class StatusUpdateTransition + implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> { + + @Override + public TaskAttemptState transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event; + + taskAttempt.progress = updateEvent.getStatus().getProgress(); + taskAttempt.inputStats = updateEvent.getStatus().getInputStats(); + taskAttempt.resultStats = updateEvent.getStatus().getResultStats(); + + return TaskAttemptState.TA_RUNNING; + } + } + + private void addDiagnosticInfo(String diag) { + if (diag != null && !diag.equals("")) { + diagnostics.add(diag); + } + } + + private static class AlreadyAssignedTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent taskAttemptEvent) { + } + } + + private static class AlreadyDoneTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent taskAttemptEvent) { + } + } + + private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); + + try { + taskAttempt.fillTaskStatistics(report); + taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); + } catch (Throwable t) { + taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); + taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); + } + } + } + + private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId, + LocalTaskEventType.KILL)); + } + } + + private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; + taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); + LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() + + " >> " + errorEvent.errorMessage()); + } + } + + @Override + public void handle(TaskAttemptEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType()); + } + try { + writeLock.lock(); + TaskAttemptState oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); + eventHandler.handle( + new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), + "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); + eventHandler.handle( + new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), + StageEventType.SQ_INTERNAL_ERROR)); + } + + //notify the eventhandler of state change + if (LOG.isDebugEnabled()) { + if (oldState != getState()) { + LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to " + + getState()); + } + } + } + + finally { + writeLock.unlock(); + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java new file mode 100644 index 0000000..b699674 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.querymaster.QueryMasterTask; + +public class TaskSchedulerContext { + private QueryMasterTask.QueryMasterTaskContext masterContext; + private boolean isLeafQuery; + private ExecutionBlockId blockId; + private int taskSize; + private int estimatedTaskNum; + + public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery, + ExecutionBlockId blockId) { + this.masterContext = masterContext; + this.isLeafQuery = isLeafQuery; + this.blockId = blockId; + } + + public QueryMasterTask.QueryMasterTaskContext getMasterContext() { + return masterContext; + } + + public boolean isLeafQuery() { + return isLeafQuery; + } + + public ExecutionBlockId getBlockId() { + return blockId; + } + + public int getTaskSize() { + return taskSize; + } + + public int getEstimatedTaskNum() { + return estimatedTaskNum; + } + + public void setTaskSize(int taskSize) { + this.taskSize = taskSize; + } + + public void setEstimatedTaskNum(int estimatedTaskNum) { + this.estimatedTaskNum = estimatedTaskNum; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java new file mode 100644 index 0000000..2794771 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Map; + +public class TaskSchedulerFactory { + private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS; + private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); + private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class }; + + public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) + throws IOException { + if (CACHED_ALGORITHM_CLASS != null) { + return CACHED_ALGORITHM_CLASS; + } else { + CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class); + } + + if (CACHED_ALGORITHM_CLASS == null) { + throw new IOException("Task scheduler is null"); + } + return CACHED_ALGORITHM_CLASS; + } + + public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context, + Stage stage) { + T result; + try { + Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); + if (constructor == null) { + constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(clazz, constructor); + } + result = constructor.newInstance(new Object[]{context, stage}); + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } + + public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage) + throws IOException { + return get(getTaskSchedulerClass(conf), context, stage); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java deleted file mode 100644 index d9932bd..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.scheduler; - -import com.google.common.base.Objects; -import org.apache.tajo.QueryId; - -public class QuerySchedulingInfo { - private QueryId queryId; - private Integer priority; - private Long startTime; - - public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) { - this.queryId = queryId; - this.priority = priority; - this.startTime = startTime; - } - - public QueryId getQueryId() { - return queryId; - } - - public Integer getPriority() { - return priority; - } - - public Long getStartTime() { - return startTime; - } - - public String getName() { - return queryId.getId(); - } - - @Override - public int hashCode() { - return Objects.hashCode(startTime, getName(), priority); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java deleted file mode 100644 index d74280c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.scheduler; - -import org.apache.tajo.QueryId; -import org.apache.tajo.master.querymaster.QueryInProgress; - -import java.util.List; - -public interface Scheduler { - - public Mode getMode(); - - public String getName(); - - public boolean addQuery(QueryInProgress resource); - - public boolean removeQuery(QueryId queryId); - - public List<QueryInProgress> getRunningQueries(); - - public enum Mode { - FIFO - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java deleted file mode 100644 index 9c9b16d..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.scheduler; - -import java.util.Comparator; - -/** - * Utility class containing scheduling algorithms used in the scheduler. - */ - -public class SchedulingAlgorithms { - /** - * Compare Schedulables in order of priority and then submission time, as in - * the default FIFO scheduler in Tajo. - */ - public static class FifoComparator implements Comparator<QuerySchedulingInfo> { - @Override - public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) { - int res = q1.getPriority().compareTo(q2.getPriority()); - if (res == 0) { - res = (int) Math.signum(q1.getStartTime() - q2.getStartTime()); - } - if (res == 0) { - // In the rare case where jobs were submitted at the exact same time, - // compare them by name (which will be the QueryId) to get a deterministic ordering - res = q1.getName().compareTo(q2.getName()); - } - return res; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java deleted file mode 100644 index a74e606..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.scheduler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; -import org.apache.tajo.master.querymaster.QueryInProgress; -import org.apache.tajo.master.querymaster.QueryJobManager; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -public class SimpleFifoScheduler implements Scheduler { - private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName()); - private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>(); - private final Thread queryProcessor; - private AtomicBoolean stopped = new AtomicBoolean(); - private QueryJobManager manager; - private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); - - public SimpleFifoScheduler(QueryJobManager manager) { - this.manager = manager; - this.queryProcessor = new Thread(new QueryProcessor()); - this.queryProcessor.setName("Query Processor"); - } - - @Override - public Mode getMode() { - return Mode.FIFO; - } - - @Override - public String getName() { - return manager.getName(); - } - - @Override - public boolean addQuery(QueryInProgress queryInProgress) { - int qSize = pool.size(); - if (qSize != 0 && qSize % 100 == 0) { - LOG.info("Size of Fifo queue is " + qSize); - } - - QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); - boolean result = pool.add(querySchedulingInfo); - if (getRunningQueries().size() == 0) wakeupProcessor(); - return result; - } - - @Override - public boolean removeQuery(QueryId queryId) { - return pool.remove(getQueryByQueryId(queryId)); - } - - public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) { - for (QuerySchedulingInfo querySchedulingInfo : pool) { - if (querySchedulingInfo.getQueryId().equals(queryId)) { - return querySchedulingInfo; - } - } - return null; - } - - @Override - public List<QueryInProgress> getRunningQueries() { - return new ArrayList<QueryInProgress>(manager.getRunningQueries()); - } - - public void start() { - queryProcessor.start(); - } - - public void stop() { - if (stopped.getAndSet(true)) { - return; - } - pool.clear(); - synchronized (queryProcessor) { - queryProcessor.interrupt(); - } - } - - private QuerySchedulingInfo pollScheduledQuery() { - if (pool.size() > 1) { - Collections.sort(pool, COMPARATOR); - } - return pool.poll(); - } - - private void wakeupProcessor() { - synchronized (queryProcessor) { - queryProcessor.notifyAll(); - } - } - - private final class QueryProcessor implements Runnable { - @Override - public void run() { - - QuerySchedulingInfo query; - - while (!stopped.get() && !Thread.currentThread().isInterrupted()) { - query = null; - if (getRunningQueries().size() == 0) { - query = pollScheduledQuery(); - } - - if (query != null) { - try { - manager.startQueryJob(query.getQueryId()); - } catch (Throwable t) { - LOG.fatal("Exception during query startup:", t); - manager.stopQuery(query.getQueryId()); - } - } - - synchronized (queryProcessor) { - try { - queryProcessor.wait(500); - } catch (InterruptedException e) { - if (stopped.get()) { - break; - } - LOG.warn("Exception during shutdown: ", e); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java new file mode 100644 index 0000000..54c65bf --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +public class InvalidSessionException extends Exception { + public InvalidSessionException(String sessionId) { + super("Invalid session id \"" + sessionId + "\""); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java new file mode 100644 index 0000000..be90449 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +public class NoSuchSessionVariableException extends Exception { + public NoSuchSessionVariableException(String varname) { + super("No such session variable \"" + varname + "\""); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/session/Session.java new file mode 100644 index 0000000..7ac4f85 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/Session.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.common.ProtoObject; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; + +public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { + private static final Log LOG = LogFactory.getLog(Session.class); + + private final String sessionId; + private final String userName; + private String currentDatabase; + private final Map<String, String> sessionVariables; + private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>(); + + // transient status + private volatile long lastAccessTime; + + public Session(String sessionId, String userName, String databaseName) { + this.sessionId = sessionId; + this.userName = userName; + this.currentDatabase = databaseName; + this.lastAccessTime = System.currentTimeMillis(); + + this.sessionVariables = new HashMap<String, String>(); + sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId); + sessionVariables.put(SessionVars.USERNAME.keyname(), userName); + selectDatabase(databaseName); + } + + public Session(SessionProto proto) { + sessionId = proto.getSessionId(); + userName = proto.getUsername(); + currentDatabase = proto.getCurrentDatabase(); + lastAccessTime = proto.getLastAccessTime(); + KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables()); + sessionVariables = keyValueSet.getAllKeyValus(); + } + + public String getSessionId() { + return sessionId; + } + + public String getUserName() { + return userName; + } + + public void updateLastAccessTime() { + lastAccessTime = System.currentTimeMillis(); + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void setVariable(String name, String value) { + synchronized (sessionVariables) { + sessionVariables.put(SessionVars.handleDeprecatedName(name), value); + } + } + + public String getVariable(String name) throws NoSuchSessionVariableException { + synchronized (sessionVariables) { + if (sessionVariables.containsKey(name)) { + return sessionVariables.get(SessionVars.handleDeprecatedName(name)); + } else { + throw new NoSuchSessionVariableException(name); + } + } + } + + public void removeVariable(String name) { + synchronized (sessionVariables) { + sessionVariables.remove(SessionVars.handleDeprecatedName(name)); + } + } + + public synchronized Map<String, String> getAllVariables() { + synchronized (sessionVariables) { + sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId); + sessionVariables.put(SessionVars.USERNAME.keyname(), userName); + sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime)); + sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase); + return ImmutableMap.copyOf(sessionVariables); + } + } + + public synchronized void selectDatabase(String databaseName) { + this.currentDatabase = databaseName; + } + + public synchronized String getCurrentDatabase() { + return currentDatabase; + } + + @Override + public SessionProto getProto() { + SessionProto.Builder builder = SessionProto.newBuilder(); + builder.setSessionId(getSessionId()); + builder.setUsername(getUserName()); + builder.setCurrentDatabase(getCurrentDatabase()); + builder.setLastAccessTime(lastAccessTime); + KeyValueSet variables = new KeyValueSet(); + + synchronized (sessionVariables) { + variables.putAll(this.sessionVariables); + builder.setVariables(variables.getProto()); + return builder.build(); + } + } + + public String toString() { + return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime(); + } + + public Session clone() throws CloneNotSupportedException { + Session newSession = (Session) super.clone(); + newSession.sessionVariables.putAll(getAllVariables()); + return newSession; + } + + public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) { + synchronized (nonForwardQueryMap) { + return nonForwardQueryMap.get(queryId); + } + } + + public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) { + synchronized (nonForwardQueryMap) { + nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner); + } + } + + public void closeNonForwardQueryResultScanner(QueryId queryId) { + NonForwardQueryResultScanner resultScanner; + synchronized (nonForwardQueryMap) { + resultScanner = nonForwardQueryMap.remove(queryId); + } + + if (resultScanner != null) { + try { + resultScanner.close(); + } catch (Exception e) { + LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e); + } + } + } + + public void close() { + try { + synchronized (nonForwardQueryMap) { + for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) { + try { + eachQueryScanner.close(); + } catch (Exception e) { + LOG.error("Error while closing NonForwardQueryResultScanner: " + + eachQueryScanner.getSessionId() + ", " + e.getMessage(), e); + } + } + + nonForwardQueryMap.clear(); + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new RuntimeException(t.getMessage(), t); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java new file mode 100644 index 0000000..6c21a27 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +public interface SessionConstants { + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java new file mode 100644 index 0000000..819fd16 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class SessionEvent extends AbstractEvent<SessionEventType> { + private final String sessionId; + + public SessionEvent(String sessionId, SessionEventType sessionEventType) { + super(sessionEventType); + this.sessionId = sessionId; + } + + public String getSessionId() { + return sessionId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java new file mode 100644 index 0000000..8270926 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +public enum SessionEventType { + EXPIRE, + PING +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java new file mode 100644 index 0000000..2badccb --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tajo.conf.TajoConf; + +public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> { + + private EventHandler dispatcher; + + public SessionLivelinessMonitor(Dispatcher d) { + super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock()); + this.dispatcher = d.getEventHandler(); + } + + public void serviceInit(Configuration conf) throws Exception { + Preconditions.checkArgument(conf instanceof TajoConf); + TajoConf systemConf = (TajoConf) conf; + + // seconds + int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME); + setExpireInterval(expireIntvl); + setMonitorInterval(expireIntvl / 3); + super.serviceInit(conf); + } + + @Override + protected void expire(String id) { + dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java new file mode 100644 index 0000000..571144b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.session; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class SessionManager extends CompositeService implements EventHandler<SessionEvent> { + private static final Log LOG = LogFactory.getLog(SessionManager.class); + + public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); + private final Dispatcher dispatcher; + private SessionLivelinessMonitor sessionLivelinessMonitor; + + + public SessionManager(Dispatcher dispatcher) { + super(SessionManager.class.getSimpleName()); + this.dispatcher = dispatcher; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher); + addIfService(sessionLivelinessMonitor); + super.serviceInit(conf); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + + private void assertSessionExistence(String sessionId) throws InvalidSessionException { + if (!sessions.containsKey(sessionId)) { + throw new InvalidSessionException(sessionId); + } + } + + public String createSession(String username, String baseDatabaseName) throws InvalidSessionException { + String sessionId; + Session oldSession; + + sessionId = UUID.randomUUID().toString(); + Session newSession = new Session(sessionId, username, baseDatabaseName); + oldSession = sessions.putIfAbsent(sessionId, newSession); + if (oldSession != null) { + throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId()); + } + LOG.info("Session " + sessionId + " is created." ); + return sessionId; + } + + public Session removeSession(String sessionId) { + if (sessions.containsKey(sessionId)) { + LOG.info("Session " + sessionId + " is removed."); + Session session = sessions.remove(sessionId); + session.close(); + return session; + } else { + LOG.error("No such session id: " + sessionId); + return null; + } + } + + public Session getSession(String sessionId) throws InvalidSessionException { + assertSessionExistence(sessionId); + touch(sessionId); + return sessions.get(sessionId); + } + + public void setVariable(String sessionId, String name, String value) throws InvalidSessionException { + assertSessionExistence(sessionId); + touch(sessionId); + sessions.get(sessionId).setVariable(name, value); + } + + public String getVariable(String sessionId, String name) + throws InvalidSessionException, NoSuchSessionVariableException { + assertSessionExistence(sessionId); + touch(sessionId); + return sessions.get(sessionId).getVariable(name); + } + + public void removeVariable(String sessionId, String name) throws InvalidSessionException { + assertSessionExistence(sessionId); + touch(sessionId); + sessions.get(sessionId).removeVariable(name); + } + + public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException { + assertSessionExistence(sessionId); + touch(sessionId); + return sessions.get(sessionId).getAllVariables(); + } + + public void touch(String sessionId) throws InvalidSessionException { + assertSessionExistence(sessionId); + sessions.get(sessionId).updateLastAccessTime(); + sessionLivelinessMonitor.receivedPing(sessionId); + } + + @Override + public void handle(SessionEvent event) { + LOG.info("Processing " + event.getSessionId() + " of type " + event.getType()); + + try { + assertSessionExistence(event.getSessionId()); + touch(event.getSessionId()); + } catch (InvalidSessionException e) { + LOG.error(e); + } + + if (event.getType() == SessionEventType.EXPIRE) { + Session session = removeSession(event.getSessionId()); + if (session != null) { + LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 6050617..d711258 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -24,11 +24,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.master.ha.HAService; -import org.apache.tajo.master.querymaster.QueryInProgress; -import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.Stage; +import org.apache.tajo.ha.HAService; +import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.querymaster.Stage; import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.worker.TaskRunnerHistory; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index 932f584..c3f0087 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; -import org.apache.tajo.master.querymaster.QueryInfo; +import org.apache.tajo.master.QueryInfo; import java.io.EOFException; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 5934885..9eb58da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.querymaster.QueryInfo; +import org.apache.tajo.master.QueryInfo; import org.apache.tajo.worker.TaskHistory; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 208591f..89c3404 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -24,8 +24,8 @@ import com.google.common.collect.Lists; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.Repartitioner; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.util.TUtil; import java.net.URI; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 8944eae..8241478 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -38,9 +38,9 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.StageContainerAllocationEvent; -import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.Stage; -import org.apache.tajo.master.querymaster.StageState; +import org.apache.tajo.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.Stage; +import org.apache.tajo.querymaster.StageState; import org.apache.tajo.master.rm.*; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 4d96529..09a87e0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -38,9 +38,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.ha.TajoMasterInfo; -import org.apache.tajo.master.querymaster.QueryMaster; -import org.apache.tajo.master.querymaster.QueryMasterManagerService; +import org.apache.tajo.ha.TajoMasterInfo; +import org.apache.tajo.querymaster.QueryMaster; +import org.apache.tajo.querymaster.QueryMasterManagerService; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 1c83110..2ae4bed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -31,7 +31,7 @@ import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/tajo-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml index db92b02..4a92e72 100644 --- a/tajo-core/src/main/resources/tajo-default.xml +++ b/tajo-core/src/main/resources/tajo-default.xml @@ -39,7 +39,7 @@ <property> <name>tajo.querymaster.task-scheduler</name> - <value>org.apache.tajo.master.DefaultTaskScheduler</value> + <value>org.apache.tajo.querymaster.DefaultTaskScheduler</value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/catalogview.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp index 8f1d1bc..bc770d7 100644 --- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp +++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp @@ -24,7 +24,7 @@ <%@ page import="org.apache.tajo.catalog.TableDesc" %> <%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.master.ha.HAService" %> +<%@ page import="org.apache.tajo.ha.HAService" %> <%@ page import="org.apache.tajo.util.FileUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.Collection" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index 6fe21a2..1fb5e40 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -21,8 +21,8 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %> -<%@ page import="org.apache.tajo.master.ha.HAService" %> -<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.ha.HAService" %> +<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerResource" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 6778725..00186d7 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -23,9 +23,9 @@ <%@ page import="org.apache.tajo.conf.TajoConf" %> <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.master.ha.HAService" %> -<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.ha.HAService" %> +<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.NetUtils" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 5afb3b2..4d8e5e6 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -20,7 +20,7 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.StringUtils" %> @@ -28,7 +28,7 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryInfo" %> +<%@ page import="org.apache.tajo.master.QueryInfo" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query_executor.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index 9ff6625..82836ac 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -19,7 +19,7 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.master.ha.HAService" %> +<%@ page import="org.apache.tajo.ha.HAService" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp index 866d663..bb72f9e 100644 --- a/tajo-core/src/main/resources/webapps/worker/index.jsp +++ b/tajo-core/src/main/resources/webapps/worker/index.jsp @@ -19,8 +19,8 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.tajo.master.querymaster.Query" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> +<%@ page import="org.apache.tajo.querymaster.Query" %> +<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index 340eb95..56bdeba 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -20,8 +20,8 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.QueryId" %> -<%@ page import="org.apache.tajo.master.querymaster.Query" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> +<%@ page import="org.apache.tajo.querymaster.Query" %> +<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/queryplan.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp index 88de97d..878efe3 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp @@ -21,11 +21,11 @@ <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.*" %> -<%@ page import="org.apache.tajo.master.querymaster.Query" %> +<%@ page import="org.apache.tajo.querymaster.Query" %> <%@ page import="org.apache.tajo.QueryId" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.master.querymaster.Stage" %> +<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> +<%@ page import="org.apache.tajo.querymaster.Stage" %> <%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.ExecutionBlockId" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 3aef49d..6d0e3a2 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -25,7 +25,7 @@ <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> <%@ page import="org.apache.tajo.plan.util.PlannerUtil" %> <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %> -<%@ page import="org.apache.tajo.master.querymaster.*" %> +<%@ page import="org.apache.tajo.querymaster.*" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> <%@ page import="java.text.NumberFormat" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp index 81b1e6d..17e884a 100644 --- a/tajo-core/src/main/resources/webapps/worker/task.jsp +++ b/tajo-core/src/main/resources/webapps/worker/task.jsp @@ -25,10 +25,10 @@ <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> -<%@ page import="org.apache.tajo.master.querymaster.Query" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.master.querymaster.Task" %> -<%@ page import="org.apache.tajo.master.querymaster.Stage" %> +<%@ page import="org.apache.tajo.querymaster.Query" %> +<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> +<%@ page import="org.apache.tajo.querymaster.Task" %> +<%@ page import="org.apache.tajo.querymaster.Stage" %> <%@ page import="org.apache.tajo.storage.DataLocation" %> <%@ page import="org.apache.tajo.storage.fragment.FileFragment" %> <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 8bee6fb..e464446 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -35,7 +35,7 @@ import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 0d2f6fa..0786912 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -44,10 +44,10 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.querymaster.*; -import org.apache.tajo.master.querymaster.Query; -import org.apache.tajo.master.querymaster.Stage; -import org.apache.tajo.master.querymaster.StageState; +import org.apache.tajo.querymaster.*; +import org.apache.tajo.querymaster.Query; +import org.apache.tajo.querymaster.Stage; +import org.apache.tajo.querymaster.StageState; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.util.CommonTestingUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 889d61c..0b59bc7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -37,7 +37,7 @@ import org.apache.tajo.engine.function.builtin.SumInt; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 6db76ae..d3ab1fd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -44,7 +44,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.session.Session; +import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index 1a212b0..d1756e1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -24,10 +24,10 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.master.querymaster.Query; -import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.Stage; +import org.apache.tajo.querymaster.Query; +import org.apache.tajo.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.querymaster.Stage; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 68b3fb3..39b58d0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -32,7 +32,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.jdbc.TajoResultSet; -import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.storage.*; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 3400752..cacef96 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -38,7 +38,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; -import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.CommonTestingUtil;
