http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java deleted file mode 100644 index f97ce29..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java +++ /dev/null @@ -1,844 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import io.netty.handler.codec.http.QueryStringDecoder; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.physical.PhysicalExec; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.engine.query.TaskRequest; -import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.*; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.plan.function.python.TajoScriptEngine; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.serder.LogicalNodeDeserializer; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.pullserver.TajoPullServerService; -import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.NetUtils; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.URI; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; - -@Deprecated -public class LegacyTaskImpl implements Task { - private static final Log LOG = LogFactory.getLog(LegacyTaskImpl.class); - private static final float FETCHER_PROGRESS = 0.5f; - - private final TajoConf systemConf; - private final QueryContext queryContext; - private final ExecutionBlockContext executionBlockContext; - private final String taskRunnerId; - - private final Path taskDir; - private final TaskRequest request; - private TaskAttemptContext context; - private List<Fetcher> fetcherRunners; - private LogicalNode plan; - private final Map<String, TableDesc> descs = Maps.newHashMap(); - private PhysicalExec executor; - private boolean interQuery; - private Path inputTableBaseDir; - - private long startTime; - private long finishTime; - - private final TableStats inputStats; - private List<FileChunk> localChunks; - - // TODO - to be refactored - private ShuffleType shuffleType = null; - private Schema finalSchema = null; - private TupleComparator sortComp = null; - - public LegacyTaskImpl(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); - } - - public LegacyTaskImpl(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - TajoConf conf, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this.taskRunnerId = taskRunnerId; - this.request = request; - - this.systemConf = conf; - this.queryContext = request.getQueryContext(systemConf); - this.executionBlockContext = executionBlockContext; - this.taskDir = StorageUtil.concatPath(baseDir, - taskId.getTaskId().getId() + "_" + taskId.getId()); - - this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, - request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); - this.context.setDataChannel(request.getDataChannel()); - this.context.setEnforcer(request.getEnforcer()); - this.context.setState(TaskAttemptState.TA_PENDING); - this.inputStats = new TableStats(); - this.fetcherRunners = Lists.newArrayList(); - } - - public void initPlan() throws IOException { - plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); - LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - if (scanNode != null) { - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); - if (partitionScanNode != null) { - for (LogicalNode node : partitionScanNode) { - PartitionedTableScanNode scan = (PartitionedTableScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - interQuery = request.getProto().getInterQuery(); - if (interQuery) { - context.setInterQuery(); - this.shuffleType = context.getDataChannel().getShuffleType(); - - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); - this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); - this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); - } - } else { - Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get()) - .getAppenderFilePath(getId(), queryContext.getStagingDir()); - LOG.info("Output File Path: " + outFilePath); - context.setOutputPath(outFilePath); - } - - this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); - LOG.info("=================================="); - LOG.info("* Stage " + request.getId() + " is initialized"); - LOG.info("* InterQuery: " + interQuery - + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + - ", Fragments (num: " + request.getFragments().size() + ")" + - ", Fetches (total:" + request.getFetches().size() + ") :"); - - if(LOG.isDebugEnabled()) { - for (FetchImpl f : request.getFetches()) { - LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); - } - } - LOG.info("* Local task dir: " + taskDir); - if(LOG.isDebugEnabled()) { - LOG.debug("* plan:\n"); - LOG.debug(plan.toString()); - } - LOG.info("=================================="); - } - - private void startScriptExecutors() throws IOException { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.start(systemConf); - } - } - - private void stopScriptExecutors() { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.shutdown(); - } - } - - @Override - public void init() throws IOException { - initPlan(); - startScriptExecutors(); - - if (context.getState() == TaskAttemptState.TA_PENDING) { - // initialize a task temporal dir - FileSystem localFS = executionBlockContext.getLocalFS(); - localFS.mkdirs(taskDir); - - if (request.getFetches().size() > 0) { - inputTableBaseDir = localFS.makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( - getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); - localFS.mkdirs(inputTableBaseDir); - Path tableDir; - for (String inputTable : context.getInputTables()) { - tableDir = new Path(inputTableBaseDir, inputTable); - if (!localFS.exists(tableDir)) { - LOG.info("the directory is created " + tableDir.toUri()); - localFS.mkdirs(tableDir); - } - } - } - // for localizing the intermediate data - fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); - } - } - - private TaskAttemptId getId() { - return context.getTaskId(); - } - - public String toString() { - return "queryId: " + this.getId() + " status: " + context.getState(); - } - - @Override - public boolean isStopped() { - return context.isStopped(); - } - - @Override - public TaskAttemptContext getTaskContext() { - return context; - } - - @Override - public ExecutionBlockContext getExecutionBlockContext() { - return executionBlockContext; - } - - @Override - public boolean hasFetchPhase() { - return fetcherRunners.size() > 0; - } - - @Override - public void fetch() { - ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); - for (Fetcher f : fetcherRunners) { - executorService.submit(new FetchRunner(context, f)); - } - } - - @Override - public void kill() { - stopScriptExecutors(); - context.setState(TaskAttemptState.TA_KILLED); - context.stop(); - } - - @Override - public void abort() { - stopScriptExecutors(); - context.setState(TajoProtos.TaskAttemptState.TA_FAILED); - context.stop(); - } - - @Override - public TaskStatusProto getReport() { - TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); - builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); - builder.setId(context.getTaskId().getProto()) - .setProgress(context.getProgress()) - .setState(context.getState()); - - builder.setInputStats(reloadInputStats()); - - if (context.getResultStats() != null) { - builder.setResultStats(context.getResultStats().getProto()); - } - return builder.build(); - } - - @Override - public boolean isProgressChanged() { - return context.isProgressChanged(); - } - - @Override - public void updateProgress() { - if(context != null && context.isStopped()){ - return; - } - - if (executor != null && context.getProgress() < 1.0f) { - context.setExecutorProgress(executor.getProgress()); - } - } - - private CatalogProtos.TableStatsProto reloadInputStats() { - synchronized(inputStats) { - if (this.executor == null) { - return inputStats.getProto(); - } - - TableStats executorInputStats = this.executor.getInputStats(); - - if (executorInputStats != null) { - inputStats.setValues(executorInputStats); - } - return inputStats.getProto(); - } - } - - private TaskCompletionReport getTaskCompletionReport() { - TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); - builder.setId(context.getTaskId().getProto()); - - builder.setInputStats(reloadInputStats()); - - if (context.hasResultStats()) { - builder.setResultStats(context.getResultStats().getProto()); - } else { - builder.setResultStats(new TableStats().getProto()); - } - - Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); - if (it.hasNext()) { - do { - Entry<Integer, String> entry = it.next(); - ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); - part.setPartId(entry.getKey()); - - // Set output volume - if (context.getPartitionOutputVolume() != null) { - for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { - if (entry.getKey().equals(e.getKey())) { - part.setVolume(e.getValue().longValue()); - break; - } - } - } - - builder.addShuffleFileOutputs(part.build()); - } while (it.hasNext()); - } - - return builder.build(); - } - - private void waitForFetch() throws InterruptedException, IOException { - context.getFetchLatch().await(); - LOG.info(context.getTaskId() + " All fetches are done!"); - Collection<String> inputs = Lists.newArrayList(context.getInputTables()); - - // Get all broadcasted tables - Set<String> broadcastTableNames = new HashSet<String>(); - List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); - if (broadcasts != null) { - for (EnforceProperty eachBroadcast : broadcasts) { - broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); - } - } - - // localize the fetched data and skip the broadcast table - for (String inputTable: inputs) { - if (broadcastTableNames.contains(inputTable)) { - continue; - } - File tableDir = new File(context.getFetchIn(), inputTable); - FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); - context.updateAssignedFragments(inputTable, frags); - } - } - - @Override - public void run() throws Exception { - startTime = System.currentTimeMillis(); - Throwable error = null; - try { - if(!context.isStopped()) { - context.setState(TaskAttemptState.TA_RUNNING); - if (context.hasFetchPhase()) { - // If the fetch is still in progress, the query unit must wait for - // complete. - waitForFetch(); - context.setFetcherProgress(FETCHER_PROGRESS); - updateProgress(); - } - - this.executor = executionBlockContext.getTQueryEngine(). - createPlan(context, plan); - this.executor.init(); - - while(!context.isStopped() && executor.next() != null) { - } - } - } catch (Throwable e) { - error = e ; - LOG.error(e.getMessage(), e); - stopScriptExecutors(); - context.stop(); - } finally { - if (executor != null) { - try { - executor.close(); - reloadInputStats(); - } catch (IOException e) { - LOG.error(e, e); - } - this.executor = null; - } - - executionBlockContext.completedTasksNum.incrementAndGet(); - context.getHashShuffleAppenderManager().finalizeTask(getId()); - - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); - if (context.isStopped()) { - context.setExecutorProgress(0.0f); - - if (context.getState() == TaskAttemptState.TA_KILLED) { - queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); - executionBlockContext.killedTasksNum.incrementAndGet(); - } else { - context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } - - queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); - executionBlockContext.failedTasksNum.incrementAndGet(); - } - } else { - // if successful - context.stop(); - context.setProgress(1.0f); - context.setState(TaskAttemptState.TA_SUCCEEDED); - executionBlockContext.succeededTasksNum.incrementAndGet(); - - TaskCompletionReport report = getTaskCompletionReport(); - queryMasterStub.done(null, report, NullCallback.get()); - } - finishTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); - } - } - - @Override - public void cleanup() { - TaskHistory taskHistory = createTaskHistory(); - executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); - executionBlockContext.getTasks().remove(getId()); - - fetcherRunners.clear(); - fetcherRunners = null; - try { - if(executor != null) { - executor.close(); - executor = null; - } - } catch (IOException e) { - LOG.fatal(e.getMessage(), e); - } - - executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); - stopScriptExecutors(); - } - - public TaskHistory createTaskHistory() { - TaskHistory taskHistory = null; - try { - taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(), - startTime, finishTime, reloadInputStats()); - - if (context.getOutputPath() != null) { - taskHistory.setOutputPath(context.getOutputPath().toString()); - } - - if (context.getWorkDir() != null) { - taskHistory.setWorkingPath(context.getWorkDir().toString()); - } - - if (context.getResultStats() != null) { - taskHistory.setOutputStats(context.getResultStats().getProto()); - } - - if (hasFetchPhase()) { - taskHistory.setTotalFetchCount(fetcherRunners.size()); - int i = 0; - FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); - for (Fetcher fetcher : fetcherRunners) { - // TODO store the fetcher histories - if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { - builder.setStartTime(fetcher.getStartTime()); - builder.setFinishTime(fetcher.getFinishTime()); - builder.setFileLength(fetcher.getFileLen()); - builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); - builder.setState(fetcher.getState()); - - taskHistory.addFetcherHistory(builder.build()); - } - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; - } - taskHistory.setFinishedFetchCount(i); - } - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - } - - return taskHistory; - } - - public int hashCode() { - return context.hashCode(); - } - - public boolean equals(Object obj) { - if (obj instanceof LegacyTaskImpl) { - LegacyTaskImpl other = (LegacyTaskImpl) obj; - return this.context.equals(other.context); - } - return false; - } - - private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) - throws IOException { - Configuration c = new Configuration(systemConf); - c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); - FileSystem fs = FileSystem.get(c); - Path tablePath = new Path(file.getAbsolutePath()); - - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus f : fileLists) { - if (f.getLen() == 0) { - continue; - } - tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); - listTablets.add(tablet); - } - - // Special treatment for locally pseudo fetched chunks - synchronized (localChunks) { - for (FileChunk chunk : localChunks) { - if (name.equals(chunk.getEbId())) { - tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); - listTablets.add(tablet); - LOG.info("One local chunk is added to listTablets"); - } - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - private class FetchRunner implements Runnable { - private final TaskAttemptContext ctx; - private final Fetcher fetcher; - private int maxRetryNum; - - public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { - this.ctx = ctx; - this.fetcher = fetcher; - this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); - } - - @Override - public void run() { - int retryNum = 0; - int retryWaitTime = 1000; //sec - - try { // for releasing fetch latch - while(!context.isStopped() && retryNum < maxRetryNum) { - if (retryNum > 0) { - try { - Thread.sleep(retryWaitTime); - retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds - } catch (InterruptedException e) { - LOG.error(e); - } - LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); - } - try { - FileChunk fetched = fetcher.get(); - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null - && fetched.getFile() != null) { - if (fetched.fromRemote() == false) { - localChunks.add(fetched); - LOG.info("Add a new FileChunk to local chunk list"); - } - break; - } - } catch (Throwable e) { - LOG.error("Fetch failed: " + fetcher.getURI(), e); - } - retryNum++; - } - } finally { - if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ - fetcherFinished(ctx); - } else { - if (retryNum == maxRetryNum) { - LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); - } - stopScriptExecutors(); - context.stop(); // retry task - ctx.getFetchLatch().countDown(); - } - } - } - } - - @VisibleForTesting - public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { - if (totalFetcher > 0) { - return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; - } else { - return 0.0f; - } - } - - private synchronized void fetcherFinished(TaskAttemptContext ctx) { - int fetcherSize = fetcherRunners.size(); - if(fetcherSize == 0) { - return; - } - - ctx.getFetchLatch().countDown(); - - int remainFetcher = (int) ctx.getFetchLatch().getCount(); - if (remainFetcher == 0) { - context.setFetcherProgress(FETCHER_PROGRESS); - } else { - context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); - } - } - - private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, - List<FetchImpl> fetches) throws IOException { - - if (fetches.size() > 0) { - Path inputDir = executionBlockContext.getLocalDirAllocator(). - getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); - - int i = 0; - File storeDir; - File defaultStoreFile; - FileChunk storeChunk = null; - List<Fetcher> runnerList = Lists.newArrayList(); - - for (FetchImpl f : fetches) { - storeDir = new File(inputDir.toString(), f.getName()); - if (!storeDir.exists()) { - storeDir.mkdirs(); - } - - for (URI uri : f.getURIs()) { - defaultStoreFile = new File(storeDir, "in_" + i); - InetAddress address = InetAddress.getByName(uri.getHost()); - - WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); - if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { - boolean hasError = false; - try { - LOG.info("Try to get local file chunk at local host"); - storeChunk = getLocalStoredFileChunk(uri, systemConf); - } catch (Throwable t) { - hasError = true; - } - - // When a range request is out of range, storeChunk will be NULL. This case is normal state. - // So, we should skip and don't need to create storeChunk. - if (storeChunk == null && !hasError) { - continue; - } - - if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 - && hasError == false) { - storeChunk.setFromRemote(false); - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - - // If we decide that intermediate data should be really fetched from a remote host, storeChunk - // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it - storeChunk.setEbId(f.getName()); - Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); - LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); - runnerList.add(fetcher); - i++; - } - } - ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); - return runnerList; - } else { - return Lists.newArrayList(); - } - } - - private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { - // Parse the URI - LOG.info("getLocalStoredFileChunk starts"); - final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); - final List<String> types = params.get("type"); - final List<String> qids = params.get("qid"); - final List<String> taskIdList = params.get("ta"); - final List<String> stageIds = params.get("sid"); - final List<String> partIds = params.get("p"); - final List<String> offsetList = params.get("offset"); - final List<String> lengthList = params.get("length"); - - if (types == null || stageIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); - return null; - } - - if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); - return null; - } - - String queryId = qids.get(0); - String shuffleType = types.get(0); - String sid = stageIds.get(0); - String partId = partIds.get(0); - - if (shuffleType.equals("r") && taskIdList == null) { - LOG.error("Invalid URI - For range shuffle, taskId is required"); - return null; - } - List<String> taskIds = splitMaps(taskIdList); - - FileChunk chunk = null; - long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; - long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; - - LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId - + ", taskIds=" + taskIdList); - - // The working directory of Tajo worker for each query, including stage - String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; - - // If the stage requires a range shuffle - if (shuffleType.equals("r")) { - String ta = taskIds.get(0); - if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { - LOG.warn("Range shuffle - file not exist"); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - try { - chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("getFileChunks() throws exception"); - return null; - } - - // If the stage requires a hash shuffle or a scattered hash shuffle - } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); - String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; - if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { - LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); - File file = new File(path.toUri()); - long startPos = (offset >= 0 && length >= 0) ? offset : 0; - long readLen = (offset >= 0 && length >= 0) ? length : file.length(); - - if (startPos >= file.length()) { - LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); - return null; - } - chunk = new FileChunk(file, startPos, readLen); - - } else { - LOG.error("Unknown shuffle type"); - return null; - } - - return chunk; - } - - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - - public static Path getTaskAttemptDir(TaskAttemptId quid) { - Path workDir = - StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), - String.valueOf(quid.getTaskId().getId()), - String.valueOf(quid.getId())); - return workDir; - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java index e763d13..0580ebc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java @@ -30,35 +30,38 @@ import org.apache.tajo.resource.NodeResources; import org.apache.tajo.storage.DiskUtil; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.*; -import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ResourceProtos.*; public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceEvent> { private static final Log LOG = LogFactory.getLog(NodeResourceManager.class); private final Dispatcher dispatcher; - private final EventHandler taskEventHandler; + private final TajoWorker.WorkerContext workerContext; + private final AtomicInteger runningQueryMasters = new AtomicInteger(0); private NodeResource totalResource; private NodeResource availableResource; private TajoConf tajoConf; + private boolean enableTest; - public NodeResourceManager(Dispatcher dispatcher, EventHandler taskEventHandler) { + public NodeResourceManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) { super(NodeResourceManager.class.getName()); this.dispatcher = dispatcher; - this.taskEventHandler = taskEventHandler; + this.workerContext = workerContext; } @Override protected void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - this.tajoConf = (TajoConf)conf; + this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); this.totalResource = createWorkerResource(tajoConf); this.availableResource = NodeResources.clone(totalResource); this.dispatcher.register(NodeResourceEvent.EventType.class, this); - + validateConf(tajoConf); + this.enableTest = conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE"); super.serviceInit(conf); LOG.info("Initialized NodeResourceManager for " + totalResource); } @@ -66,63 +69,85 @@ public class NodeResourceManager extends AbstractService implements EventHandler @Override public void handle(NodeResourceEvent event) { - if (event instanceof NodeResourceAllocateEvent) { - NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event; - BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder(); - for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { - NodeResource resource = new NodeResource(request.getResource()); - if (allocate(resource)) { - if(allocateEvent.getRequest().hasExecutionBlockRequest()){ - //send ExecutionBlock start event to TaskManager - startExecutionBlock(allocateEvent.getRequest().getExecutionBlockRequest()); + switch (event.getType()) { + case ALLOCATE: { + if (event.getResourceType() == NodeResourceEvent.ResourceType.TASK) { + // allocate task resource + NodeResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, NodeResourceAllocateEvent.class); + BatchAllocationResponse.Builder response = BatchAllocationResponse.newBuilder(); + for (TaskAllocationProto request : allocateEvent.getRequest().getTaskRequestList()) { + NodeResource resource = new NodeResource(request.getResource()); + if (allocate(resource)) { + //send task start event to TaskExecutor + startTask(request.getTaskRequest(), resource); + } else { + // reject the exceeded requests + response.addCancellationTask(request); + } + } + allocateEvent.getCallback().run(response.build()); + + } else if (event.getResourceType() == NodeResourceEvent.ResourceType.QUERY_MASTER) { + QMResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, QMResourceAllocateEvent.class); + // allocate query master resource + + NodeResource resource = new NodeResource(allocateEvent.getRequest().getResource()); + if (allocate(resource)) { + allocateEvent.getCallback().run(TajoWorker.TRUE_PROTO); + runningQueryMasters.incrementAndGet(); + } else { + allocateEvent.getCallback().run(TajoWorker.FALSE_PROTO); } - - //send task start event to TaskExecutor - startTask(request.getTaskRequest(), resource); - } else { - // reject the exceeded requests - response.addCancellationTask(request); } + break; } - allocateEvent.getCallback().run(response.build()); + case DEALLOCATE: { + NodeResourceDeallocateEvent deallocateEvent = TUtil.checkTypeAndGet(event, NodeResourceDeallocateEvent.class); + release(deallocateEvent.getResource()); - } else if (event instanceof NodeResourceDeallocateEvent) { - NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; - release(deallocateEvent.getResource()); - - // send current resource to ResourceTracker - getDispatcher().getEventHandler().handle( - new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE)); + if (deallocateEvent.getResourceType() == NodeResourceEvent.ResourceType.QUERY_MASTER) { + runningQueryMasters.decrementAndGet(); + } + // send current resource to ResourceTracker + getDispatcher().getEventHandler().handle( + new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE)); + break; + } } } - protected Dispatcher getDispatcher() { + public Dispatcher getDispatcher() { return dispatcher; } - protected NodeResource getTotalResource() { + public NodeResource getTotalResource() { return totalResource; } - protected NodeResource getAvailableResource() { + public NodeResource getAvailableResource() { return availableResource; } + public int getRunningQueryMasters() { + return runningQueryMasters.get(); + } + private boolean allocate(NodeResource resource) { - //TODO consider the jvm free memory - if (NodeResources.fitsIn(resource, availableResource)) { + + if (NodeResources.fitsIn(resource, availableResource) && checkFreeHeapMemory(resource)) { NodeResources.subtractFrom(availableResource, resource); return true; } return false; } - protected void startExecutionBlock(RunExecutionBlockRequestProto request) { - taskEventHandler.handle(new ExecutionBlockStartEvent(request)); + private boolean checkFreeHeapMemory(NodeResource resource) { + //TODO consider the jvm free memory + return true; } protected void startTask(TaskRequestProto request, NodeResource resource) { - taskEventHandler.handle(new TaskStartEvent(request, resource)); + workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new TaskStartEvent(request, resource)); } private void release(NodeResource resource) { @@ -130,17 +155,19 @@ public class NodeResourceManager extends AbstractService implements EventHandler } private NodeResource createWorkerResource(TajoConf conf) { - int memoryMb; - if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); - } else { - memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), - conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); + int memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); + if (!enableTest) { + // Set memory resource to max heap + int maxHeap = (int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB); + if(maxHeap > memoryMb) { + memoryMb = maxHeap; + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, memoryMb); + } } int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); - int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM); + int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { @@ -150,4 +177,23 @@ public class NodeResourceManager extends AbstractService implements EventHandler int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM); return NodeResource.createResource(memoryMb, disks * diskParallels, vCores); } + + private void validateConf(TajoConf conf) { + // validate node memory allocation setting + int minMem = conf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); + int minQMMem = conf.getIntVar(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY); + int maxMem = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); + + if (minMem <= 0 || minQMMem <= 0 || minMem + minQMMem > maxMem) { + throw new RuntimeException("Invalid resource worker memory" + + " allocation configuration" + + ", " + TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.varname + + "=" + minMem + + ", " + TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.varname + + "=" + minQMMem + + ", " + TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname + + "=" + maxMem + ", min and max should be greater than 0" + + ", max should be no smaller than min."); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index d13cd50..5d91cc6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -27,11 +27,15 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.resource.NodeResource; -import org.apache.tajo.rpc.*; +import org.apache.tajo.resource.DefaultResourceCalculator; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeStatusEvent; import java.net.ConnectException; @@ -42,8 +46,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; +import static org.apache.tajo.ResourceProtos.*; /** * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc. @@ -55,39 +58,42 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N private TajoConf tajoConf; private StatusUpdaterThread updaterThread; private volatile boolean isStopped; - private volatile long heartBeatInterval; + private int heartBeatInterval; + private int nextHeartBeatInterval; private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue; private final TajoWorker.WorkerContext workerContext; - private final NodeResourceManager nodeResourceManager; private AsyncRpcClient rmClient; private ServiceTracker serviceTracker; - private TajoResourceTrackerProtocolService.Interface resourceTracker; - private int queueingLimit; + private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface resourceTracker; + private int queueingThreshold; - public NodeStatusUpdater(TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) { + public NodeStatusUpdater(TajoWorker.WorkerContext workerContext) { super(NodeStatusUpdater.class.getSimpleName()); this.workerContext = workerContext; - this.nodeResourceManager = resourceManager; } @Override public void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - this.tajoConf = (TajoConf) conf; + + this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue(); this.serviceTracker = ServiceTrackerFactory.get(tajoConf); - this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this); - this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL); + this.workerContext.getNodeResourceManager().getDispatcher().register(NodeStatusEvent.EventType.class, this); + this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL); this.updaterThread = new StatusUpdaterThread(); + this.updaterThread.setName("NodeStatusUpdater"); super.serviceInit(conf); } @Override public void serviceStart() throws Exception { // if resource changed over than 50%, send reports - this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2; + DefaultResourceCalculator calculator = new DefaultResourceCalculator(); + int maxContainer = calculator.computeAvailableContainers(workerContext.getNodeResourceManager().getTotalResource(), + NodeResources.createResource(tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1)); + + this.queueingThreshold = Math.max((int) Math.floor(maxContainer * 0.5), 1); + LOG.info("Queueing threshold:" + queueingThreshold); updaterThread.start(); super.serviceStart(); @@ -97,10 +103,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N @Override public void serviceStop() throws Exception { this.isStopped = true; - synchronized (updaterThread) { updaterThread.interrupt(); - updaterThread.join(); } super.serviceStop(); LOG.info("NodeStatusUpdater stopped."); @@ -115,35 +119,30 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N return heartBeatRequestQueue.size(); } - public int getQueueingLimit() { - return queueingLimit; + public int getQueueingThreshold() { + return queueingThreshold; } - private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) { - NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setAvailableResource(resource.getProto()); + private NodeHeartbeatRequest.Builder createResourceReport() { + NodeHeartbeatRequest.Builder requestProto = NodeHeartbeatRequest.newBuilder(); requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); - return requestProto.build(); - } + requestProto.setAvailableResource(workerContext.getNodeResourceManager().getAvailableResource().getProto()); + requestProto.setRunningTasks(workerContext.getTaskManager().getRunningTasks()); + requestProto.setRunningQueryMasters(workerContext.getNodeResourceManager().getRunningQueryMasters()); - private NodeHeartbeatRequestProto createHeartBeatReport() { - NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); - return requestProto.build(); + return requestProto; } - private NodeHeartbeatRequestProto createNodeStatusReport() { - NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto()); - requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto()); - requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); + private NodeHeartbeatRequest.Builder createNodeStatusReport() { + NodeHeartbeatRequest.Builder requestProto = createResourceReport(); + requestProto.setTotalResource(workerContext.getNodeResourceManager().getTotalResource().getProto()); requestProto.setConnectionInfo(workerContext.getConnectionInfo().getProto()); //TODO set node status to requestProto.setStatus() - return requestProto.build(); + return requestProto; } - protected TajoResourceTrackerProtocolService.Interface newStub() + protected TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface newStub() throws NoSuchMethodException, ConnectException, ClassNotFoundException { RpcClientManager.cleanup(rmClient); @@ -154,15 +153,15 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N return rmClient.getStub(); } - protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto) + protected NodeHeartbeatResponse sendHeartbeat(NodeHeartbeatRequest requestProto) throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException { if (resourceTracker == null) { resourceTracker = newStub(); } - NodeHeartbeatResponseProto response = null; + NodeHeartbeatResponse response = null; try { - CallFuture<NodeHeartbeatResponseProto> callBack = new CallFuture<NodeHeartbeatResponseProto>(); + CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<NodeHeartbeatResponse>(); resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack); response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -190,7 +189,6 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N long deadline = System.nanoTime() + unit.toNanos(timeout); int added = 0; while (added < numElements) { - added += heartBeatRequestQueue.drainTo(buffer, numElements - added); if (added < numElements) { // not enough elements immediately available; will have to wait NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); if (e == null) { @@ -200,6 +198,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N added++; if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) { + added += heartBeatRequestQueue.drainTo(buffer, numElements - added); break; } } @@ -210,37 +209,39 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N /* Node sends a heartbeats with its resource and status periodically to master. */ @Override public void run() { - NodeHeartbeatResponseProto lastResponse = null; + NodeHeartbeatResponse lastResponse = null; while (!isStopped && !Thread.interrupted()) { try { if (lastResponse != null) { if (lastResponse.getCommand() == ResponseCommand.NORMAL) { List<NodeStatusEvent> events = Lists.newArrayList(); + + if(lastResponse.hasHeartBeatInterval()) { + nextHeartBeatInterval = lastResponse.getHeartBeatInterval(); + } else { + nextHeartBeatInterval = heartBeatInterval; + } + try { /* batch update to ResourceTracker */ - drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS); + drain(events, queueingThreshold, nextHeartBeatInterval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { break; } - if (!events.isEmpty()) { - // send current available resource; - lastResponse = sendHeartbeat(createResourceReport(nodeResourceManager.getAvailableResource())); - } else { - // send ping; - lastResponse = sendHeartbeat(createHeartBeatReport()); - } + // send current available resource; + lastResponse = sendHeartbeat(createResourceReport().build()); } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) { // Membership changed - lastResponse = sendHeartbeat(createNodeStatusReport()); + lastResponse = sendHeartbeat(createNodeStatusReport().build()); } else if (lastResponse.getCommand() == ResponseCommand.ABORT_QUERY) { //TODO abort failure queries } } else { // Node registration on startup - lastResponse = sendHeartbeat(createNodeStatusReport()); + lastResponse = sendHeartbeat(createNodeStatusReport().build()); } } catch (NoSuchMethodException nsme) { LOG.fatal(nsme.getMessage(), nsme); @@ -249,15 +250,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N LOG.fatal(cnfe.getMessage(), cnfe); Runtime.getRuntime().halt(-1); } catch (Exception e) { - LOG.error(e.getMessage(), e); - if (!isStopped) { - synchronized (updaterThread) { - try { - updaterThread.wait(heartBeatInterval); - } catch (InterruptedException ie) { - // Do Nothing - } - } + if (isStopped) { + break; + } else { + LOG.error(e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java deleted file mode 100644 index b713e70..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.master.container.TajoContainerId; - -public interface ResourceAllocator { - public void allocateTaskWorker(); - public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId); - public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, - int numTasks, int memoryMBPerTask); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java deleted file mode 100644 index 05dd1a9..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ /dev/null @@ -1,415 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.*; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.event.ContainerAllocationEvent; -import org.apache.tajo.master.event.ContainerAllocatorEventType; -import org.apache.tajo.master.event.StageContainerAllocationEvent; -import org.apache.tajo.master.rm.TajoWorkerContainer; -import org.apache.tajo.master.rm.TajoWorkerContainerId; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; -import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.querymaster.Stage; -import org.apache.tajo.querymaster.StageState; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.util.ApplicationIdUtils; - -import java.net.InetSocketAddress; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TajoResourceAllocator extends AbstractResourceAllocator { - private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class); - - private TajoConf tajoConf; - private QueryMasterTask.QueryMasterTaskContext queryTaskContext; - private final ExecutorService allocationExecutor; - private final Deallocator deallocator; - - private AtomicBoolean stopped = new AtomicBoolean(false); - - public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) { - this.queryTaskContext = queryTaskContext; - allocationExecutor = Executors.newFixedThreadPool( - queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM)); - deallocator = new Deallocator(); - } - - @Override - public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) { - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId()); - containerId.setApplicationAttemptId(appAttemptId); - containerId.setId(containerIdProto.getId()); - return containerId; - } - - @Override - public void allocateTaskWorker() { - } - - @Override - public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, - int numTasks, - int memoryMBPerTask) { - //TODO consider disk slot - - ClusterResourceSummary clusterResource = workerContext.getClusterResource(); - int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; - clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot - LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks + - ", Number of Cluster Slots=" + clusterSlots); - return Math.min(numTasks, clusterSlots); - } - - @Override - public void init(Configuration conf) { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("conf should be a TajoConf type."); - } - tajoConf = (TajoConf)conf; - - queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher()); - - queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler()); - - deallocator.start(); - - super.init(conf); - } - - @Override - public synchronized void stop() { - if (stopped.compareAndSet(false, true)) { - return; - } - - allocationExecutor.shutdownNow(); - deallocator.shutdown(); - - Map<TajoContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator() - .getContainers(); - List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values()); - for(ContainerProxy eachProxy: list) { - try { - eachProxy.stopContainer(); - } catch (Throwable e) { - LOG.warn(e.getMessage(), e); - } - } - - workerInfoMap.clear(); - super.stop(); - } - - @Override - public void start() { - super.start(); - } - - class TajoTaskRunnerLauncher implements TaskRunnerLauncher { - @Override - public void handle(TaskRunnerGroupEvent event) { - if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) { - if (!(event instanceof LaunchTaskRunnersEvent)) { - throw new IllegalArgumentException("event should be a LaunchTaskRunnersEvent type."); - } - LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event; - launchTaskRunners(launchEvent); - } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) { - stopContainers(event.getContainers()); - stopExecutionBlock(event.getExecutionBlockId(), event.getContainers()); - } - } - } - - private void launchTaskRunners(LaunchTaskRunnersEvent event) { - // Query in standby mode doesn't need launch Worker. - // But, Assign ExecutionBlock to assigned tajo worker - for(TajoContainer eachContainer: event.getContainers()) { - TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf, - eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson()); - allocationExecutor.submit(new LaunchRunner(eachContainer.getId(), containerProxy)); - } - } - - public void stopExecutionBlock(final ExecutionBlockId executionBlockId, - Collection<TajoContainer> containers) { - Set<NodeId> workers = Sets.newHashSet(); - for (TajoContainer container : containers){ - workers.add(container.getNodeId()); - } - - for (final NodeId worker : workers) { - allocationExecutor.submit(new Runnable() { - @Override - public void run() { - stopExecutionBlock(executionBlockId, worker); - } - }); - } - } - - private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) { - NettyClientBase tajoWorkerRpc = null; - try { - InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort()); - tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - - tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), - NullCallback.get(PrimitiveProtos.BoolProto.class)); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } - } - - protected static class LaunchRunner implements Runnable { - private final ContainerProxy proxy; - private final TajoContainerId id; - public LaunchRunner(TajoContainerId id, ContainerProxy proxy) { - this.proxy = proxy; - this.id = id; - } - @Override - public void run() { - proxy.launch(null); - if (LOG.isDebugEnabled()) { - LOG.debug("ContainerProxy started:" + id); - } - } - } - - private void stopContainers(Collection<TajoContainer> containers) { - deallocator.submit(Iterables.transform(containers, new Function<TajoContainer, TajoContainerId>() { - public TajoContainerId apply(TajoContainer input) { return input.getId(); } - })); - } - - private static final TajoContainerId FIN = new TajoWorkerContainerId(); - - private class Deallocator extends Thread { - - private final BlockingDeque<TajoContainerId> queue = new LinkedBlockingDeque<TajoContainerId>(); - - public Deallocator() { - setName("Deallocator"); - setDaemon(true); - } - - private void submit(Iterable<TajoContainerId> container) { - queue.addAll(Lists.newArrayList(container)); - } - - private void shutdown() { - queue.add(FIN); - } - - @Override - public void run() { - final AbstractResourceAllocator allocator = queryTaskContext.getResourceAllocator(); - while (!stopped.get() || !queue.isEmpty()) { - TajoContainerId containerId; - try { - containerId = queue.take(); - } catch (InterruptedException e) { - continue; - } - if (containerId == FIN) { - break; - } - ContainerProxy proxy = allocator.getContainer(containerId); - if (proxy == null) { - continue; - } - try { - LOG.info("Stopping ContainerProxy: " + proxy.getContainerId() + "," + proxy.getBlockId()); - proxy.stopContainer(); - } catch (Exception e) { - LOG.warn("Failed to stop container " + proxy.getContainerId() + "," + proxy.getBlockId(), e); - } - } - LOG.info("Deallocator exiting"); - } - } - - class TajoWorkerAllocationHandler implements EventHandler<ContainerAllocationEvent> { - @Override - public void handle(ContainerAllocationEvent event) { - allocationExecutor.submit(new TajoWorkerAllocationThread(event)); - } - } - - class TajoWorkerAllocationThread extends Thread { - ContainerAllocationEvent event; - TajoWorkerAllocationThread(ContainerAllocationEvent event) { - this.event = event; - } - - @Override - public void run() { - LOG.info("Start TajoWorkerAllocationThread"); - CallFuture<WorkerResourceAllocationResponse> callBack = - new CallFuture<WorkerResourceAllocationResponse>(); - - //TODO consider task's resource usage pattern - int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); - float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); - - WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setMinMemoryMBPerContainer(requiredMemoryMB) - .setMaxMemoryMBPerContainer(requiredMemoryMB) - .setNumContainers(event.getRequiredNum()) - .setResourceRequestPriority(!event.isLeafQuery() ? - ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK) - .setMinDiskSlotPerContainer(requiredDiskSlots) - .setMaxDiskSlotPerContainer(requiredDiskSlots) - .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) - .build(); - - - NettyClientBase tmClient = null; - try { - ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = RpcClientManager.getInstance(). - getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.allocateWorkerResources(callBack.getController(), request, callBack); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } - - WorkerResourceAllocationResponse response = null; - while(!stopped.get()) { - try { - response = callBack.get(3, TimeUnit.SECONDS); - break; - } catch (InterruptedException e) { - if(stopped.get()) { - return; - } - } catch (TimeoutException e) { - LOG.info("No available worker resource for " + event.getExecutionBlockId()); - continue; - } catch (ExecutionException e) { - LOG.error(e.getMessage(), e); - break; - } - } - - int numAllocatedContainers = 0; - - if(response != null) { - List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList(); - ExecutionBlockId executionBlockId = event.getExecutionBlockId(); - - List<TajoContainer> containers = new ArrayList<TajoContainer>(); - for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) { - TajoWorkerContainer container = new TajoWorkerContainer(); - NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), - eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); - - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(), - eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId())); - containerId.setId(eachAllocatedResource.getContainerId().getId()); - - container.setId(containerId); - container.setNodeId(nodeId); - - - WorkerResource workerResource = new WorkerResource(); - workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB()); - workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots()); - - Worker worker = new Worker(null, workerResource, - new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo())); - container.setWorkerResource(worker); - addWorkerConnectionInfo(worker.getConnectionInfo()); - containers.add(container); - } - - StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState(); - if (!Stage.isRunningState(state)) { - List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>(); - for(TajoContainer eachContainer: containers) { - containerIds.add(eachContainer.getId()); - } - try { - TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); - } catch (Throwable e) { - deallocator.submit(containerIds); - LOG.error(e.getMessage(), e); - } - return; - } - - if (allocatedResources.size() > 0) { - if(LOG.isDebugEnabled()) { - LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId); - } - queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers)); - } - numAllocatedContainers += allocatedResources.size(); - - } - if(event.getRequiredNum() > numAllocatedContainers) { - ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent( - event.getType(), event.getExecutionBlockId(), event.getPriority(), - event.getResource(), - event.getRequiredNum() - numAllocatedContainers, - event.isLeafQuery(), event.getProgress() - ); - queryTaskContext.getEventHandler().handle(shortRequestEvent); - - } - LOG.info("Stop TajoWorkerAllocationThread"); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 66c8e4a..a95698f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -42,9 +42,7 @@ import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.service.TajoMasterInfo; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.querymaster.QueryMaster; import org.apache.tajo.querymaster.QueryMasterManagerService; @@ -70,7 +68,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.conf.TajoConf.ConfVars; @@ -97,19 +94,19 @@ public class TajoWorker extends CompositeService { private WorkerContext workerContext; - private TaskRunnerManager taskRunnerManager; + private TaskManager taskManager; + + private TaskExecutor taskExecutor; private TajoPullServerService pullService; private ServiceTracker serviceTracker; - private WorkerHeartbeatService workerHeartbeatThread; - - private AtomicBoolean stopped = new AtomicBoolean(false); + private NodeResourceManager nodeResourceManager; - private AtomicInteger numClusterNodes = new AtomicInteger(); + private NodeStatusUpdater nodeStatusUpdater; - private ClusterResourceSummary clusterResource; + private AtomicBoolean stopped = new AtomicBoolean(false); private WorkerConnectionInfo connectionInfo; @@ -147,12 +144,9 @@ public class TajoWorker extends CompositeService { @Override public void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("conf should be a TajoConf type."); - } Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); - this.systemConf = (TajoConf)conf; + this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); RackResolver.init(systemConf); RpcClientManager rpcManager = RpcClientManager.getInstance(); @@ -165,23 +159,11 @@ public class TajoWorker extends CompositeService { this.workerContext = new TajoWorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS); - - boolean randomPort = true; - if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { - randomPort = false; - } int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort(); int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort(); int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort(); - if(randomPort) { - clientPort = 0; - peerRpcPort = 0; - qmManagerPort = 0; - systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0); - } this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); @@ -196,12 +178,19 @@ public class TajoWorker extends CompositeService { queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort); addIfService(queryMasterManagerService); - // taskrunner worker - taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher); - addService(taskRunnerManager); + this.taskManager = new TaskManager(dispatcher, workerContext); + addService(taskManager); - workerHeartbeatThread = new WorkerHeartbeatService(workerContext); - addIfService(workerHeartbeatThread); + this.taskExecutor = new TaskExecutor(workerContext); + addService(taskExecutor); + + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + addService(rmDispatcher); + this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext); + addService(nodeResourceManager); + + this.nodeStatusUpdater = new NodeStatusUpdater(workerContext); + addService(nodeStatusUpdater); int httpPort = 0; if(!TajoPullServerService.isStandalone()) { @@ -268,8 +257,8 @@ public class TajoWorker extends CompositeService { workerSystemMetrics.register("task", "runningTasks", new Gauge<Integer>() { @Override public Integer getValue() { - if(taskRunnerManager != null) { - return taskRunnerManager.getNumTasks(); + if(taskExecutor != null) { + return taskExecutor.getRunningTasks(); } else { return 0; } @@ -394,7 +383,11 @@ public class TajoWorker extends CompositeService { QueryMasterManagerService getQueryMasterManagerService(); - TaskRunnerManager getTaskRunnerManager(); + TaskManager getTaskManager(); + + TaskExecutor getTaskExecuor(); + + NodeResourceManager getNodeResourceManager(); CatalogService getCatalog(); @@ -404,8 +397,6 @@ public class TajoWorker extends CompositeService { LocalDirAllocator getLocalDirAllocator(); - ClusterResourceSummary getClusterResource(); - TajoSystemMetrics getWorkerSystemMetrics(); HashShuffleAppenderManager getHashShuffleAppenderManager(); @@ -417,10 +408,6 @@ public class TajoWorker extends CompositeService { void cleanup(String strPath); void cleanupTemporalDirectories(); - - void setClusterResource(ClusterResourceSummary clusterResource); - - void setNumClusterNodes(int numClusterNodes); } class TajoWorkerContext implements WorkerContext { @@ -443,8 +430,19 @@ public class TajoWorker extends CompositeService { return queryMasterManagerService; } - public TaskRunnerManager getTaskRunnerManager() { - return taskRunnerManager; + @Override + public TaskManager getTaskManager(){ + return taskManager; + } + + @Override + public TaskExecutor getTaskExecuor() { + return taskExecutor; + } + + @Override + public NodeResourceManager getNodeResourceManager() { + return nodeResourceManager; } public CatalogService getCatalog() { @@ -507,22 +505,6 @@ public class TajoWorker extends CompositeService { } } - public void setNumClusterNodes(int numClusterNodes) { - TajoWorker.this.numClusterNodes.set(numClusterNodes); - } - - public void setClusterResource(ClusterResourceSummary clusterResource) { - synchronized (numClusterNodes) { - TajoWorker.this.clusterResource = clusterResource; - } - } - - public ClusterResourceSummary getClusterResource() { - synchronized (numClusterNodes) { - return TajoWorker.this.clusterResource; - } - } - public TajoSystemMetrics getWorkerSystemMetrics() { return workerSystemMetrics; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index de8afe8..7752211 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -18,26 +18,27 @@ package org.apache.tajo.worker; -import com.google.common.base.Preconditions; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; -import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.ResourceProtos.BatchAllocationRequest; +import org.apache.tajo.ResourceProtos.BatchAllocationResponse; +import org.apache.tajo.ResourceProtos.StopExecutionBlockRequest; import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.worker.event.TaskRunnerStartEvent; -import org.apache.tajo.worker.event.TaskRunnerStopEvent; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.event.ExecutionBlockStopEvent; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.apache.tajo.worker.event.QueryStopEvent; import java.net.InetSocketAddress; @@ -58,9 +59,9 @@ public class TajoWorkerManagerService extends CompositeService } @Override - public void init(Configuration conf) { - Preconditions.checkArgument(conf instanceof TajoConf); - TajoConf tajoConf = (TajoConf) conf; + public void serviceInit(Configuration conf) throws Exception { + + TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); try { // Setup RPC server InetSocketAddress initIsa = @@ -81,21 +82,16 @@ public class TajoWorkerManagerService extends CompositeService // Get the master address LOG.info("TajoWorkerManagerService is bind to " + bindAddr); tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr)); - super.init(tajoConf); + super.serviceInit(tajoConf); } @Override - public void start() { - super.start(); - } - - @Override - public void stop() { + public void serviceStop() throws Exception { if(rpcServer != null) { rpcServer.shutdown(); } LOG.info("TajoWorkerManagerService stopped"); - super.stop(); + super.serviceStop(); } public InetSocketAddress getBindAddr() { @@ -110,29 +106,23 @@ public class TajoWorkerManagerService extends CompositeService } @Override - public void startExecutionBlock(RpcController controller, - TajoWorkerProtocol.RunExecutionBlockRequestProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc(); - - try { - workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(request)); - done.run(TajoWorker.TRUE_PROTO); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - controller.setFailed(t.getMessage()); - done.run(TajoWorker.FALSE_PROTO); - } + public void allocateTasks(RpcController controller, + BatchAllocationRequest request, + RpcCallback<BatchAllocationResponse> done) { + workerContext.getWorkerSystemMetrics().counter("query", "allocationRequestNum").inc(); + workerContext.getNodeResourceManager().getDispatcher(). + getEventHandler().handle(new NodeResourceAllocateEvent(request, done)); } @Override public void stopExecutionBlock(RpcController controller, - TajoIdProtos.ExecutionBlockIdProto requestProto, + StopExecutionBlockRequest requestProto, RpcCallback<PrimitiveProtos.BoolProto> done) { try { - workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStopEvent( - new ExecutionBlockId(requestProto) - )); + + workerContext.getTaskManager().getDispatcher().getEventHandler().handle( + new ExecutionBlockStopEvent(requestProto.getExecutionBlockId(), requestProto.getCleanupList())); + done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -144,29 +134,18 @@ public class TajoWorkerManagerService extends CompositeService @Override public void killTaskAttempt(RpcController controller, TajoIdProtos.TaskAttemptIdProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { - Task task = workerContext.getTaskRunnerManager().getTaskByTaskAttemptId(new TaskAttemptId(request)); + //TODO change to async ? + Task task = workerContext.getTaskManager().getTaskByTaskAttemptId(new TaskAttemptId(request)); if(task != null) task.kill(); done.run(TajoWorker.TRUE_PROTO); } @Override - public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request, + public void stopQuery(RpcController controller, TajoIdProtos.QueryIdProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { - workerContext.cleanup(new QueryId(request).toString()); - done.run(TajoWorker.TRUE_PROTO); - } - @Override - public void cleanupExecutionBlocks(RpcController controller, - TajoWorkerProtocol.ExecutionBlockListProto ebIds, - RpcCallback<PrimitiveProtos.BoolProto> done) { - for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) { - String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); - workerContext.cleanup(inputDir); - String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); - workerContext.cleanup(outputDir); - } + workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new QueryStopEvent(new QueryId(request))); done.run(TajoWorker.TRUE_PROTO); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index c849940..66216ee 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -18,9 +18,10 @@ package org.apache.tajo.worker; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ResourceProtos.TaskStatusProto; import java.io.IOException; +import java.util.List; public interface Task { @@ -48,5 +49,9 @@ public interface Task { ExecutionBlockContext getExecutionBlockContext(); - TajoWorkerProtocol.TaskStatusProto getReport(); + TaskStatusProto getReport(); + + TaskHistory createTaskHistory(); + + List<Fetcher> getFetchers(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index 2576726..761bf52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.TajoProtos; /** * The driver class for Tajo Task processing.
