http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 1f5e7a3..61e2dea 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -18,6 +18,8 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,11 +37,9 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoContainerProxy; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -48,20 +48,29 @@ import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.FormatProperty; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; -import org.apache.tajo.worker.AbstractResourceAllocator; -import org.apache.tajo.worker.TajoResourceAllocator; +import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; +import org.apache.tajo.worker.event.NodeResourceEvent; +import org.apache.tajo.worker.event.NodeStatusEvent; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import static org.apache.tajo.TajoProtos.QueryState; +import static org.apache.tajo.ResourceProtos.*; public class QueryMasterTask extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName()); @@ -90,20 +99,21 @@ public class QueryMasterTask extends CompositeService { private AtomicLong lastClientHeartbeat = new AtomicLong(-1); - private AbstractResourceAllocator resourceAllocator; - - private AtomicBoolean stopped = new AtomicBoolean(false); + private volatile boolean isStopped; private TajoMetrics queryMetrics; private Throwable initError; - private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = - new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); + private NodeResource allocation; + + private final List<TaskFatalErrorReport> diagnostics = new ArrayList<TaskFatalErrorReport>(); + + private final ConcurrentMap<Integer, WorkerConnectionInfo> workerMap = Maps.newConcurrentMap(); public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, QueryId queryId, Session session, QueryContext queryContext, - String jsonExpr, AsyncDispatcher dispatcher) { + String jsonExpr, NodeResource allocation, AsyncDispatcher dispatcher) { super(QueryMasterTask.class.getName()); this.queryMasterContext = queryMasterContext; @@ -111,90 +121,90 @@ public class QueryMasterTask extends CompositeService { this.session = session; this.queryContext = queryContext; this.jsonExpr = jsonExpr; + this.allocation = allocation; this.querySubmitTime = System.currentTimeMillis(); this.dispatcher = dispatcher; } public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, QueryId queryId, Session session, QueryContext queryContext, - String jsonExpr) { - this(queryMasterContext, queryId, session, queryContext, jsonExpr, new AsyncDispatcher()); + String jsonExpr, + NodeResource allocation) { + this(queryMasterContext, queryId, session, queryContext, jsonExpr, allocation, new AsyncDispatcher()); } @Override - public void init(Configuration conf) { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("conf should be a TajoConf type."); - } - systemConf = (TajoConf)conf; + public void serviceInit(Configuration conf) throws Exception { - try { - queryTaskContext = new QueryMasterTaskContext(); - String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS); + systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); - if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { - resourceAllocator = new TajoResourceAllocator(queryTaskContext); - } else { - throw new UnimplementedException(resourceManagerClassName + " is not supported yet"); - } - addService(resourceAllocator); - addService(dispatcher); + queryTaskContext = new QueryMasterTaskContext(); - dispatcher.register(StageEventType.class, new StageEventDispatcher()); - dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); - dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); - dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); - dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); - dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); + addService(dispatcher); - queryMetrics = new TajoMetrics(queryId.toString()); + dispatcher.register(StageEventType.class, new StageEventDispatcher()); + dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); + dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); + dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); + dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); + dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); - super.init(systemConf); + try { + queryMetrics = new TajoMetrics(queryId.toString()); } catch (Throwable t) { LOG.error(t.getMessage(), t); initError = t; } + super.serviceInit(systemConf); } public boolean isStopped() { - return stopped.get(); + return isStopped; } @Override - public void start() { + public void serviceStart() throws Exception { startQuery(); - super.start(); + List<TajoProtos.WorkerConnectionInfoProto> workersProto = queryMasterContext.getQueryMaster().getAllWorker(); + for (TajoProtos.WorkerConnectionInfoProto worker : workersProto) { + workerMap.put(worker.getId(), new WorkerConnectionInfo(worker)); + } + super.serviceStart(); } @Override - public void stop() { - - if(stopped.getAndSet(true)) { - return; - } + public void serviceStop() throws Exception { + isStopped = true; LOG.info("Stopping QueryMasterTask:" + queryId); - try { - resourceAllocator.stop(); - } catch (Throwable t) { - LOG.fatal(t.getMessage(), t); + //release QM resource + EventHandler handler = getQueryTaskContext().getQueryMasterContext().getWorkerContext(). + getNodeResourceManager().getDispatcher().getEventHandler(); + + handler.handle(new NodeResourceDeallocateEvent(allocation, NodeResourceEvent.ResourceType.QUERY_MASTER)); + + //flush current node resource + handler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); + + if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { + cleanupQuery(getQueryId()); } if (queryMetrics != null) { - queryMetrics.report(new MetricsConsoleReporter()); + queryMasterContext.getEventExecutor().submit(new Runnable() { + @Override + public void run() { + queryMetrics.report(new MetricsConsoleReporter()); + } + }); } - super.stop(); + super.serviceStop(); LOG.info("Stopped QueryMasterTask:" + queryId); } - public void handleTaskRequestEvent(TaskRequestEvent event) { - ExecutionBlockId id = event.getExecutionBlockId(); - query.getStage(id).handleTaskRequestEvent(event); - } - - public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { + public void handleTaskFailed(TaskFatalErrorReport report) { synchronized(diagnostics) { if (diagnostics.size() < 10) { diagnostics.add(report); @@ -204,7 +214,7 @@ public class QueryMasterTask extends CompositeService { getEventHandler().handle(new TaskFatalErrorEvent(report)); } - public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() { + public Collection<TaskFatalErrorReport> getDiagnostics() { synchronized(diagnostics) { return Collections.unmodifiableCollection(diagnostics); } @@ -252,33 +262,53 @@ public class QueryMasterTask extends CompositeService { } } + /** + * It sends a kill RPC request to a corresponding worker. + * + * @param workerId worker unique Id. + * @param taskAttemptId The TaskAttemptId to be killed. + */ + protected void killTaskAttempt(int workerId, TaskAttemptId taskAttemptId) { + NettyClientBase tajoWorkerRpc; + ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId(); + InetSocketAddress workerAddress = getQuery().getStage(ebId).getAssignedWorkerMap().get(workerId); + + try { + tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>(); + tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), callFuture); + + if(!callFuture.get().getValue()){ + queryMasterContext.getEventHandler().handle( + new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + taskAttemptId)); + } + } catch (Exception e) { + /* Node RPC failure */ + LOG.error(e.getMessage(), e); + queryMasterContext.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); + } + } + private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> { @Override - public void handle(LocalTaskEvent event) { - TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId()); - if (proxy != null) { - proxy.killTaskAttempt(event.getTaskAttemptId()); - } + public void handle(final LocalTaskEvent event) { + queryMasterContext.getEventExecutor().submit(new Runnable() { + @Override + public void run() { + killTaskAttempt(event.getWorkerId(), event.getTaskAttemptId()); + } + }); } } private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> { + @Override public void handle(QueryMasterQueryCompletedEvent event) { - QueryId queryId = event.getQueryId(); - LOG.info("Query completion notified from " + queryId); - - while (!isTerminatedState(query.getSynchronizedState())) { - try { - synchronized (this) { - wait(10); - } - } catch (InterruptedException e) { - LOG.error(e); - } - } - LOG.info("Query final state: " + query.getSynchronizedState()); + QueryId queryId = event.getQueryId(); + LOG.info("Query completion notified from " + queryId + " final state: " + query.getSynchronizedState()); queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); } } @@ -441,6 +471,29 @@ public class QueryMasterTask extends CompositeService { return this.querySubmitTime; } + private void cleanupQuery(final QueryId queryId) { + Set<InetSocketAddress> workers = Sets.newHashSet(); + for (Stage stage : getQuery().getStages()) { + workers.addAll(stage.getAssignedWorkerMap().values()); + } + + LOG.info("Cleanup resources of all workers. Query: " + queryId + ", workers: " + workers.size()); + for (final InetSocketAddress worker : workers) { + queryMasterContext.getEventExecutor().submit(new Runnable() { + @Override + public void run() { + try { + AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } + }); + } + } + public class QueryMasterTaskContext { EventHandler eventHandler; public QueryMaster.QueryMasterContext getQueryMasterContext() { @@ -501,12 +554,15 @@ public class QueryMasterTask extends CompositeService { return query.getProgress(); } - public AbstractResourceAllocator getResourceAllocator() { - return resourceAllocator; - } - public TajoMetrics getQueryMetrics() { return queryMetrics; } + + /** + * A key is worker id, and a value is a worker connection information. + */ + public ConcurrentMap<Integer, WorkerConnectionInfo> getWorkerMap() { + return workerMap; + } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index f30fb64..50b84d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -40,8 +40,8 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.exception.InternalException; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.util.PlannerUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 1163a6e..50b45a8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -20,15 +20,12 @@ package org.apache.tajo.querymaster; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.commons.lang.exception.ExceptionUtils; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; -import org.apache.hadoop.yarn.util.Records; import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -38,27 +35,24 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.planner.PhysicalPlannerImpl; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.LaunchTaskRunnersEvent; -import org.apache.tajo.master.TaskRunnerGroupEvent; -import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.master.TaskState; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.FileTablespace; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; @@ -70,8 +64,8 @@ import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.worker.FetchImpl; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -80,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.tajo.conf.TajoConf.ConfVars; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; +import static org.apache.tajo.ResourceProtos.*; /** @@ -107,16 +102,12 @@ public class Stage implements EventHandler<StageEvent> { private volatile long lastContactTime; private Thread timeoutChecker; - volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>(); - volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId, - TajoContainer>(); + private final Map<TaskId, Task> tasks = Maps.newConcurrentMap(); + private final Map<Integer, InetSocketAddress> workerMap = Maps.newConcurrentMap(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); - private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition(); private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition(); - private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION = - new AllocatedContainersCancelTransition(); private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition(); private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition(); private StateMachine<StageState, StageEventType, StageEvent> stateMachine; @@ -142,8 +133,7 @@ public class Stage implements EventHandler<StageEvent> { // Transitions from INITED state .addTransition(StageState.INITED, StageState.RUNNING, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINER_LAUNCH_TRANSITION) + StageEventType.SQ_START) .addTransition(StageState.INITED, StageState.INITED, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) @@ -153,10 +143,7 @@ public class Stage implements EventHandler<StageEvent> { StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from RUNNING state - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINER_LAUNCH_TRANSITION) + // Transitions from RUNNING state .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) @@ -185,10 +172,8 @@ public class Stage implements EventHandler<StageEvent> { // Transitions from KILL_WAIT state .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - EnumSet.of(StageEventType.SQ_START, StageEventType.SQ_CONTAINER_ALLOCATED), - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition()) + EnumSet.of(StageEventType.SQ_KILL), + new KillTasksTransition()) .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) @@ -205,6 +190,9 @@ public class Stage implements EventHandler<StageEvent> { .addTransition(StageState.KILL_WAIT, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + // Ignore-able events + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + EnumSet.of(StageEventType.SQ_START)) // Transitions from FINALIZING state .addTransition(StageState.FINALIZING, StageState.FINALIZING, @@ -226,9 +214,6 @@ public class Stage implements EventHandler<StageEvent> { // Transitions from SUCCEEDED state .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(StageState.SUCCEEDED, StageState.ERROR, @@ -239,14 +224,10 @@ public class Stage implements EventHandler<StageEvent> { EnumSet.of( StageEventType.SQ_START, StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_SHUFFLE_REPORT)) // Transitions from KILLED state .addTransition(StageState.KILLED, StageState.KILLED, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.KILLED, StageState.KILLED, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(StageState.KILLED, StageState.ERROR, @@ -257,16 +238,12 @@ public class Stage implements EventHandler<StageEvent> { EnumSet.of( StageEventType.SQ_START, StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_SHUFFLE_REPORT, StageEventType.SQ_STAGE_COMPLETED, StageEventType.SQ_FAILED)) // Transitions from FAILED state .addTransition(StageState.FAILED, StageState.FAILED, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.FAILED, StageState.FAILED, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(StageState.FAILED, StageState.ERROR, @@ -277,14 +254,10 @@ public class Stage implements EventHandler<StageEvent> { EnumSet.of( StageEventType.SQ_START, StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_FAILED)) // Transitions from ERROR state .addTransition(StageState.ERROR, StageState.ERROR, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.ERROR, StageState.ERROR, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) // Ignore-able transitions @@ -294,18 +267,19 @@ public class Stage implements EventHandler<StageEvent> { StageEventType.SQ_KILL, StageEventType.SQ_FAILED, StageEventType.SQ_INTERNAL_ERROR, - StageEventType.SQ_STAGE_COMPLETED)) + StageEventType.SQ_STAGE_COMPLETED, + StageEventType.SQ_SHUFFLE_REPORT)) .installTopology(); private final Lock readLock; private final Lock writeLock; - private int totalScheduledObjectsCount; - private int completedTaskCount = 0; - private int succeededObjectCount = 0; - private int killedObjectCount = 0; - private int failedObjectCount = 0; + private volatile int totalScheduledObjectsCount; + private volatile int completedTaskCount = 0; + private volatile int succeededObjectCount = 0; + private volatile int killedObjectCount = 0; + private volatile int failedObjectCount = 0; private TaskSchedulerContext schedulerContext; private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList(); private AtomicInteger completedShuffleTasks = new AtomicInteger(0); @@ -695,16 +669,60 @@ public class Stage implements EventHandler<StageEvent> { } private void stopScheduler() { - // If there are launched TaskRunners, send the 'shouldDie' message to all r - // via received task requests. if (taskScheduler != null) { taskScheduler.stop(); } } - private void releaseContainers() { - // If there are still live TaskRunners, try to kill the containers. and send the shuffle report request - eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values())); + /** + * Get the launched worker address + */ + protected Map<Integer, InetSocketAddress> getAssignedWorkerMap() { + return workerMap; + } + + private void sendStopExecutionBlockEvent(final StopExecutionBlockRequest requestProto) { + + for (final InetSocketAddress worker : getAssignedWorkerMap().values()) { + getContext().getQueryMasterContext().getEventExecutor().submit(new Runnable() { + @Override + public void run() { + try { + AsyncRpcClient tajoWorkerRpc = + RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + tajoWorkerRpcClient.stopExecutionBlock(null, + requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class)); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } + }); + } + } + + /** + * Sends stopping request to all worker + */ + protected void stopExecutionBlock() { + // If there are still live tasks, try to kill the tasks. and send the shuffle report request + + List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); + if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { + List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); + + for (ExecutionBlock executionBlock : childs) { + ebIds.add(executionBlock.getId().getProto()); + } + } + + StopExecutionBlockRequest.Builder stopRequest = StopExecutionBlockRequest.newBuilder(); + ExecutionBlockListProto.Builder cleanupList = ExecutionBlockListProto.newBuilder(); + + cleanupList.addAllExecutionBlockId(Lists.newArrayList(ebIds)); + stopRequest.setCleanupList(cleanupList.build()); + stopRequest.setExecutionBlockId(getId().getProto()); + sendStopExecutionBlockEvent(stopRequest.build()); } /** @@ -769,10 +787,6 @@ public class Stage implements EventHandler<StageEvent> { } } - public void handleTaskRequestEvent(TaskRequestEvent event) { - taskScheduler.handleTaskRequestEvent(event); - } - private static class InitAndRequestContainer implements MultipleArcTransition<Stage, StageEvent, StageState> { @@ -794,7 +808,7 @@ public class Stage implements EventHandler<StageEvent> { setShuffleIfNecessary(stage, channel); initTaskScheduler(stage); // execute pre-processing asyncronously - stage.getContext().getQueryMasterContext().getEventExecutor() + stage.getContext().getQueryMasterContext().getSingleEventExecutor() .submit(new Runnable() { @Override public void run() { @@ -809,7 +823,7 @@ public class Stage implements EventHandler<StageEvent> { } else { if(stage.getSynchronizedState() == StageState.INITED) { stage.taskScheduler.start(); - allocateContainers(stage); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); } else { /* all tasks are killed before stage are inited */ if (stage.getTotalScheduledObjectsCount() == stage.getCompletedTaskCount()) { @@ -1047,33 +1061,6 @@ public class Stage implements EventHandler<StageEvent> { } } - public static void allocateContainers(Stage stage) { - ExecutionBlock execBlock = stage.getBlock(); - - //TODO consider disk slot - int requiredMemoryMBPerTask = 512; - - int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers( - stage.getContext().getQueryMasterContext().getWorkerContext(), - stage.schedulerContext.getEstimatedTaskNum(), - requiredMemoryMBPerTask - ); - - final Resource resource = Records.newRecord(Resource.class); - - resource.setMemory(requiredMemoryMBPerTask); - - LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest); - - Priority priority = Records.newRecord(Priority.class); - priority.setPriority(stage.getPriority()); - ContainerAllocationEvent event = - new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, - stage.getId(), priority, resource, numRequest, - stage.masterPlan.isLeaf(execBlock), 0.0f); - stage.eventHandler.handle(event); - } - private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException { ExecutionBlock execBlock = stage.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); @@ -1082,8 +1069,6 @@ public class Stage implements EventHandler<StageEvent> { TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); Collection<Fragment> fragments; - TableMeta meta = table.getMeta(); - Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get(); // Depending on scanner node's type, it creates fragments. If scan is for @@ -1157,69 +1142,6 @@ public class Stage implements EventHandler<StageEvent> { return unit; } - private static class ContainerLaunchTransition - implements SingleArcTransition<Stage, StageEvent> { - - @Override - public void transition(Stage stage, StageEvent event) { - if (!(event instanceof StageContainerAllocationEvent)) { - throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type."); - } - try { - StageContainerAllocationEvent allocationEvent = - (StageContainerAllocationEvent) event; - for (TajoContainer container : allocationEvent.getAllocatedContainer()) { - TajoContainerId cId = container.getId(); - if (stage.containers.containsKey(cId)) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - "Duplicated containers are allocated: " + cId.toString())); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - stage.containers.put(cId, container); - } - LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!"); - stage.eventHandler.handle( - new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(), - stage.getContext().getQueryContext(), - CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class)) - ); - - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); - } catch (Throwable t) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - ExceptionUtils.getStackTrace(t))); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - } - } - - /** - * It is used in KILL_WAIT state against Contained Allocated event. - * It just returns allocated containers to resource manager. - */ - private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> { - @Override - public void transition(Stage stage, StageEvent event) { - if (!(event instanceof StageContainerAllocationEvent)) { - throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type."); - } - try { - StageContainerAllocationEvent allocationEvent = - (StageContainerAllocationEvent) event; - stage.eventHandler.handle( - new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, - stage.getId(), allocationEvent.getAllocatedContainer())); - LOG.info(String.format("[%s] %d allocated containers are canceled", - stage.getId().toString(), - allocationEvent.getAllocatedContainer().size())); - } catch (Throwable t) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - ExceptionUtils.getStackTrace(t))); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - } - } - private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> { @Override @@ -1236,6 +1158,7 @@ public class Stage implements EventHandler<StageEvent> { stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); } else { stage.completedTaskCount++; + stage.getTaskScheduler().releaseTaskAttempt(task.getLastAttempt()); if (taskEvent.getState() == TaskState.SUCCEEDED) { stage.succeededObjectCount++; @@ -1281,19 +1204,7 @@ public class Stage implements EventHandler<StageEvent> { private void cleanup() { stopScheduler(); - releaseContainers(); - - if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { - List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); - List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); - - for (ExecutionBlock executionBlock : childs) { - ebIds.add(executionBlock.getId().getProto()); - } - - getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); - } - + stopExecutionBlock(); this.finalStageHistory = makeStageHistory(); this.finalStageHistory.setTasks(makeTaskHistories()); } @@ -1309,7 +1220,7 @@ public class Stage implements EventHandler<StageEvent> { private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType type) { if(!checkIfNeedFinalizing(type)) return; - TajoWorkerProtocol.ExecutionBlockReport report = event.getReport(); + ExecutionBlockReport report = event.getReport(); if (!report.getReportSuccess()) { stopFinalization(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 1da623e..a586e4b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -34,9 +34,8 @@ import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; import org.apache.tajo.master.TaskState; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; @@ -57,7 +56,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; +import static org.apache.tajo.ResourceProtos.*; public class Task implements EventHandler<TaskEvent> { /** Class Logger */ @@ -86,9 +85,8 @@ public class Task implements EventHandler<TaskEvent> { private TaskAttemptId lastAttemptId; private TaskAttemptId successfulAttempt; - private String succeededHost; - private int succeededHostPort; - private int succeededPullServerPort; + + private WorkerConnectionInfo succeededWorker; private int failedAttempts; private int finishedAttempts; // finish are total of success, failed and killed @@ -174,6 +172,7 @@ public class Task implements EventHandler<TaskEvent> { EnumSet.of( TaskEventType.T_KILL, TaskEventType.T_SCHEDULE, + TaskEventType.T_ATTEMPT_LAUNCHED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) @@ -252,7 +251,11 @@ public class Task implements EventHandler<TaskEvent> { taskHistory.setState(lastAttempt.getState().toString()); taskHistory.setProgress(lastAttempt.getProgress()); } - taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort); + + if(getSucceededWorker() != null) { + taskHistory.setHostAndPort(succeededWorker.getHostAndPeerRpcPort()); + } + taskHistory.setRetryCount(this.getRetryCount()); taskHistory.setLaunchTime(launchTime); taskHistory.setFinishTime(finishTime); @@ -357,8 +360,8 @@ public class Task implements EventHandler<TaskEvent> { return dataLocations; } - public String getSucceededHost() { - return succeededHost; + public WorkerConnectionInfo getSucceededWorker() { + return succeededWorker; } public void addFetches(String tableId, Collection<FetchImpl> fetches) { @@ -608,9 +611,7 @@ public class Task implements EventHandler<TaskEvent> { TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.successfulAttempt = attemptEvent.getTaskAttemptId(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); - task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); + task.succeededWorker = attempt.getWorkerConnectionInfo(); task.finishTask(); task.eventHandler.handle(new StageTaskEvent(event.getTaskId(), TaskState.SUCCEEDED)); @@ -624,11 +625,7 @@ public class Task implements EventHandler<TaskEvent> { if (!(event instanceof TaskTAttemptEvent)) { throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.launchTime = System.currentTimeMillis(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index c1b9273..6c48d3b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -23,18 +23,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; +import org.apache.tajo.ResourceProtos.TaskCompletionReport; +import org.apache.tajo.ResourceProtos.ShuffleFileOutput; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Task.PullHost; -import org.apache.tajo.master.container.TajoContainerId; import java.util.ArrayList; import java.util.EnumSet; @@ -43,8 +43,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; - public class TaskAttempt implements EventHandler<TaskAttemptEvent> { private static final Log LOG = LogFactory.getLog(TaskAttempt.class); @@ -55,7 +53,6 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { private final Task task; final EventHandler eventHandler; - private TajoContainerId containerId; private WorkerConnectionInfo workerConnectionInfo; private int expire; @@ -109,6 +106,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { TaskAttemptEventType.TA_DONE, new SucceededTransition()) .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED, TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_UNASSIGNED, + TaskAttemptEventType.TA_ASSIGN_CANCEL, new CancelTransition()) // Transitions from TA_RUNNING state .addTransition(TaskAttemptState.TA_RUNNING, @@ -167,6 +166,10 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_DONE), new TaskKilledCompleteTransition()) + + // Transitions from TA_FAILED state + .addTransition(TaskAttemptState.TA_FAILED, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_KILL) .installTopology(); private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> @@ -214,10 +217,6 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { return this.workerConnectionInfo; } - public void setContainerId(TajoContainerId containerId) { - this.containerId = containerId; - } - public synchronized void setExpireTime(int expire) { this.expire = expire; } @@ -311,7 +310,6 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { throw new IllegalArgumentException("event should be a TaskAttemptAssignedEvent type."); } TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; - taskAttempt.containerId = castEvent.getContainerId(); taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); taskAttempt.eventHandler.handle( new TaskTAttemptEvent(taskAttempt.getId(), @@ -319,6 +317,17 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { } } + private static class CancelTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + + taskAttempt.workerConnectionInfo = null; + } + } + private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { @Override @@ -396,7 +405,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { @Override public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { - taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId, + taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), + taskAttempt.getWorkerConnectionInfo().getId(), LocalTaskEventType.KILL)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java index 58b8a26..15ec845 100644 --- a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java +++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java @@ -27,9 +27,7 @@ public class DefaultResourceCalculator extends ResourceCalculator { @Override public int computeAvailableContainers(NodeResource available, NodeResource required) { - return Math.min(Math.min( - available.getMemory() / required.getMemory(), - available.getDisks() / required.getDisks()), + return Math.min(available.getMemory() / required.getMemory(), available.getVirtualCores() / required.getVirtualCores()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java index f51fc07..c7fe55b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java @@ -183,6 +183,6 @@ public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>, @Override public String toString() { - return "<memory:" + getMemory() + ", disks:" + getDisks() + ", vCores:" + getVirtualCores() + ">"; + return "(Memory:" + getMemory() + ", Disks:" + getDisks() + ", vCores:" + getVirtualCores() + ")"; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/session/Session.java index ea7d29a..0d067dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/session/Session.java +++ b/tajo-core/src/main/java/org/apache/tajo/session/Session.java @@ -32,7 +32,7 @@ import org.apache.tajo.common.ProtoObject; import java.util.HashMap; import java.util.Map; -import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; +import static org.apache.tajo.ResourceProtos.SessionProto; public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { private static final Log LOG = LogFactory.getLog(Session.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 578b15a..7641320 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -19,7 +19,6 @@ package org.apache.tajo.util; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; @@ -31,8 +30,6 @@ import org.apache.tajo.querymaster.Task; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; -import org.apache.tajo.worker.TaskRunner; -import org.apache.tajo.worker.TaskRunnerHistory; import java.text.DecimalFormat; import java.util.*; @@ -66,28 +63,6 @@ public class JSPUtil { Collections.sort(tasks, new TaskHistoryComparator(sortField, "asc".equals(sortOrder))); } - public static void sortTaskRunner(List<TaskRunner> taskRunners) { - Collections.sort(taskRunners, new Comparator<TaskRunner>() { - @Override - public int compare(TaskRunner taskRunner, TaskRunner taskRunner2) { - return taskRunner.getId().compareTo(taskRunner2.getId()); - } - }); - } - - public static void sortTaskRunnerHistory(List<TaskRunnerHistory> histories) { - Collections.sort(histories, new Comparator<TaskRunnerHistory>() { - @Override - public int compare(TaskRunnerHistory h1, TaskRunnerHistory h2) { - int value = h1.getExecutionBlockId().compareTo(h2.getExecutionBlockId()); - if(value == 0){ - return h1.getContainerId().compareTo(h2.getContainerId()); - } - return value; - } - }); - } - public static String getElapsedTime(long startTime, long finishTime) { if(startTime == 0) { return "-"; @@ -221,8 +196,8 @@ public class JSPUtil { if("id".equals(sortField)) { return task.getId().compareTo(task2.getId()); } else if("host".equals(sortField)) { - String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost(); - String host2 = task2.getSucceededHost() == null ? "-" : task2.getSucceededHost(); + String host1 = task.getSucceededWorker() == null ? "-" : task.getSucceededWorker().getHost(); + String host2 = task2.getSucceededWorker() == null ? "-" : task2.getSucceededWorker().getHost(); return host1.compareTo(host2); } else if("runTime".equals(sortField)) { return compareLong(task.getRunningTime(), task2.getRunningTime()); @@ -235,8 +210,8 @@ public class JSPUtil { if("id".equals(sortField)) { return task2.getId().compareTo(task.getId()); } else if("host".equals(sortField)) { - String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost(); - String host2 = task2.getSucceededHost() == null ? "-" : task2.getSucceededHost(); + String host1 = task.getSucceededWorker() == null ? "-" : task.getSucceededWorker().getHost(); + String host2 = task2.getSucceededWorker() == null ? "-" : task2.getSucceededWorker().getHost(); return host2.compareTo(host1); } else if("runTime".equals(sortField)) { if(task2.getLaunchTime() == 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index f4719b2..2acd5f6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; +import org.apache.tajo.ResourceProtos.TaskHistoryProto; import org.apache.tajo.master.QueryInfo; import org.apache.tajo.util.Bytes; @@ -192,7 +192,11 @@ public class HistoryReader { } public List<TaskHistory> getTaskHistory(String queryId, String ebId) throws IOException { - Path queryHistoryFile = getQueryHistoryFilePath(queryId, 0); + return getTaskHistory(queryId, ebId, 0); + } + + public List<TaskHistory> getTaskHistory(String queryId, String ebId, long startTime) throws IOException { + Path queryHistoryFile = getQueryHistoryFilePath(queryId, startTime); if (queryHistoryFile == null) { return new ArrayList<TaskHistory>(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index daced3e..c51b4e6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -53,6 +53,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class HistoryWriter extends AbstractService { private static final Log LOG = LogFactory.getLog(HistoryWriter.class); + public static final String HISTORY_QUERY_REPLICATION = "tajo.history.query.replication"; + public static final String HISTORY_TASK_REPLICATION = "tajo.history.task.replication"; + public static final String QUERY_LIST = "query-list"; public static final String QUERY_DETAIL = "query-detail"; public static final String HISTORY_FILE_POSTFIX = ".hist"; @@ -87,13 +90,15 @@ public class HistoryWriter extends AbstractService { if (!(conf instanceof TajoConf)) { throw new IllegalArgumentException("conf should be a TajoConf type."); } - tajoConf = (TajoConf)conf; + tajoConf = (TajoConf) conf; historyParentPath = tajoConf.getQueryHistoryDir(tajoConf); taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf); writerThread = new WriterThread(); historyCleaner = new HistoryCleaner(tajoConf, isMaster); - queryReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_REPLICATION); - taskReplication = (short) tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_TASK_REPLICATION); + queryReplication = (short) tajoConf.getInt(HISTORY_QUERY_REPLICATION, + FileSystem.get(tajoConf).getDefaultReplication(historyParentPath)); + taskReplication = (short) tajoConf.getInt(HISTORY_TASK_REPLICATION, + FileSystem.get(tajoConf).getDefaultReplication(taskHistoryParentPath)); super.serviceInit(conf); } @@ -144,6 +149,7 @@ public class HistoryWriter extends AbstractService { } }; historyQueue.add(future); + synchronized (writerThread) { writerThread.notifyAll(); } @@ -151,7 +157,7 @@ public class HistoryWriter extends AbstractService { } /* synchronously flush to history file */ - public synchronized void appendAndSync(History history) + public void appendAndSync(History history) throws TimeoutException, InterruptedException, IOException { WriterFuture<WriterHolder> future = appendAndFlush(history); @@ -227,7 +233,7 @@ public class HistoryWriter extends AbstractService { } try { - if (!histories.isEmpty()) { + if (!stopped.get() && !histories.isEmpty()) { writeHistory(histories); } else { continue; @@ -239,25 +245,23 @@ public class HistoryWriter extends AbstractService { //clean up history file // closing previous writer - synchronized (taskWriters) { - Calendar cal = Calendar.getInstance(); - cal.add(Calendar.HOUR_OF_DAY, -2); - String closeTargetTime = df.format(cal.getTime()); - List<String> closingTargets = new ArrayList<String>(); - - for (String eachWriterTime : taskWriters.keySet()) { - if (eachWriterTime.compareTo(closeTargetTime) <= 0) { - closingTargets.add(eachWriterTime); - } + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.HOUR_OF_DAY, -2); + String closeTargetTime = df.format(cal.getTime()); + List<String> closingTargets = new ArrayList<String>(); + + for (String eachWriterTime : taskWriters.keySet()) { + if (eachWriterTime.compareTo(closeTargetTime) <= 0) { + closingTargets.add(eachWriterTime); } + } - for (String eachWriterTime : closingTargets) { - WriterHolder writerHolder; - writerHolder = taskWriters.remove(eachWriterTime); - if (writerHolder != null) { - LOG.info("Closing task history file: " + writerHolder.path); - IOUtils.cleanup(LOG, writerHolder); - } + for (String eachWriterTime : closingTargets) { + WriterHolder writerHolder; + writerHolder = taskWriters.remove(eachWriterTime); + if (writerHolder != null) { + LOG.info("Closing task history file: " + writerHolder.path); + IOUtils.cleanup(LOG, writerHolder); } } } @@ -267,24 +271,20 @@ public class HistoryWriter extends AbstractService { private int drainHistory(Collection<WriterFuture<WriterHolder>> buffer, int numElements, long timeoutMillis) throws InterruptedException { - long deadline = System.currentTimeMillis() + timeoutMillis; + long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis); int added = 0; while (added < numElements) { added += historyQueue.drainTo(buffer, numElements - added); if (added < numElements) { // not enough elements immediately available; will have to wait - if (deadline <= System.currentTimeMillis()) { - break; - } else { - synchronized (writerThread) { - writerThread.wait(deadline - System.currentTimeMillis()); - if (deadline > System.currentTimeMillis()) { - added += historyQueue.drainTo(buffer, numElements - added); - break; - } - } + WriterFuture<WriterHolder> e = historyQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); + if (e == null) { + break; // we already waited enough, and there are no more elements in sight } + buffer.add(e); + added++; } } + return added; } @@ -427,13 +427,11 @@ public class HistoryWriter extends AbstractService { } private void flushTaskHistories() { - synchronized (taskWriters) { - for (WriterHolder holder : taskWriters.values()) { - try { - holder.flush(); - } catch (IOException e) { - LOG.warn(e, e); - } + for (WriterHolder holder : taskWriters.values()) { + try { + holder.flush(); + } catch (IOException e) { + LOG.warn(e, e); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java deleted file mode 100644 index 68c57f2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.collect.Maps; -import org.apache.hadoop.service.CompositeService; -import org.apache.tajo.master.ContainerProxy; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.container.TajoContainerId; - -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator { - /** - * A key is worker id, and a value is a worker connection information. - */ - protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = Maps.newConcurrentMap(); - - public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) { - return workerInfoMap.get(workerId); - } - - public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) { - workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo); - } - - private Map<TajoContainerId, ContainerProxy> containers = Maps.newConcurrentMap(); - - public AbstractResourceAllocator() { - super(AbstractResourceAllocator.class.getName()); - } - - public void addContainer(TajoContainerId cId, ContainerProxy container) { - containers.put(cId, container); - } - - public void removeContainer(TajoContainerId cId) { - containers.remove(cId); - } - - public boolean containsContainer(TajoContainerId cId) { - return containers.containsKey(cId); - } - - public ContainerProxy getContainer(TajoContainerId cId) { - return containers.get(cId); - } - - public Map<TajoContainerId, ContainerProxy> getContainers() { - return containers; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index cbd451d..57bedd2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -34,18 +34,15 @@ import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; +import org.apache.tajo.worker.event.ExecutionBlockErrorEvent; import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -54,14 +51,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import static org.apache.tajo.ResourceProtos.*; import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface; public class ExecutionBlockContext { /** class logger */ private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class); - private TaskRunnerManager manager; + protected AtomicInteger runningTasksNum = new AtomicInteger(); protected AtomicInteger completedTasksNum = new AtomicInteger(); protected AtomicInteger succeededTasksNum = new AtomicInteger(); protected AtomicInteger killedTasksNum = new AtomicInteger(); @@ -79,10 +76,8 @@ public class ExecutionBlockContext { private TajoQueryEngine queryEngine; private RpcClientManager connManager; - private InetSocketAddress qmMasterAddr; - private NettyClientBase client; + private AsyncRpcClient queryMasterClient; private QueryMasterProtocol.QueryMasterProtocolService.Interface stub; - private WorkerConnectionInfo queryMaster; private TajoConf systemConf; // for the doAs block private UserGroupInformation taskOwner; @@ -96,17 +91,13 @@ public class ExecutionBlockContext { // It keeps all of the query unit attempts while a TaskRunner is running. private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap(); - @Deprecated - private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); + private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap(); - private final Map<TaskId, TaskHistory> taskHistories = Maps.newTreeMap(); - - public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, - TaskRunnerManager manager, RunExecutionBlockRequestProto request) throws IOException { - this.manager = manager; + public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request, + AsyncRpcClient queryMasterClient) + throws IOException { this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId()); this.connManager = RpcClientManager.getInstance(); - this.queryMaster = new WorkerConnectionInfo(request.getQueryMaster()); this.systemConf = workerContext.getConf(); this.reporter = new Reporter(); this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); @@ -120,6 +111,7 @@ public class ExecutionBlockContext { this.resource = new ExecutionBlockSharedResource(); this.workerContext = workerContext; this.shuffleType = request.getShuffleType(); + this.queryMasterClient = queryMasterClient; } public void init() throws Throwable { @@ -127,8 +119,6 @@ public class ExecutionBlockContext { LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR)); LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR)); - this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), queryMaster.getQueryMasterPort()); - LOG.info("QueryMaster Address:" + qmMasterAddr); UserGroupInformation.setConfiguration(systemConf); // TODO - 'load credential' should be implemented @@ -138,7 +128,7 @@ public class ExecutionBlockContext { // initialize DFS and LocalFileSystems this.taskOwner = taskOwner; - this.stub = getRpcClient().getStub(); + this.stub = queryMasterClient.getStub(); this.reporter.startReporter(); // resource intiailization try{ @@ -157,12 +147,8 @@ public class ExecutionBlockContext { return resource; } - private NettyClientBase getRpcClient() - throws NoSuchMethodException, ConnectException, ClassNotFoundException { - if (client != null) return client; - - client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true); - return client; + private AsyncRpcClient getRpcClient() { + return queryMasterClient; } public Interface getStub() { @@ -199,7 +185,7 @@ public class ExecutionBlockContext { tasks.clear(); taskHistories.clear(); resource.release(); - RpcClientManager.cleanup(client); + RpcClientManager.cleanup(queryMasterClient); } public TajoConf getConf() { @@ -256,21 +242,6 @@ public class ExecutionBlockContext { return tasks.get(taskAttemptId); } - @Deprecated - public void stopTaskRunner(String id){ - manager.stopTaskRunner(id); - } - - @Deprecated - public TaskRunner getTaskRunner(String taskRunnerId){ - return manager.getTaskRunner(taskRunnerId); - } - - @Deprecated - public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) { - histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory); - } - public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) { taskHistories.put(taskId, taskHistory); } @@ -287,12 +258,15 @@ public class ExecutionBlockContext { .setId(taskAttemptId.getProto()) .setErrorMessage(message); - getStub().fatalError(null, builder.build(), NullCallback.get()); - } - - public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){ - histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId)); - return histories.get(runner.getId()); + try { + //If QueryMaster does not responding, current execution block should be stop + CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); + getStub().fatalError(callFuture.getController(), builder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + getWorkerContext().getTaskManager().getDispatcher().getEventHandler() + .handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e)); + } } public TajoWorker.WorkerContext getWorkerContext(){ http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 89c3404..07a9ba6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -22,10 +22,10 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Repartitioner; +import org.apache.tajo.querymaster.Task; import org.apache.tajo.util.TUtil; import java.net.URI; @@ -37,7 +37,7 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; /** * <code>FetchImpl</code> information to indicate the locations of intermediate data. */ -public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable { +public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { private Task.PullHost host; // The pull server host information private ShuffleType type; // hash or range partition method. private ExecutionBlockId executionBlockId; // The executionBlock id @@ -57,7 +57,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl attemptIds = new ArrayList<Integer>(); } - public FetchImpl(TajoWorkerProtocol.FetchProto proto) { + public FetchImpl(FetchProto proto) { this(new Task.PullHost(proto.getHost(), proto.getPort()), proto.getType(), new ExecutionBlockId(proto.getExecutionBlockId()), @@ -112,8 +112,8 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl } @Override - public TajoWorkerProtocol.FetchProto getProto() { - TajoWorkerProtocol.FetchProto.Builder builder = TajoWorkerProtocol.FetchProto.newBuilder(); + public FetchProto getProto() { + FetchProto.Builder builder = FetchProto.newBuilder(); builder.setHost(host.getHost()); builder.setPort(host.getPort());
