http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index f9c752b..208591f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -24,9 +24,8 @@ import com.google.common.collect.Lists; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.Repartitioner; -import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.util.TUtil; import java.net.URI; @@ -39,7 +38,7 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; * <code>FetchImpl</code> information to indicate the locations of intermediate data. */ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable { - private QueryUnit.PullHost host; // The pull server host information + private Task.PullHost host; // The pull server host information private ShuffleType type; // hash or range partition method. private ExecutionBlockId executionBlockId; // The executionBlock id private int partitionId; // The hash partition id @@ -59,7 +58,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl } public FetchImpl(TajoWorkerProtocol.FetchProto proto) { - this(new QueryUnit.PullHost(proto.getHost(), proto.getPort()), + this(new Task.PullHost(proto.getHost(), proto.getPort()), proto.getType(), new ExecutionBlockId(proto.getExecutionBlockId()), proto.getPartitionId(), @@ -77,22 +76,22 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl } } - public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, int partitionId) { this(host, type, executionBlockId, partitionId, null, false, null, new ArrayList<Integer>(), new ArrayList<Integer>()); } - public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, - int partitionId, List<QueryUnit.IntermediateEntry> intermediateEntryList) { + public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId, List<Task.IntermediateEntry> intermediateEntryList) { this(host, type, executionBlockId, partitionId, null, false, null, new ArrayList<Integer>(), new ArrayList<Integer>()); - for (QueryUnit.IntermediateEntry entry : intermediateEntryList){ + for (Task.IntermediateEntry entry : intermediateEntryList){ addPart(entry.getTaskId(), entry.getAttemptId()); } } - public FetchImpl(QueryUnit.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, int partitionId, String rangeParams, boolean hasNext, String name, List<Integer> taskIds, List<Integer> attemptIds) { this.host = host; @@ -142,7 +141,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl this.attemptIds.add(attemptId); } - public QueryUnit.PullHost getPullHost() { + public Task.PullHost getPullHost() { return this.host; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java index 42ad875..5b2ad0f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java @@ -27,7 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.http.HttpRequest; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskId; import org.apache.tajo.worker.dataserver.FileAccessForbiddenException; import org.apache.tajo.worker.dataserver.retriever.DataRetriever; import org.apache.tajo.worker.dataserver.retriever.FileChunk; @@ -41,13 +41,13 @@ import java.util.Set; @Deprecated public class InterDataRetriever implements DataRetriever { private final Log LOG = LogFactory.getLog(InterDataRetriever.class); - private final Set<QueryUnitId> registered = Sets.newHashSet(); + private final Set<TaskId> registered = Sets.newHashSet(); private final Map<String, String> map = Maps.newConcurrentMap(); public InterDataRetriever() { } - public void register(QueryUnitId id, String baseURI) { + public void register(TaskId id, String baseURI) { synchronized (registered) { if (!registered.contains(id)) { map.put(id.toString(), baseURI); @@ -56,7 +56,7 @@ public class InterDataRetriever implements DataRetriever { } } - public void unregister(QueryUnitId id) { + public void unregister(TaskId id) { synchronized (registered) { if (registered.contains(id)) { map.remove(id.toString()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 48f4f66..4a09772 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; @@ -104,7 +104,7 @@ public class TajoWorkerManagerService extends CompositeService @Override public void ping(RpcController controller, - TajoIdProtos.QueryUnitAttemptIdProto attemptId, + TajoIdProtos.TaskAttemptIdProto attemptId, RpcCallback<PrimitiveProtos.BoolProto> done) { done.run(TajoWorker.TRUE_PROTO); } @@ -146,9 +146,9 @@ public class TajoWorkerManagerService extends CompositeService } @Override - public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request, + public void killTaskAttempt(RpcController controller, TajoIdProtos.TaskAttemptIdProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { - Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)); + Task task = workerContext.getTaskRunnerManager().getTaskByTaskAttemptId(new TaskAttemptId(request)); if(task != null) task.kill(); done.run(TajoWorker.TRUE_PROTO); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 00eabcc..0920619 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.Schema; @@ -44,7 +44,7 @@ import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.engine.query.QueryUnitRequest; +import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; @@ -63,7 +63,6 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.URI; -import java.text.NumberFormat; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; @@ -78,11 +77,11 @@ public class Task { private final TajoConf systemConf; private final QueryContext queryContext; private final ExecutionBlockContext executionBlockContext; - private final QueryUnitAttemptId taskId; + private final TaskAttemptId taskId; private final String taskRunnerId; private final Path taskDir; - private final QueryUnitRequest request; + private final TaskRequest request; private TaskAttemptContext context; private List<Fetcher> fetcherRunners; private LogicalNode plan; @@ -106,9 +105,9 @@ public class Task { public Task(String taskRunnerId, Path baseDir, - QueryUnitAttemptId taskId, + TaskAttemptId taskId, final ExecutionBlockContext executionBlockContext, - final QueryUnitRequest request) throws IOException { + final TaskRequest request) throws IOException { this.taskRunnerId = taskRunnerId; this.request = request; this.taskId = taskId; @@ -117,7 +116,7 @@ public class Task { this.queryContext = request.getQueryContext(systemConf); this.executionBlockContext = executionBlockContext; this.taskDir = StorageUtil.concatPath(baseDir, - taskId.getQueryUnitId().getId() + "_" + taskId.getId()); + taskId.getTaskId().getId() + "_" + taskId.getId()); this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); @@ -207,7 +206,7 @@ public class Task { } } - public QueryUnitAttemptId getTaskId() { + public TaskAttemptId getTaskId() { return taskId; } @@ -215,11 +214,11 @@ public class Task { return LOG; } - public void localize(QueryUnitRequest request) throws IOException { + public void localize(TaskRequest request) throws IOException { fetcherRunners = getFetchRunners(context, request.getFetches()); } - public QueryUnitAttemptId getId() { + public TaskAttemptId getId() { return context.getTaskId(); } @@ -272,7 +271,7 @@ public class Task { executionBlockContext.getTasks().remove(this.getId()); } } else { - LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState()); + LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); } } @@ -626,7 +625,7 @@ public class Task { if (retryNum == maxRetryNum) { LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); } - aborted = true; // retry queryUnit + aborted = true; // retry task ctx.getFetchLatch().countDown(); } } @@ -830,10 +829,10 @@ public class Task { return ret; } - public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { + public static Path getTaskAttemptDir(TaskAttemptId quid) { Path workDir = - StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), - String.valueOf(quid.getQueryUnitId().getId()), + StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), + String.valueOf(quid.getTaskId().getId()), String.valueOf(quid.getId())); return workDir; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 99976d8..1556a44 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -24,7 +24,7 @@ import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; @@ -59,7 +59,7 @@ public class TaskAttemptContext { private TaskAttemptState state; private TableStats resultStats; - private QueryUnitAttemptId queryId; + private TaskAttemptId queryId; private final Path workDir; private boolean needFetch = false; private CountDownLatch doneFetchPhaseSignal; @@ -84,7 +84,7 @@ public class TaskAttemptContext { private HashShuffleAppenderManager hashShuffleAppenderManager; public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext, - final QueryUnitAttemptId queryId, + final TaskAttemptId queryId, final FragmentProto[] fragments, final Path workDir) { this.queryContext = queryContext; @@ -127,7 +127,7 @@ public class TaskAttemptContext { } @VisibleForTesting - public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId, + public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId queryId, final Fragment [] fragments, final Path workDir) { this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); } @@ -306,7 +306,7 @@ public class TaskAttemptContext { return this.workDir; } - public QueryUnitAttemptId getTaskId() { + public TaskAttemptId getTaskId() { return this.queryId; } @@ -396,7 +396,7 @@ public class TaskAttemptContext { return queryContext; } - public QueryUnitAttemptId getQueryId() { + public TaskAttemptId getQueryId() { return queryId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java index 9b6fd0d..c2432eb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java @@ -20,7 +20,7 @@ package org.apache.tajo.worker; import com.google.common.base.Objects; import com.google.common.collect.Lists; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.util.history.History; @@ -37,7 +37,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; */ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { - private QueryUnitAttemptId queryUnitAttemptId; + private TaskAttemptId taskAttemptId; private TaskAttemptState state; private float progress; private long startTime; @@ -51,10 +51,10 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { private int totalFetchCount; private List<FetcherHistoryProto> fetcherHistories; - public TaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskAttemptState state, float progress, + public TaskHistory(TaskAttemptId taskAttemptId, TaskAttemptState state, float progress, long startTime, long finishTime, CatalogProtos.TableStatsProto inputStats) { init(); - this.queryUnitAttemptId = queryUnitAttemptId; + this.taskAttemptId = taskAttemptId; this.state = state; this.progress = progress; this.startTime = startTime; @@ -63,7 +63,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { } public TaskHistory(TaskHistoryProto proto) { - this.queryUnitAttemptId = new QueryUnitAttemptId(proto.getQueryUnitAttemptId()); + this.taskAttemptId = new TaskAttemptId(proto.getTaskAttemptId()); this.state = proto.getState(); this.progress = proto.getProgress(); this.startTime = proto.getStartTime(); @@ -99,7 +99,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { @Override public int hashCode() { - return Objects.hashCode(queryUnitAttemptId, state); + return Objects.hashCode(taskAttemptId, state); } @Override @@ -114,7 +114,7 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { @Override public TaskHistoryProto getProto() { TaskHistoryProto.Builder builder = TaskHistoryProto.newBuilder(); - builder.setQueryUnitAttemptId(queryUnitAttemptId.getProto()); + builder.setTaskAttemptId(taskAttemptId.getProto()); builder.setState(state); builder.setProgress(progress); builder.setStartTime(startTime); @@ -158,8 +158,8 @@ public class TaskHistory implements ProtoObject<TaskHistoryProto>, History { fetcherHistories.add(fetcherHistory); } - public QueryUnitAttemptId getQueryUnitAttemptId() { - return queryUnitAttemptId; + public TaskAttemptId getTaskAttemptId() { + return taskAttemptId; } public TaskAttemptState getState() { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 4e9860b..f0da0cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -25,10 +25,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.engine.query.QueryUnitRequestImpl; +import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl; @@ -42,7 +42,7 @@ import java.util.concurrent.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; /** - * The driver class for Tajo QueryUnit processing. + * The driver class for Tajo Task processing. */ public class TaskRunner extends AbstractService { /** class logger */ @@ -165,7 +165,7 @@ public class TaskRunner extends AbstractService { } static void fatalError(QueryMasterProtocolService.Interface qmClientService, - QueryUnitAttemptId taskAttemptId, String message) { + TaskAttemptId taskAttemptId, String message) { if (message == null) { message = "No error message"; } @@ -185,8 +185,8 @@ public class TaskRunner extends AbstractService { @Override public void run() { int receivedNum = 0; - CallFuture<QueryUnitRequestProto> callFuture = null; - QueryUnitRequestProto taskRequest = null; + CallFuture<TaskRequestProto> callFuture = null; + TaskRequestProto taskRequest = null; while(!stopped) { QueryMasterProtocolService.Interface qmClientService; @@ -207,7 +207,7 @@ public class TaskRunner extends AbstractService { try { if (callFuture == null) { - callFuture = new CallFuture<QueryUnitRequestProto>(); + callFuture = new CallFuture<TaskRequestProto>(); LOG.info("Request GetTask: " + getId()); GetTaskRequestProto request = GetTaskRequestProto.newBuilder() .setExecutionBlockId(getExecutionBlockId().getProto()) @@ -254,7 +254,7 @@ public class TaskRunner extends AbstractService { getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); LOG.info("Accumulated Received Task: " + (++receivedNum)); - QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId()); + TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); if (getContext().getTasks().containsKey(taskAttemptId)) { LOG.error("Duplicate Task Attempt: " + taskAttemptId); fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId); @@ -265,7 +265,7 @@ public class TaskRunner extends AbstractService { Task task; try { task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, - new QueryUnitRequestImpl(taskRequest)); + new TaskRequestImpl(taskRequest)); getContext().getTasks().put(taskAttemptId, task); task.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java index 364348f..5c97ba8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java @@ -22,7 +22,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Maps; import org.apache.hadoop.service.Service; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.container.TajoConverterUtils; @@ -43,7 +43,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { private long startTime; private long finishTime; private ExecutionBlockId executionBlockId; - private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null; + private Map<TaskAttemptId, TaskHistory> taskHistoryMap = null; public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) { init(); @@ -60,7 +60,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { this.taskHistoryMap = Maps.newTreeMap(); for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) { TaskHistory taskHistory = new TaskHistory(taskHistoryProto); - taskHistoryMap.put(taskHistory.getQueryUnitAttemptId(), taskHistory); + taskHistoryMap.put(taskHistory.getTaskAttemptId(), taskHistory); } } @@ -137,15 +137,15 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> { this.containerId = containerId; } - public TaskHistory getTaskHistory(QueryUnitAttemptId queryUnitAttemptId) { - return taskHistoryMap.get(queryUnitAttemptId); + public TaskHistory getTaskHistory(TaskAttemptId taskAttemptId) { + return taskHistoryMap.get(taskAttemptId); } - public Map<QueryUnitAttemptId, TaskHistory> getTaskHistoryMap() { + public Map<TaskAttemptId, TaskHistory> getTaskHistoryMap() { return Collections.unmodifiableMap(taskHistoryMap); } - public void addTaskHistory(QueryUnitAttemptId queryUnitAttemptId, TaskHistory taskHistory) { - taskHistoryMap.put(queryUnitAttemptId, taskHistory); + public void addTaskHistory(TaskAttemptId taskAttemptId, TaskHistory taskHistory) { + taskHistoryMap.put(taskAttemptId, taskHistory); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index a06e6e2..f837b11 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -27,7 +27,7 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.utils.TupleCache; import org.apache.tajo.worker.event.TaskRunnerEvent; @@ -137,15 +137,15 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< return taskRunnerMap.get(taskRunnerId); } - public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId queryUnitAttemptId) { - ExecutionBlockContext context = executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId()); + public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) { + ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId()); if (context != null) { - return context.getTask(queryUnitAttemptId); + return context.getTask(taskAttemptId); } return null; } - public TaskHistory getTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) { + public TaskHistory getTaskHistoryByTaskAttemptId(TaskAttemptId quAttemptId) { synchronized (taskRunnerHistoryMap) { for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) { TaskHistory taskHistory = history.getTaskHistory(quAttemptId); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java index 2ef0c4c..9c15d0c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java @@ -23,8 +23,8 @@ import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.dataserver.FileAccessForbiddenException; import org.jboss.netty.channel.ChannelHandlerContext; @@ -46,7 +46,7 @@ public class AdvancedDataRetriever implements DataRetriever { public AdvancedDataRetriever() { } - public void register(QueryUnitAttemptId id, RetrieverHandler handler) { + public void register(TaskAttemptId id, RetrieverHandler handler) { synchronized (handlerMap) { if (!handlerMap.containsKey(id.toString())) { handlerMap.put(id.toString(), handler); @@ -54,7 +54,7 @@ public class AdvancedDataRetriever implements DataRetriever { } } - public void unregister(QueryUnitAttemptId id) { + public void unregister(TaskAttemptId id) { synchronized (handlerMap) { if (handlerMap.containsKey(id.toString())) { handlerMap.remove(id.toString()); @@ -82,8 +82,8 @@ public class AdvancedDataRetriever implements DataRetriever { for (String qid : qids) { String[] ids = qid.split("_"); ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0)); - QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0])); - QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, + TaskId quid = new TaskId(suid, Integer.parseInt(ids[0])); + TaskAttemptId attemptId = new TaskAttemptId(quid, Integer.parseInt(ids[1])); RetrieverHandler handler = handlerMap.get(attemptId.toString()); FileChunk chunk = handler.get(params); @@ -115,7 +115,7 @@ public class AdvancedDataRetriever implements DataRetriever { private List<String> splitMaps(List<String> qids) { if (null == qids) { - LOG.error("QueryUnitId is EMPTY"); + LOG.error("QueryId is EMPTY"); return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/proto/QueryMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto index 494d296..ae20309 100644 --- a/tajo-core/src/main/proto/QueryMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto @@ -33,7 +33,7 @@ package hadoop.yarn; service QueryMasterProtocolService { //from Worker - rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto); + rpc getTask(GetTaskRequestProto) returns (TaskRequestProto); rpc statusUpdate (TaskStatusProto) returns (BoolProto); rpc ping (ExecutionBlockIdProto) returns (BoolProto); rpc fatalError(TaskFatalErrorReport) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 989b0e3..5acbcd9 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -41,7 +41,7 @@ message SessionProto { } message TaskStatusProto { - required QueryUnitAttemptIdProto id = 1; + required TaskAttemptIdProto id = 1; required string workerName = 2; required float progress = 3; required TaskAttemptState state = 4; @@ -52,7 +52,7 @@ message TaskStatusProto { } message TaskCompletionReport { - required QueryUnitAttemptIdProto id = 1; + required TaskAttemptIdProto id = 1; optional StatSetProto stats = 2; optional TableStatsProto inputStats = 3; optional TableStatsProto resultStats = 4; @@ -60,13 +60,13 @@ message TaskCompletionReport { } message TaskFatalErrorReport { - required QueryUnitAttemptIdProto id = 1; + required TaskAttemptIdProto id = 1; optional string errorMessage = 2; optional string errorTrace = 3; } -message QueryUnitRequestProto { - required QueryUnitAttemptIdProto id = 1; +message TaskRequestProto { + required TaskAttemptIdProto id = 1; repeated FragmentProto fragments = 2; required string outputTable = 3; required bool clusteredOutput = 4; @@ -126,7 +126,7 @@ message ExecutionBlockReport { repeated IntermediateEntryProto intermediateEntries = 5; } -message QueryUnitResponseProto { +message TaskResponseProto { required string id = 1; required QueryState status = 2; } @@ -135,7 +135,7 @@ message StatusReportProto { required int64 timestamp = 1; required string serverName = 2; repeated TaskStatusProto status = 3; - repeated QueryUnitAttemptIdProto pings = 4; + repeated TaskAttemptIdProto pings = 4; } message CommandRequestProto { @@ -146,7 +146,7 @@ message CommandResponseProto { } message Command { - required QueryUnitAttemptIdProto id = 1; + required TaskAttemptIdProto id = 1; required CommandType type = 2; } @@ -208,12 +208,12 @@ message ExecutionBlockListProto { } service TajoWorkerProtocolService { - rpc ping (QueryUnitAttemptIdProto) returns (BoolProto); + rpc ping (TaskAttemptIdProto) returns (BoolProto); // from QueryMaster(Worker) rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto); - rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto); + rpc killTaskAttempt(TaskAttemptIdProto) returns (BoolProto); rpc cleanup(QueryIdProto) returns (BoolProto); rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto); } @@ -336,7 +336,7 @@ message FetcherHistoryProto { } message TaskHistoryProto { - required QueryUnitAttemptIdProto queryUnitAttemptId = 1; + required TaskAttemptIdProto taskAttemptId = 1; required TaskAttemptState state = 2; required float progress = 3; required int64 startTime = 4; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp index 7a23157..ed97eff 100644 --- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp @@ -31,7 +31,7 @@ <%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="java.util.*" %> -<%@ page import="org.apache.tajo.util.history.QueryUnitHistory" %> +<%@ page import="org.apache.tajo.util.history.TaskHistory" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -100,15 +100,15 @@ totalWriteRows = subQuery.getTotalWriteRows(); } - List<QueryUnitHistory> allQueryUnits = reader.getQueryUnitHistory(queryId, ebId); - int numTasks = allQueryUnits.size(); + List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId); + int numTasks = allTasks.size(); int numShuffles = 0; float totalProgress = 0.0f; - if (allQueryUnits != null) { - for(QueryUnitHistory eachQueryUnit: allQueryUnits) { - totalProgress += eachQueryUnit.getProgress(); - numShuffles = eachQueryUnit.getNumShuffles(); + if (allTasks != null) { + for(TaskHistory eachTask: allTasks) { + totalProgress += eachTask.getProgress(); + numShuffles = eachTask.getNumShuffles(); } } @@ -187,53 +187,53 @@ <input type="hidden" name="startTime" value="<%=startTime%>"/> </form> <% - List<QueryUnitHistory> filteredQueryUnit = new ArrayList<QueryUnitHistory>(); - for(QueryUnitHistory eachQueryUnit: allQueryUnits) { + List<TaskHistory> filteredTasks = new ArrayList<TaskHistory>(); + for(TaskHistory eachTask: allTasks) { if (!"ALL".equals(status)) { - if (!status.equals(eachQueryUnit.getState().toString())) { + if (!status.equals(eachTask.getState().toString())) { continue; } } - filteredQueryUnit.add(eachQueryUnit); + filteredTasks.add(eachTask); } - JSPUtil.sortQueryUnitHistory(filteredQueryUnit, sort, sortOrder); - List<QueryUnitHistory> queryUnits = JSPUtil.getPageNavigationList(filteredQueryUnit, currentPage, pageSize); + JSPUtil.sortTaskHistory(filteredTasks, sort, sortOrder); + List<TaskHistory> tasks = JSPUtil.getPageNavigationList(filteredTasks, currentPage, pageSize); - int numOfQueryUnits = filteredQueryUnit.size(); - int totalPage = numOfQueryUnits % pageSize == 0 ? - numOfQueryUnits / pageSize : numOfQueryUnits / pageSize + 1; + int numOfTasks = filteredTasks.size(); + int totalPage = numOfTasks % pageSize == 0 ? + numOfTasks / pageSize : numOfTasks / pageSize + 1; %> - <div align="right"># Tasks: <%=numOfQueryUnits%> / # Pages: <%=totalPage%></div> + <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div> <table border="1" width="100%" class="border_table"> <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr> <% int rowNo = (currentPage - 1) * pageSize + 1; - for (QueryUnitHistory eachQueryUnit: queryUnits) { - String queryUnitDetailUrl = ""; - if (eachQueryUnit.getId() != null) { - queryUnitDetailUrl = "queryunit.jsp?queryId=" + queryId + "&ebid=" + ebId + "&startTime=" + startTime + - "&queryUnitAttemptId=" + eachQueryUnit.getId() + "&sort=" + sort + "&sortOrder=" + sortOrder; + for (TaskHistory eachTask: tasks) { + String taskDetailUrl = ""; + if (eachTask.getId() != null) { + taskDetailUrl = "task.jsp?queryId=" + queryId + "&ebid=" + ebId + "&startTime=" + startTime + + "&taskAttemptId=" + eachTask.getId() + "&sort=" + sort + "&sortOrder=" + sortOrder; } - String queryUnitHost = eachQueryUnit.getHostAndPort() == null ? "-" : eachQueryUnit.getHostAndPort(); - if (eachQueryUnit.getHostAndPort() != null) { - Worker worker = workerMap.get(eachQueryUnit.getHostAndPort()); + String taskHost = eachTask.getHostAndPort() == null ? "-" : eachTask.getHostAndPort(); + if (eachTask.getHostAndPort() != null) { + Worker worker = workerMap.get(eachTask.getHostAndPort()); if (worker != null) { - String[] hostTokens = eachQueryUnit.getHostAndPort().split(":"); - queryUnitHost = "<a href='http://" + hostTokens[0] + ":" + worker.getConnectionInfo().getHttpInfoPort() + - "/taskhistory.jsp?queryUnitAttemptId=" + eachQueryUnit.getId() + "&startTime=" + eachQueryUnit.getLaunchTime() + - "'>" + eachQueryUnit.getHostAndPort() + "</a>"; + String[] hostTokens = eachTask.getHostAndPort().split(":"); + taskHost = "<a href='http://" + hostTokens[0] + ":" + worker.getConnectionInfo().getHttpInfoPort() + + "/taskhistory.jsp?taskAttemptId=" + eachTask.getId() + "&startTime=" + eachTask.getLaunchTime() + + "'>" + eachTask.getHostAndPort() + "</a>"; } } %> <tr> <td><%=rowNo%></td> - <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td> - <td><%=eachQueryUnit.getState()%></td> - <td><%=JSPUtil.percentFormat(eachQueryUnit.getProgress())%>%</td> - <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td> - <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td> - <td><%=queryUnitHost%></td> + <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td> + <td><%=eachTask.getState()%></td> + <td><%=JSPUtil.percentFormat(eachTask.getProgress())%>%</td> + <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> + <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td> + <td><%=taskHost%></td> </tr> <% rowNo++; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/queryunit.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/queryunit.jsp b/tajo-core/src/main/resources/webapps/admin/queryunit.jsp deleted file mode 100644 index 697469f..0000000 --- a/tajo-core/src/main/resources/webapps/admin/queryunit.jsp +++ /dev/null @@ -1,134 +0,0 @@ -<% - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -%> -<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> - -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.util.TajoIdUtils" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.util.history.HistoryReader" %> -<%@ page import="org.apache.tajo.util.history.QueryUnitHistory" %> -<%@ page import="java.util.List" %> - -<% - TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - HistoryReader reader = master.getContext().getHistoryReader(); - - String queryId = request.getParameter("queryId"); - String ebId = request.getParameter("ebid"); - - String status = request.getParameter("status"); - if(status == null || status.isEmpty() || "null".equals(status)) { - status = "ALL"; - } - - String queryUnitAttemptId = request.getParameter("queryUnitAttemptId"); - - List<QueryUnitHistory> allQueryUnits = reader.getQueryUnitHistory(queryId, ebId); - - QueryUnitHistory queryUnit = null; - for(QueryUnitHistory eachQueryUnit: allQueryUnits) { - if (eachQueryUnit.getId().equals(queryUnitAttemptId)) { - queryUnit = eachQueryUnit; - break; - } - } - - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String backUrl = request.getHeader("referer"); -%> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<head> - <link rel="stylesheet" type="text/css" href="/static/style.css"/> - <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> - <title>Query Unit Detail</title> -</head> -<body> -<% - if (queryUnit == null) { -%> - <div>No QueryUnit history.</div> - <div><a href="<%=backUrl%>">Back</a></div> -<% - return; - } - - String fragmentInfo = ""; - String delim = ""; - - for (String eachFragment : queryUnit.getFragments()) { - fragmentInfo += delim + eachFragment; - delim = "<br/>"; - } - - String fetchInfo = ""; - delim = ""; - String previousKey = null; - for (String[] e : queryUnit.getFetchs()) { - if (previousKey == null || !previousKey.equals(e[0])) { - fetchInfo += delim + "<b>" + e[0] + "</b>"; - } - delim = "<br/>"; - fetchInfo += delim + e[1]; - - previousKey = e[0]; - } - - String dataLocationInfos = ""; - delim = ""; - for (String eachLocation: queryUnit.getDataLocations()) { - dataLocationInfos += delim + eachLocation.toString(); - delim = "<br/>"; - } - - int numShuffles = queryUnit.getNumShuffles(); - String shuffleKey = "-"; - String shuffleFileName = "-"; - if(numShuffles > 0) { - shuffleKey = queryUnit.getShuffleKey(); - shuffleFileName = queryUnit.getShuffleFileName(); - } -%> - - -<%@ include file="header.jsp"%> -<div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> - <hr/> - <h3><a href='<%=backUrl%>'><%=ebId%></a></h3> - <hr/> - <table border="1" width="100%" class="border_table"> - <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr> - <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(queryUnit.getProgress())%>%</td></tr> - <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr> - <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr> - <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr> - <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr> - <tr><td align="right">Host</td><td><%=queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort()%></td></tr> - <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr> - <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr> - <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr> - <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr> - </table> -</div> -</body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/admin/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/task.jsp b/tajo-core/src/main/resources/webapps/admin/task.jsp new file mode 100644 index 0000000..1530572 --- /dev/null +++ b/tajo-core/src/main/resources/webapps/admin/task.jsp @@ -0,0 +1,134 @@ +<% + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +%> +<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> + +<%@ page import="org.apache.tajo.util.JSPUtil" %> +<%@ page import="org.apache.tajo.util.TajoIdUtils" %> +<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="java.text.SimpleDateFormat" %> +<%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.util.history.HistoryReader" %> +<%@ page import="org.apache.tajo.util.history.TaskHistory" %> +<%@ page import="java.util.List" %> + +<% + TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + HistoryReader reader = master.getContext().getHistoryReader(); + + String queryId = request.getParameter("queryId"); + String ebId = request.getParameter("ebid"); + + String status = request.getParameter("status"); + if(status == null || status.isEmpty() || "null".equals(status)) { + status = "ALL"; + } + + String taskAttemptId = request.getParameter("taskAttemptId"); + + List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId); + + TaskHistory task = null; + for(TaskHistory eachTask: allTasks) { + if (eachTask.getId().equals(taskAttemptId)) { + task = eachTask; + break; + } + } + + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String backUrl = request.getHeader("referer"); +%> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<head> + <link rel="stylesheet" type="text/css" href="/static/style.css"/> + <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> + <title>Query Unit Detail</title> +</head> +<body> +<% + if (task == null) { +%> + <div>No Task history.</div> + <div><a href="<%=backUrl%>">Back</a></div> +<% + return; + } + + String fragmentInfo = ""; + String delim = ""; + + for (String eachFragment : task.getFragments()) { + fragmentInfo += delim + eachFragment; + delim = "<br/>"; + } + + String fetchInfo = ""; + delim = ""; + String previousKey = null; + for (String[] e : task.getFetchs()) { + if (previousKey == null || !previousKey.equals(e[0])) { + fetchInfo += delim + "<b>" + e[0] + "</b>"; + } + delim = "<br/>"; + fetchInfo += delim + e[1]; + + previousKey = e[0]; + } + + String dataLocationInfos = ""; + delim = ""; + for (String eachLocation: task.getDataLocations()) { + dataLocationInfos += delim + eachLocation.toString(); + delim = "<br/>"; + } + + int numShuffles = task.getNumShuffles(); + String shuffleKey = "-"; + String shuffleFileName = "-"; + if(numShuffles > 0) { + shuffleKey = task.getShuffleKey(); + shuffleFileName = task.getShuffleFileName(); + } +%> + + +<%@ include file="header.jsp"%> +<div class='contents'> + <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> + <hr/> + <h3><a href='<%=backUrl%>'><%=ebId%></a></h3> + <hr/> + <table border="1" width="100%" class="border_table"> + <tr><td width="200" align="right">ID</td><td><%=task.getId()%></td></tr> + <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(task.getProgress())%>%</td></tr> + <tr><td align="right">State</td><td><%=task.getState()%></td></tr> + <tr><td align="right">Launch Time</td><td><%=task.getLaunchTime() == 0 ? "-" : df.format(task.getLaunchTime())%></td></tr> + <tr><td align="right">Finish Time</td><td><%=task.getFinishTime() == 0 ? "-" : df.format(task.getFinishTime())%></td></tr> + <tr><td align="right">Running Time</td><td><%=task.getLaunchTime() == 0 ? "-" : task.getRunningTime() + " ms"%></td></tr> + <tr><td align="right">Host</td><td><%=task.getHostAndPort() == null ? "-" : task.getHostAndPort()%></td></tr> + <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr> + <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr> + <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr> + <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr> + </table> +</div> +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index f39f57c..265937c 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -21,7 +21,7 @@ <%@ page import="org.apache.tajo.ExecutionBlockId" %> <%@ page import="org.apache.tajo.QueryId" %> -<%@ page import="org.apache.tajo.QueryUnitAttemptId" %> +<%@ page import="org.apache.tajo.TaskAttemptId" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> <%@ page import="org.apache.tajo.plan.util.PlannerUtil" %> <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %> @@ -97,30 +97,30 @@ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - QueryUnit[] allQueryUnits = subQuery.getQueryUnits(); + Task[] allTasks = subQuery.getTasks(); long totalInputBytes = 0; long totalReadBytes = 0; long totalReadRows = 0; long totalWriteBytes = 0; long totalWriteRows = 0; - int numTasks = allQueryUnits.length; + int numTasks = allTasks.length; // int numSucceededTasks = 0; // int localReadTasks = subQuery.; int numShuffles = 0; float totalProgress = 0.0f; - for(QueryUnit eachQueryUnit: allQueryUnits) { - totalProgress += eachQueryUnit.getLastAttempt() != null ? eachQueryUnit.getLastAttempt().getProgress(): 0.0f; - numShuffles = eachQueryUnit.getShuffleOutpuNum(); - if (eachQueryUnit.getLastAttempt() != null) { - TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats(); + for(Task eachTask : allTasks) { + totalProgress += eachTask.getLastAttempt() != null ? eachTask.getLastAttempt().getProgress(): 0.0f; + numShuffles = eachTask.getShuffleOutpuNum(); + if (eachTask.getLastAttempt() != null) { + TableStats inputStats = eachTask.getLastAttempt().getInputStats(); if (inputStats != null) { totalInputBytes += inputStats.getNumBytes(); totalReadBytes += inputStats.getReadBytes(); totalReadRows += inputStats.getNumRows(); } - TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats(); + TableStats outputStats = eachTask.getLastAttempt().getResultStats(); if (outputStats != null) { totalWriteBytes += outputStats.getNumBytes(); totalWriteRows += outputStats.getNumRows(); @@ -202,55 +202,55 @@ <input type="hidden" name="sortOrder" value="<%=sortOrder%>"/> </form> <% - List<QueryUnit> filteredQueryUnit = new ArrayList<QueryUnit>(); - for(QueryUnit eachQueryUnit: allQueryUnits) { + List<Task> filteredTask = new ArrayList<Task>(); + for(Task eachTask : allTasks) { if (!"ALL".equals(status)) { - if (!status.equals(eachQueryUnit.getLastAttemptStatus().toString())) { + if (!status.equals(eachTask.getLastAttemptStatus().toString())) { continue; } } - filteredQueryUnit.add(eachQueryUnit); + filteredTask.add(eachTask); } - JSPUtil.sortQueryUnit(filteredQueryUnit, sort, sortOrder); - List<QueryUnit> queryUnits = JSPUtil.getPageNavigationList(filteredQueryUnit, currentPage, pageSize); + JSPUtil.sortTasks(filteredTask, sort, sortOrder); + List<Task> tasks = JSPUtil.getPageNavigationList(filteredTask, currentPage, pageSize); - int numOfQueryUnits = filteredQueryUnit.size(); - int totalPage = numOfQueryUnits % pageSize == 0 ? - numOfQueryUnits / pageSize : numOfQueryUnits / pageSize + 1; + int numOfTasks = filteredTask.size(); + int totalPage = numOfTasks % pageSize == 0 ? + numOfTasks / pageSize : numOfTasks / pageSize + 1; int rowNo = (currentPage - 1) * pageSize + 1; %> - <div align="right"># Tasks: <%=numOfQueryUnits%> / # Pages: <%=totalPage%></div> + <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div> <table border="1" width="100%" class="border_table"> <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr> <% - for(QueryUnit eachQueryUnit: queryUnits) { - int queryUnitSeq = eachQueryUnit.getId().getId(); - String queryUnitDetailUrl = "queryunit.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + + for(Task eachTask : tasks) { + int taskSeq = eachTask.getId().getId(); + String taskDetailUrl = "task.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + "&page=" + currentPage + "&pageSize=" + pageSize + - "&queryUnitSeq=" + queryUnitSeq + "&sort=" + sort + "&sortOrder=" + sortOrder; + "&taskSeq=" + taskSeq + "&sort=" + sort + "&sortOrder=" + sortOrder; - String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost(); - if(eachQueryUnit.getSucceededHost() != null) { + String taskHost = eachTask.getSucceededHost() == null ? "-" : eachTask.getSucceededHost(); + if(eachTask.getSucceededHost() != null) { TajoMasterProtocol.WorkerResourceProto worker = - workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId()); + workerMap.get(eachTask.getLastAttempt().getWorkerConnectionInfo().getId()); if(worker != null) { - QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt(); + TaskAttempt lastAttempt = eachTask.getLastAttempt(); if(lastAttempt != null) { - QueryUnitAttemptId lastAttemptId = lastAttempt.getId(); - queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>"; + TaskAttemptId lastAttemptId = lastAttempt.getId(); + taskHost = "<a href='http://" + eachTask.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + eachTask.getSucceededHost() + "</a>"; } } } %> <tr> <td><%=rowNo%></td> - <td><a href="<%=queryUnitDetailUrl%>"><%=eachQueryUnit.getId()%></a></td> - <td><%=eachQueryUnit.getLastAttemptStatus()%></td> - <td><%=JSPUtil.percentFormat(eachQueryUnit.getLastAttempt().getProgress())%>%</td> - <td><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : df.format(eachQueryUnit.getLaunchTime())%></td> - <td align='right'><%=eachQueryUnit.getLaunchTime() == 0 ? "-" : eachQueryUnit.getRunningTime() + " ms"%></td> - <td><%=queryUnitHost%></td> + <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td> + <td><%=eachTask.getLastAttemptStatus()%></td> + <td><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td> + <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> + <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td> + <td><%=taskHost%></td> </tr> <% rowNo++; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/queryunit.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp deleted file mode 100644 index 49635d1..0000000 --- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp +++ /dev/null @@ -1,175 +0,0 @@ -<% - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -%> -<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> - -<%@ page import="org.apache.tajo.ExecutionBlockId" %> -<%@ page import="org.apache.tajo.QueryId" %> -<%@ page import="org.apache.tajo.QueryUnitId" %> -<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %> -<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> -<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> -<%@ page import="org.apache.tajo.master.querymaster.Query" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.master.querymaster.QueryUnit" %> -<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %> -<%@ page import="org.apache.tajo.storage.DataLocation" %> -<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %> -<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.util.TajoIdUtils" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="org.apache.tajo.worker.FetchImpl" %> -<%@ page import="org.apache.tajo.worker.TajoWorker" %> -<%@ page import="java.net.URI" %> -<%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="java.util.Map" %> -<%@ page import="java.util.Set" %> -<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> - -<% - String paramQueryId = request.getParameter("queryId"); - String paramEbId = request.getParameter("ebid"); - String status = request.getParameter("status"); - if(status == null || status.isEmpty() || "null".equals(status)) { - status = "ALL"; - } - - QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId); - ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId); - - int queryUnitSeq = Integer.parseInt(request.getParameter("queryUnitSeq")); - TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); - - if(queryMasterTask == null) { - out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>"); - return; - } - - Query query = queryMasterTask.getQuery(); - SubQuery subQuery = query.getSubQuery(ebid); - - if(subQuery == null) { - out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>"); - return; - } - - if(subQuery == null) { -%> -<script type="text/javascript"> - alert("No Execution Block for" + ebid); - document.history.back(); -</script> -<% - return; - } - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - QueryUnitId queryUnitId = new QueryUnitId(ebid, queryUnitSeq); - QueryUnit queryUnit = subQuery.getQueryUnit(queryUnitId); - if(queryUnit == null) { -%> -<script type="text/javascript"> - alert("No QueryUnit for" + queryUnitId); - document.history.back(); -</script> -<% - return; - } - - String sort = request.getParameter("sort"); - String sortOrder = request.getParameter("sortOrder"); - - String backUrl = "querytasks.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + "&sort=" + sort + "&sortOrder=" + sortOrder + "&status=" + status; - - String fragmentInfo = ""; - String delim = ""; - for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) { - Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment); - fragmentInfo += delim + fragment.toString(); - delim = "<br/>"; - } - - String fetchInfo = ""; - delim = ""; - for (Map.Entry<String, Set<FetchImpl>> e : queryUnit.getFetchMap().entrySet()) { - fetchInfo += delim + "<b>" + e.getKey() + "</b>"; - delim = "<br/>"; - for (FetchImpl f : e.getValue()) { - for (URI uri : f.getSimpleURIs()){ - fetchInfo += delim + uri; - } - } - } - - String dataLocationInfos = ""; - delim = ""; - for(DataLocation eachLocation: queryUnit.getDataLocations()) { - dataLocationInfos += delim + eachLocation.toString(); - delim = "<br/>"; - } - - int numShuffles = queryUnit.getShuffleOutpuNum(); - String shuffleKey = "-"; - String shuffleFileName = "-"; - if(numShuffles > 0) { - TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = queryUnit.getShuffleFileOutputs().get(0); - shuffleKey = "" + shuffleFileOutputs.getPartId(); - shuffleFileName = shuffleFileOutputs.getFileName(); - } - - //int numIntermediateData = queryUnit.getIntermediateData() == null ? 0 : queryUnit.getIntermediateData().size(); - TableStats inputStat = queryUnit.getLastAttempt().getInputStats(); - TableStats outputStat = queryUnit.getLastAttempt().getResultStats(); -%> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<head> - <link rel="stylesheet" type="text/css" href="/static/style.css"/> - <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> - <title>Query Unit Detail</title> -</head> -<body> -<%@ include file="header.jsp"%> -<div class='contents'> - <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> - <hr/> - <h3><a href='<%=backUrl%>'><%=ebid.toString()%></a></h3> - <hr/> - <table border="1" width="100%" class="border_table"> - <tr><td width="200" align="right">ID</td><td><%=queryUnit.getId()%></td></tr> - <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(queryUnit.getLastAttempt().getProgress())%>%</td></tr> - <tr><td align="right">State</td><td><%=queryUnit.getState()%></td></tr> - <tr><td align="right">Launch Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : df.format(queryUnit.getLaunchTime())%></td></tr> - <tr><td align="right">Finish Time</td><td><%=queryUnit.getFinishTime() == 0 ? "-" : df.format(queryUnit.getFinishTime())%></td></tr> - <tr><td align="right">Running Time</td><td><%=queryUnit.getLaunchTime() == 0 ? "-" : queryUnit.getRunningTime() + " ms"%></td></tr> - <tr><td align="right">Host</td><td><%=queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost()%></td></tr> - <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr> - <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr> - <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr> - <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr> - <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr> - <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr> - </table> -</div> -</body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp new file mode 100644 index 0000000..5abbd8c --- /dev/null +++ b/tajo-core/src/main/resources/webapps/worker/task.jsp @@ -0,0 +1,174 @@ +<% + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +%> +<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> + +<%@ page import="org.apache.tajo.ExecutionBlockId" %> +<%@ page import="org.apache.tajo.QueryId" %> +<%@ page import="org.apache.tajo.TaskId" %> +<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %> +<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> +<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> +<%@ page import="org.apache.tajo.master.querymaster.Query" %> +<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> +<%@ page import="org.apache.tajo.master.querymaster.Task" %> +<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %> +<%@ page import="org.apache.tajo.storage.DataLocation" %> +<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %> +<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> +<%@ page import="org.apache.tajo.util.JSPUtil" %> +<%@ page import="org.apache.tajo.util.TajoIdUtils" %> +<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="org.apache.tajo.worker.FetchImpl" %> +<%@ page import="org.apache.tajo.worker.TajoWorker" %> +<%@ page import="java.net.URI" %> +<%@ page import="java.text.SimpleDateFormat" %> +<%@ page import="java.util.Map" %> +<%@ page import="java.util.Set" %> +<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> + +<% + String paramQueryId = request.getParameter("queryId"); + String paramEbId = request.getParameter("ebid"); + String status = request.getParameter("status"); + if(status == null || status.isEmpty() || "null".equals(status)) { + status = "ALL"; + } + + QueryId queryId = TajoIdUtils.parseQueryId(paramQueryId); + ExecutionBlockId ebid = TajoIdUtils.createExecutionBlockId(paramEbId); + + int taskSeq = Integer.parseInt(request.getParameter("taskSeq")); + TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() + .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); + + if(queryMasterTask == null) { + out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>"); + return; + } + + Query query = queryMasterTask.getQuery(); + SubQuery subQuery = query.getSubQuery(ebid); + + if(subQuery == null) { + out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>"); + return; + } + + if(subQuery == null) { +%> +<script type="text/javascript"> + alert("No Execution Block for" + ebid); + document.history.back(); +</script> +<% + return; + } + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + TaskId taskId = new TaskId(ebid, taskSeq); + Task task = subQuery.getTask(taskId); + if(task == null) { +%> +<script type="text/javascript"> + alert("No Task for" + taskId); + document.history.back(); +</script> +<% + return; + } + + String sort = request.getParameter("sort"); + String sortOrder = request.getParameter("sortOrder"); + + String backUrl = "querytasks.jsp?queryId=" + paramQueryId + "&ebid=" + paramEbId + "&sort=" + sort + "&sortOrder=" + sortOrder + "&status=" + status; + + String fragmentInfo = ""; + String delim = ""; + for (CatalogProtos.FragmentProto eachFragment : task.getAllFragments()) { + Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment); + fragmentInfo += delim + fragment.toString(); + delim = "<br/>"; + } + + String fetchInfo = ""; + delim = ""; + for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet()) { + fetchInfo += delim + "<b>" + e.getKey() + "</b>"; + delim = "<br/>"; + for (FetchImpl f : e.getValue()) { + for (URI uri : f.getSimpleURIs()){ + fetchInfo += delim + uri; + } + } + } + + String dataLocationInfos = ""; + delim = ""; + for(DataLocation eachLocation: task.getDataLocations()) { + dataLocationInfos += delim + eachLocation.toString(); + delim = "<br/>"; + } + + int numShuffles = task.getShuffleOutpuNum(); + String shuffleKey = "-"; + String shuffleFileName = "-"; + if(numShuffles > 0) { + TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = task.getShuffleFileOutputs().get(0); + shuffleKey = "" + shuffleFileOutputs.getPartId(); + shuffleFileName = shuffleFileOutputs.getFileName(); + } + + TableStats inputStat = task.getLastAttempt().getInputStats(); + TableStats outputStat = task.getLastAttempt().getResultStats(); +%> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<head> + <link rel="stylesheet" type="text/css" href="/static/style.css"/> + <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> + <title>Query Unit Detail</title> +</head> +<body> +<%@ include file="header.jsp"%> +<div class='contents'> + <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> + <hr/> + <h3><a href='<%=backUrl%>'><%=ebid.toString()%></a></h3> + <hr/> + <table border="1" width="100%" class="border_table"> + <tr><td width="200" align="right">ID</td><td><%=task.getId()%></td></tr> + <tr><td align="right">Progress</td><td><%=JSPUtil.percentFormat(task.getLastAttempt().getProgress())%>%</td></tr> + <tr><td align="right">State</td><td><%=task.getState()%></td></tr> + <tr><td align="right">Launch Time</td><td><%=task.getLaunchTime() == 0 ? "-" : df.format(task.getLaunchTime())%></td></tr> + <tr><td align="right">Finish Time</td><td><%=task.getFinishTime() == 0 ? "-" : df.format(task.getFinishTime())%></td></tr> + <tr><td align="right">Running Time</td><td><%=task.getLaunchTime() == 0 ? "-" : task.getRunningTime() + " ms"%></td></tr> + <tr><td align="right">Host</td><td><%=task.getSucceededHost() == null ? "-" : task.getSucceededHost()%></td></tr> + <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr> + <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr> + <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr> + <tr><td align="right">Input Statistics</td><td><%=JSPUtil.tableStatToString(inputStat)%></td></tr> + <tr><td align="right">Output Statistics</td><td><%=JSPUtil.tableStatToString(outputStat)%></td></tr> + <tr><td align="right">Fetches</td><td><%=fetchInfo%></td></tr> + </table> +</div> +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp index d84664f..5c3ce7b 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp @@ -20,7 +20,7 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.commons.lang.StringUtils" %> -<%@ page import="org.apache.tajo.QueryUnitAttemptId" %> +<%@ page import="org.apache.tajo.TaskAttemptId" %> <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> @@ -33,27 +33,27 @@ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); String containerId = request.getParameter("containerId"); - String quAttemptId = request.getParameter("queryUnitAttemptId"); - QueryUnitAttemptId queryUnitAttemptId = TajoIdUtils.parseQueryUnitAttemptId(quAttemptId); + String quAttemptId = request.getParameter("taskAttemptId"); + TaskAttemptId taskAttemptId = TajoIdUtils.parseTaskAttemptId(quAttemptId); Task task = null; TaskHistory taskHistory = null; if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) { - task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByQueryUnitAttemptId(queryUnitAttemptId); + task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByTaskAttemptId(taskAttemptId); if (task != null) { taskHistory = task.createTaskHistory(); } else { - taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByQueryUnitAttemptId(queryUnitAttemptId); + taskHistory = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByTaskAttemptId(taskAttemptId); } } else { TaskRunner runner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId); if(runner != null) { - task = runner.getContext().getTask(queryUnitAttemptId); + task = runner.getContext().getTask(taskAttemptId); if (task != null) { taskHistory = task.createTaskHistory(); } else { TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId); if(history != null) { - taskHistory = history.getTaskHistory(queryUnitAttemptId); + taskHistory = history.getTaskHistory(taskAttemptId); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp index b777d5f..b7774e8 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp @@ -31,7 +31,7 @@ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); HistoryReader reader = new HistoryReader(tajoWorker.getWorkerContext().getWorkerName(), tajoWorker.getWorkerContext().getConf()); - TaskHistory taskHistory = reader.getTaskHistory(request.getParameter("queryUnitAttemptId"), + TaskHistory taskHistory = reader.getTaskHistory(request.getParameter("taskAttemptId"), Long.parseLong(request.getParameter("startTime"))); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -62,9 +62,9 @@ <div class='contents'> <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> <hr/> - <h3>Task Detail: <%=request.getParameter("queryUnitAttemptId")%></h3> + <h3>Task Detail: <%=request.getParameter("taskAttemptId")%></h3> <table border="1" width="100%" class="border_table"> - <tr><td width="200" align="right">ID</td><td><%=request.getParameter("queryUnitAttemptId")%></td></tr> + <tr><td width="200" align="right">ID</td><td><%=request.getParameter("taskAttemptId")%></td></tr> <tr><td align="right">State</td><td><%=taskHistory.getState()%></td></tr> <tr><td align="right">Start Time</td><td><%=taskHistory.getStartTime() == 0 ? "-" : df.format(taskHistory.getStartTime())%></td></tr> <tr><td align="right">Finish Time</td><td><%=taskHistory.getFinishTime() == 0 ? "-" : df.format(taskHistory.getFinishTime())%></td></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/resources/webapps/worker/tasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp index ae05047..ab873cd 100644 --- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp @@ -19,7 +19,7 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.tajo.QueryUnitAttemptId" %> +<%@ page import="org.apache.tajo.TaskAttemptId" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.text.SimpleDateFormat" %> @@ -67,13 +67,13 @@ if (taskRunner != null) { ExecutionBlockContext context = taskRunner.getContext(); - for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) { - QueryUnitAttemptId queryUnitId = entry.getKey(); + for (Map.Entry<TaskAttemptId, Task> entry : context.getTasks().entrySet()) { + TaskAttemptId taskAttemptId = entry.getKey(); TaskHistory eachTask = entry.getValue().createTaskHistory(); %> <tr> <td> - <a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td> + <a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td> <td><%=df.format(eachTask.getStartTime())%></td> <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td> <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td> @@ -86,12 +86,12 @@ if (history != null) { - for (Map.Entry<QueryUnitAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) { - QueryUnitAttemptId queryUnitId = entry.getKey(); + for (Map.Entry<TaskAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) { + TaskAttemptId taskAttemptId = entry.getKey(); TaskHistory eachTask = entry.getValue(); %> <tr> - <td><a href="taskdetail.jsp?containerId=<%=containerId%>&queryUnitAttemptId=<%=queryUnitId%>"><%=queryUnitId%></a></td> + <td><a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td> <td><%=df.format(eachTask.getStartTime())%></td> <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td> <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td> http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index f6a5fed..8bee6fb 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -66,12 +66,12 @@ public class LocalTajoTestingUtility { private static int taskAttemptId; - public static QueryUnitAttemptId newQueryUnitAttemptId() { - return QueryIdFactory.newQueryUnitAttemptId( - QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++); + public static TaskAttemptId newTaskAttemptId() { + return QueryIdFactory.newTaskAttemptId( + QueryIdFactory.newTaskId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), taskAttemptId++); } - public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) { - return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0); + public static TaskAttemptId newTaskAttemptId(MasterPlan plan) { + return QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(plan.newExecutionBlockId()), 0); } public static Session createDummySession() { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java index 912400b..836dd2f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java +++ b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java @@ -47,12 +47,12 @@ public class TestQueryIdFactory { } @Test - public void testNewQueryUnitId() { + public void testNewTaskId() { QueryId qid = LocalTajoTestingUtility.newQueryId(); MasterPlan plan = new MasterPlan(qid, null, null); ExecutionBlockId subid = plan.newExecutionBlockId(); - QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid); - QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid); + TaskId quid1 = QueryIdFactory.newTaskId(subid); + TaskId quid2 = QueryIdFactory.newTaskId(subid); assertTrue(quid1.compareTo(quid2) < 0); } }
