http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 4716dcc..c849940 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -18,839 +18,35 @@ package org.apache.tajo.worker; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import io.netty.handler.codec.http.QueryStringDecoder; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.physical.PhysicalExec; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.engine.query.TaskRequest; -import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.*; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.plan.function.python.TajoScriptEngine; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.serder.LogicalNodeDeserializer; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.pullserver.TajoPullServerService; -import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.NetUtils; +import org.apache.tajo.ipc.TajoWorkerProtocol; -import java.io.File; import java.io.IOException; -import java.net.InetAddress; -import java.net.URI; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; +public interface Task { -public class Task { - private static final Log LOG = LogFactory.getLog(Task.class); - private static final float FETCHER_PROGRESS = 0.5f; + void init() throws IOException; - private final TajoConf systemConf; - private final QueryContext queryContext; - private final ExecutionBlockContext executionBlockContext; - private final TaskAttemptId taskId; - private final String taskRunnerId; + void fetch(); - private final Path taskDir; - private final TaskRequest request; - private TaskAttemptContext context; - private List<Fetcher> fetcherRunners; - private LogicalNode plan; - private final Map<String, TableDesc> descs = Maps.newHashMap(); - private PhysicalExec executor; - private boolean interQuery; - private Path inputTableBaseDir; + void run() throws Exception; - private long startTime; - private long finishTime; + void kill(); - private final TableStats inputStats; - private List<FileChunk> localChunks; + void abort(); - // TODO - to be refactored - private ShuffleType shuffleType = null; - private Schema finalSchema = null; - private TupleComparator sortComp = null; + void cleanup(); - public Task(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); - } + boolean hasFetchPhase(); - public Task(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - TajoConf conf, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this.taskRunnerId = taskRunnerId; - this.request = request; - this.taskId = taskId; + boolean isProgressChanged(); - this.systemConf = conf; - this.queryContext = request.getQueryContext(systemConf); - this.executionBlockContext = executionBlockContext; - this.taskDir = StorageUtil.concatPath(baseDir, - taskId.getTaskId().getId() + "_" + taskId.getId()); + boolean isStopped(); - this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, - request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); - this.context.setDataChannel(request.getDataChannel()); - this.context.setEnforcer(request.getEnforcer()); - this.context.setState(TaskAttemptState.TA_PENDING); - this.inputStats = new TableStats(); - this.fetcherRunners = Lists.newArrayList(); - } + void updateProgress(); - public void initPlan() throws IOException { - plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); - LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - if (scanNode != null) { - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } + TaskAttemptContext getTaskContext(); - LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); - if (partitionScanNode != null) { - for (LogicalNode node : partitionScanNode) { - PartitionedTableScanNode scan = (PartitionedTableScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } + ExecutionBlockContext getExecutionBlockContext(); - interQuery = request.getProto().getInterQuery(); - if (interQuery) { - context.setInterQuery(); - this.shuffleType = context.getDataChannel().getShuffleType(); - - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); - this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); - this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); - } - } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) - .getAppenderFilePath(taskId, queryContext.getStagingDir()); - LOG.info("Output File Path: " + outFilePath); - context.setOutputPath(outFilePath); - } - - this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); - LOG.info("=================================="); - LOG.info("* Stage " + request.getId() + " is initialized"); - LOG.info("* InterQuery: " + interQuery - + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + - ", Fragments (num: " + request.getFragments().size() + ")" + - ", Fetches (total:" + request.getFetches().size() + ") :"); - - if(LOG.isDebugEnabled()) { - for (FetchImpl f : request.getFetches()) { - LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); - } - } - LOG.info("* Local task dir: " + taskDir); - if(LOG.isDebugEnabled()) { - LOG.debug("* plan:\n"); - LOG.debug(plan.toString()); - } - LOG.info("=================================="); - } - - private void startScriptExecutors() throws IOException { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.start(systemConf); - } - } - - private void stopScriptExecutors() { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.shutdown(); - } - } - - public void init() throws IOException { - initPlan(); - startScriptExecutors(); - - if (context.getState() == TaskAttemptState.TA_PENDING) { - // initialize a task temporal dir - FileSystem localFS = executionBlockContext.getLocalFS(); - localFS.mkdirs(taskDir); - - if (request.getFetches().size() > 0) { - inputTableBaseDir = localFS.makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( - getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); - localFS.mkdirs(inputTableBaseDir); - Path tableDir; - for (String inputTable : context.getInputTables()) { - tableDir = new Path(inputTableBaseDir, inputTable); - if (!localFS.exists(tableDir)) { - LOG.info("the directory is created " + tableDir.toUri()); - localFS.mkdirs(tableDir); - } - } - } - // for localizing the intermediate data - fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); - } - } - - public TaskAttemptId getTaskId() { - return taskId; - } - - public TaskAttemptId getId() { - return context.getTaskId(); - } - - public TaskAttemptState getStatus() { - return context.getState(); - } - - public String toString() { - return "queryId: " + this.getId() + " status: " + this.getStatus(); - } - - public void setState(TaskAttemptState status) { - context.setState(status); - } - - public TaskAttemptContext getContext() { - return context; - } - - public boolean hasFetchPhase() { - return fetcherRunners.size() > 0; - } - - public List<Fetcher> getFetchers() { - return new ArrayList<Fetcher>(fetcherRunners); - } - - public void fetch() { - ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); - for (Fetcher f : fetcherRunners) { - executorService.submit(new FetchRunner(context, f)); - } - } - - public void kill() { - stopScriptExecutors(); - context.setState(TaskAttemptState.TA_KILLED); - context.stop(); - } - - public void abort() { - stopScriptExecutors(); - context.stop(); - } - - public void cleanUp() { - // remove itself from worker - if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { - synchronized (executionBlockContext.getTasks()) { - executionBlockContext.getTasks().remove(this.getId()); - } - } else { - LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); - } - } - - public TaskStatusProto getReport() { - TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); - builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); - builder.setId(context.getTaskId().getProto()) - .setProgress(context.getProgress()) - .setState(context.getState()); - - builder.setInputStats(reloadInputStats()); - - if (context.getResultStats() != null) { - builder.setResultStats(context.getResultStats().getProto()); - } - return builder.build(); - } - - public boolean isRunning(){ - return context.getState() == TaskAttemptState.TA_RUNNING; - } - - public boolean isProgressChanged() { - return context.isProgressChanged(); - } - - public void updateProgress() { - if(context != null && context.isStopped()){ - return; - } - - if (executor != null && context.getProgress() < 1.0f) { - context.setExecutorProgress(executor.getProgress()); - } - } - - private CatalogProtos.TableStatsProto reloadInputStats() { - synchronized(inputStats) { - if (this.executor == null) { - return inputStats.getProto(); - } - - TableStats executorInputStats = this.executor.getInputStats(); - - if (executorInputStats != null) { - inputStats.setValues(executorInputStats); - } - return inputStats.getProto(); - } - } - - private TaskCompletionReport getTaskCompletionReport() { - TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); - builder.setId(context.getTaskId().getProto()); - - builder.setInputStats(reloadInputStats()); - - if (context.hasResultStats()) { - builder.setResultStats(context.getResultStats().getProto()); - } else { - builder.setResultStats(new TableStats().getProto()); - } - - Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); - if (it.hasNext()) { - do { - Entry<Integer, String> entry = it.next(); - ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); - part.setPartId(entry.getKey()); - - // Set output volume - if (context.getPartitionOutputVolume() != null) { - for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { - if (entry.getKey().equals(e.getKey())) { - part.setVolume(e.getValue().longValue()); - break; - } - } - } - - builder.addShuffleFileOutputs(part.build()); - } while (it.hasNext()); - } - - return builder.build(); - } - - private void waitForFetch() throws InterruptedException, IOException { - context.getFetchLatch().await(); - LOG.info(context.getTaskId() + " All fetches are done!"); - Collection<String> inputs = Lists.newArrayList(context.getInputTables()); - - // Get all broadcasted tables - Set<String> broadcastTableNames = new HashSet<String>(); - List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); - if (broadcasts != null) { - for (EnforceProperty eachBroadcast : broadcasts) { - broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); - } - } - - // localize the fetched data and skip the broadcast table - for (String inputTable: inputs) { - if (broadcastTableNames.contains(inputTable)) { - continue; - } - File tableDir = new File(context.getFetchIn(), inputTable); - FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); - context.updateAssignedFragments(inputTable, frags); - } - } - - public void run() throws Exception { - startTime = System.currentTimeMillis(); - Throwable error = null; - try { - if(!context.isStopped()) { - context.setState(TaskAttemptState.TA_RUNNING); - if (context.hasFetchPhase()) { - // If the fetch is still in progress, the query unit must wait for - // complete. - waitForFetch(); - context.setFetcherProgress(FETCHER_PROGRESS); - context.setProgressChanged(true); - updateProgress(); - } - - this.executor = executionBlockContext.getTQueryEngine(). - createPlan(context, plan); - this.executor.init(); - - while(!context.isStopped() && executor.next() != null) { - } - } - } catch (Throwable e) { - error = e ; - LOG.error(e.getMessage(), e); - stopScriptExecutors(); - context.stop(); - } finally { - if (executor != null) { - try { - executor.close(); - reloadInputStats(); - } catch (IOException e) { - LOG.error(e, e); - } - this.executor = null; - } - - executionBlockContext.completedTasksNum.incrementAndGet(); - context.getHashShuffleAppenderManager().finalizeTask(taskId); - - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); - if (context.isStopped()) { - context.setExecutorProgress(0.0f); - - if (context.getState() == TaskAttemptState.TA_KILLED) { - queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); - executionBlockContext.killedTasksNum.incrementAndGet(); - } else { - context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } - - queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); - executionBlockContext.failedTasksNum.incrementAndGet(); - } - } else { - // if successful - context.setProgress(1.0f); - context.setState(TaskAttemptState.TA_SUCCEEDED); - executionBlockContext.succeededTasksNum.incrementAndGet(); - - TaskCompletionReport report = getTaskCompletionReport(); - queryMasterStub.done(null, report, NullCallback.get()); - } - finishTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); - cleanupTask(); - } - } - - public void cleanupTask() { - TaskHistory taskHistory = createTaskHistory(); - executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); - executionBlockContext.getTasks().remove(getId()); - - fetcherRunners.clear(); - fetcherRunners = null; - try { - if(executor != null) { - executor.close(); - executor = null; - } - } catch (IOException e) { - LOG.fatal(e.getMessage(), e); - } - - executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); - stopScriptExecutors(); - } - - public TaskHistory createTaskHistory() { - TaskHistory taskHistory = null; - try { - taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), - startTime, finishTime, reloadInputStats()); - - if (context.getOutputPath() != null) { - taskHistory.setOutputPath(context.getOutputPath().toString()); - } - - if (context.getWorkDir() != null) { - taskHistory.setWorkingPath(context.getWorkDir().toString()); - } - - if (context.getResultStats() != null) { - taskHistory.setOutputStats(context.getResultStats().getProto()); - } - - if (hasFetchPhase()) { - taskHistory.setTotalFetchCount(fetcherRunners.size()); - int i = 0; - FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); - for (Fetcher fetcher : fetcherRunners) { - // TODO store the fetcher histories - if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { - builder.setStartTime(fetcher.getStartTime()); - builder.setFinishTime(fetcher.getFinishTime()); - builder.setFileLength(fetcher.getFileLen()); - builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); - builder.setState(fetcher.getState()); - - taskHistory.addFetcherHistory(builder.build()); - } - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; - } - taskHistory.setFinishedFetchCount(i); - } - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - } - - return taskHistory; - } - - public int hashCode() { - return context.hashCode(); - } - - public boolean equals(Object obj) { - if (obj instanceof Task) { - Task other = (Task) obj; - return this.context.equals(other.context); - } - return false; - } - - private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) - throws IOException { - Configuration c = new Configuration(systemConf); - c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); - FileSystem fs = FileSystem.get(c); - Path tablePath = new Path(file.getAbsolutePath()); - - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus f : fileLists) { - if (f.getLen() == 0) { - continue; - } - tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); - listTablets.add(tablet); - } - - // Special treatment for locally pseudo fetched chunks - synchronized (localChunks) { - for (FileChunk chunk : localChunks) { - if (name.equals(chunk.getEbId())) { - tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); - listTablets.add(tablet); - LOG.info("One local chunk is added to listTablets"); - } - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - private class FetchRunner implements Runnable { - private final TaskAttemptContext ctx; - private final Fetcher fetcher; - private int maxRetryNum; - - public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { - this.ctx = ctx; - this.fetcher = fetcher; - this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); - } - - @Override - public void run() { - int retryNum = 0; - int retryWaitTime = 1000; //sec - - try { // for releasing fetch latch - while(!context.isStopped() && retryNum < maxRetryNum) { - if (retryNum > 0) { - try { - Thread.sleep(retryWaitTime); - retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds - } catch (InterruptedException e) { - LOG.error(e); - } - LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); - } - try { - FileChunk fetched = fetcher.get(); - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null - && fetched.getFile() != null) { - if (fetched.fromRemote() == false) { - localChunks.add(fetched); - LOG.info("Add a new FileChunk to local chunk list"); - } - break; - } - } catch (Throwable e) { - LOG.error("Fetch failed: " + fetcher.getURI(), e); - } - retryNum++; - } - } finally { - if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ - fetcherFinished(ctx); - } else { - if (retryNum == maxRetryNum) { - LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); - } - stopScriptExecutors(); - context.stop(); // retry task - ctx.getFetchLatch().countDown(); - } - } - } - } - - @VisibleForTesting - public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { - if (totalFetcher > 0) { - return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; - } else { - return 0.0f; - } - } - - private synchronized void fetcherFinished(TaskAttemptContext ctx) { - int fetcherSize = fetcherRunners.size(); - if(fetcherSize == 0) { - return; - } - - ctx.getFetchLatch().countDown(); - - int remainFetcher = (int) ctx.getFetchLatch().getCount(); - if (remainFetcher == 0) { - context.setFetcherProgress(FETCHER_PROGRESS); - } else { - context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); - context.setProgressChanged(true); - } - } - - private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, - List<FetchImpl> fetches) throws IOException { - - if (fetches.size() > 0) { - Path inputDir = executionBlockContext.getLocalDirAllocator(). - getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); - - int i = 0; - File storeDir; - File defaultStoreFile; - FileChunk storeChunk = null; - List<Fetcher> runnerList = Lists.newArrayList(); - - for (FetchImpl f : fetches) { - storeDir = new File(inputDir.toString(), f.getName()); - if (!storeDir.exists()) { - storeDir.mkdirs(); - } - - for (URI uri : f.getURIs()) { - defaultStoreFile = new File(storeDir, "in_" + i); - InetAddress address = InetAddress.getByName(uri.getHost()); - - WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); - if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { - boolean hasError = false; - try { - LOG.info("Try to get local file chunk at local host"); - storeChunk = getLocalStoredFileChunk(uri, systemConf); - } catch (Throwable t) { - hasError = true; - } - - // When a range request is out of range, storeChunk will be NULL. This case is normal state. - // So, we should skip and don't need to create storeChunk. - if (storeChunk == null && !hasError) { - continue; - } - - if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 - && hasError == false) { - storeChunk.setFromRemote(false); - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - - // If we decide that intermediate data should be really fetched from a remote host, storeChunk - // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it - storeChunk.setEbId(f.getName()); - Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); - LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); - runnerList.add(fetcher); - i++; - } - } - ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); - return runnerList; - } else { - return Lists.newArrayList(); - } - } - - private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { - // Parse the URI - LOG.info("getLocalStoredFileChunk starts"); - final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); - final List<String> types = params.get("type"); - final List<String> qids = params.get("qid"); - final List<String> taskIdList = params.get("ta"); - final List<String> stageIds = params.get("sid"); - final List<String> partIds = params.get("p"); - final List<String> offsetList = params.get("offset"); - final List<String> lengthList = params.get("length"); - - if (types == null || stageIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); - return null; - } - - if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); - return null; - } - - String queryId = qids.get(0); - String shuffleType = types.get(0); - String sid = stageIds.get(0); - String partId = partIds.get(0); - - if (shuffleType.equals("r") && taskIdList == null) { - LOG.error("Invalid URI - For range shuffle, taskId is required"); - return null; - } - List<String> taskIds = splitMaps(taskIdList); - - FileChunk chunk = null; - long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; - long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; - - LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId - + ", taskIds=" + taskIdList); - - // The working directory of Tajo worker for each query, including stage - String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; - - // If the stage requires a range shuffle - if (shuffleType.equals("r")) { - String ta = taskIds.get(0); - if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { - LOG.warn("Range shuffle - file not exist"); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - try { - chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("getFileChunks() throws exception"); - return null; - } - - // If the stage requires a hash shuffle or a scattered hash shuffle - } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); - String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; - if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { - LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); - File file = new File(path.toUri()); - long startPos = (offset >= 0 && length >= 0) ? offset : 0; - long readLen = (offset >= 0 && length >= 0) ? length : file.length(); - - if (startPos >= file.length()) { - LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); - return null; - } - chunk = new FileChunk(file, startPos, readLen); - - } else { - LOG.error("Unknown shuffle type"); - return null; - } - - return chunk; - } - - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - - public static Path getTaskAttemptDir(TaskAttemptId quid) { - Path workDir = - StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), - String.valueOf(quid.getTaskId().getId()), - String.valueOf(quid.getId())); - return workDir; - } + TajoWorkerProtocol.TaskStatusProto getReport(); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 58028ac..d020639 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; @@ -60,13 +59,13 @@ public class TaskAttemptContext { private volatile TaskAttemptState state; private TableStats resultStats; - private TaskAttemptId queryId; + private TaskAttemptId taskId; private final Path workDir; private boolean needFetch = false; private CountDownLatch doneFetchPhaseSignal; private float progress = 0.0f; private float fetcherProgress = 0.0f; - private AtomicBoolean progressChanged = new AtomicBoolean(false); + private volatile boolean progressChanged; /** a map of shuffled file outputs */ private Map<Integer, String> shuffleFileOutputs; @@ -87,7 +86,7 @@ public class TaskAttemptContext { private EvalContext evalContext = new EvalContext(); public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext, - final TaskAttemptId queryId, + final TaskAttemptId taskId, final FragmentProto[] fragments, final Path workDir) { this.queryContext = queryContext; @@ -97,7 +96,7 @@ public class TaskAttemptContext { this.sharedResource = executionBlockContext.getSharedResource(); } - this.queryId = queryId; + this.taskId = taskId; if (fragments != null) { for (FragmentProto t : fragments) { @@ -114,25 +113,15 @@ public class TaskAttemptContext { this.workDir = workDir; this.shuffleFileOutputs = Maps.newHashMap(); - state = TaskAttemptState.TA_PENDING; + this.state = TaskAttemptState.TA_PENDING; this.partitionOutputVolume = Maps.newHashMap(); - - if (workerContext != null) { - this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager(); - } else { - try { - this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf()); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } } @VisibleForTesting - public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId queryId, + public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId, final Fragment [] fragments, final Path workDir) { - this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); + this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); } public TajoConf getConf() { @@ -308,9 +297,10 @@ public class TaskAttemptContext { public Path getWorkDir() { return this.workDir; } - + + //TODO change to getTaskAttemptId() public TaskAttemptId getTaskId() { - return this.queryId; + return this.taskId; } public float getProgress() { @@ -326,17 +316,11 @@ public class TaskAttemptContext { this.progress = progress; } - if (previousProgress != progress) { - setProgressChanged(true); - } + this.progressChanged = previousProgress != progress; } public boolean isProgressChanged() { - return progressChanged.get(); - } - - public void setProgressChanged(boolean changed){ - progressChanged.set(changed); + return progressChanged; } public void setExecutorProgress(float executorProgress) { @@ -355,7 +339,9 @@ public class TaskAttemptContext { if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){ fetcherProgress = 0.0f; } + float previousProgress = this.fetcherProgress; this.fetcherProgress = fetcherProgress; + this.progressChanged = previousProgress != fetcherProgress; } public FragmentProto getTable(String id) { @@ -383,13 +369,13 @@ public class TaskAttemptContext { } public int hashCode() { - return Objects.hashCode(queryId); + return Objects.hashCode(taskId); } public boolean equals(Object obj) { if (obj instanceof TaskAttemptContext) { TaskAttemptContext other = (TaskAttemptContext) obj; - return queryId.equals(other.getTaskId()); + return taskId.equals(other.getTaskId()); } else { return false; } @@ -399,11 +385,18 @@ public class TaskAttemptContext { return queryContext; } - public TaskAttemptId getQueryId() { - return queryId; - } - public HashShuffleAppenderManager getHashShuffleAppenderManager() { + if(hashShuffleAppenderManager == null) { + if (workerContext != null) { + this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager(); + } else { + try { + this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf()); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } return hashShuffleAppenderManager; } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java new file mode 100644 index 0000000..2576726 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.TajoProtos; + +/** + * The driver class for Tajo Task processing. + */ +public class TaskContainer implements Runnable { + private static final Log LOG = LogFactory.getLog(TaskContainer.class); + + private final TaskExecutor executor; + private final int sequenceId; + + public TaskContainer(int sequenceId, TaskExecutor executor) { + this.sequenceId = sequenceId; + this.executor = executor; + } + + @Override + public void run() { + while (!executor.isStopped()) { + + Task task = null; + try { + task = executor.getNextTask(); + + task.getExecutionBlockContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); + + if (LOG.isDebugEnabled()) { + LOG.debug(sequenceId + TaskContainer.class.getSimpleName() + + " got task:" + task.getTaskContext().getTaskId()); + } + + TaskAttemptContext taskAttemptContext = task.getTaskContext(); + if (taskAttemptContext.isStopped()) return; + + task.init(); + + if (task.hasFetchPhase()) { + task.fetch(); // The fetch is performed in an asynchronous way. + } + + if (!taskAttemptContext.isStopped()) { + task.run(); + } + + task.cleanup(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + if (task != null) { + try { + task.abort(); + task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e.getMessage()); + } catch (Throwable t) { + LOG.fatal(t.getMessage(), t); + } + } + } finally { + if (task != null) { + executor.stopTask(task.getTaskContext().getTaskId()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java new file mode 100644 index 0000000..299952e --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.worker.event.*; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * TaskExecutor uses a number of threads equal to the number of slots available for running tasks on the Worker + */ +public class TaskExecutor extends AbstractService implements EventHandler<TaskExecutorEvent> { + private static final Log LOG = LogFactory.getLog(TaskExecutor.class); + + private final TaskManager taskManager; + private final EventHandler rmEventHandler; + private final Map<TaskAttemptId, NodeResource> allocatedResourceMap; + private final BlockingQueue<Task> taskQueue; + private final AtomicInteger runningTasks; + private ThreadPoolExecutor fetcherExecutor; + private ExecutorService threadPool; + private TajoConf tajoConf; + private volatile boolean isStopped; + + public TaskExecutor(TaskManager taskManager, EventHandler rmEventHandler) { + super(TaskExecutor.class.getName()); + this.taskManager = taskManager; + this.rmEventHandler = rmEventHandler; + this.allocatedResourceMap = Maps.newConcurrentMap(); + this.runningTasks = new AtomicInteger(); + this.taskQueue = new LinkedBlockingQueue<Task>(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + + this.tajoConf = (TajoConf) conf; + this.taskManager.getDispatcher().register(TaskExecutorEvent.EventType.class, this); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + int nThreads = this.tajoConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); + this.threadPool = Executors.newFixedThreadPool(nThreads, + new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build()); + + //TODO move to tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM); + int maxFetcherThreads = Runtime.getRuntime().availableProcessors() * 2; + this.fetcherExecutor = new ThreadPoolExecutor(Math.min(nThreads, maxFetcherThreads), + maxFetcherThreads, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(true)); + + + for (int i = 0; i < nThreads; i++) { + threadPool.submit(new TaskContainer(i, this)); + } + + super.serviceStart(); + LOG.info("Started TaskExecutor[" + nThreads + "], Fetcher executor[" + maxFetcherThreads + "]"); + } + + @Override + protected void serviceStop() throws Exception { + isStopped = true; + + threadPool.shutdown(); + fetcherExecutor.shutdown(); + super.serviceStop(); + } + + public boolean isStopped() { + return isStopped; + } + + public int getRunningTasks() { + return runningTasks.get(); + } + + /** + * This will block until a task is available. + */ + protected Task getNextTask() { + Task task = null; + try { + task = taskQueue.take(); + } catch (InterruptedException e) { + LOG.fatal(e); + } + return task; + } + + @SuppressWarnings("unchecked") + protected void stopTask(TaskAttemptId taskId) { + runningTasks.decrementAndGet(); + rmEventHandler.handle(new NodeResourceDeallocateEvent(allocatedResourceMap.remove(taskId))); + } + + protected ExecutorService getFetcherExecutor() { + return fetcherExecutor; + } + + + protected Task createTask(ExecutionBlockContext executionBlockContext, + TajoWorkerProtocol.TaskRequestProto taskRequest) throws IOException { + Task task = null; + TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId()); + if (executionBlockContext.getTasks().containsKey(taskAttemptId)) { + String errorMessage = "Duplicate Task Attempt: " + taskAttemptId; + LOG.error(errorMessage); + executionBlockContext.fatalError(taskAttemptId, errorMessage); + } else { + task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext, getFetcherExecutor()); + executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task); + } + return task; + } + + @Override + public void handle(TaskExecutorEvent event) { + + if (event instanceof TaskStartEvent) { + TaskStartEvent startEvent = (TaskStartEvent) event; + allocatedResourceMap.put(startEvent.getTaskId(), startEvent.getAllocatedResource()); + + ExecutionBlockContext context = taskManager.getExecutionBlockContext( + startEvent.getTaskId().getTaskId().getExecutionBlockId()); + + try { + Task task = createTask(context, startEvent.getTaskRequest()); + if (task != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() + + ", allocated resource: " + startEvent.getAllocatedResource()); + } + taskQueue.put(task); + runningTasks.incrementAndGet(); + context.getWorkerContext().getWorkerSystemMetrics() + .histogram("tasks", "running").update(runningTasks.get()); + } else { + LOG.warn("Release duplicate task resource: " + startEvent.getAllocatedResource()); + stopTask(startEvent.getTaskId()); + } + } catch (InterruptedException e) { + if (!isStopped) { + LOG.fatal(e.getMessage(), e); + } + } catch (IOException e) { + stopTask(startEvent.getTaskId()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java new file mode 100644 index 0000000..be3960b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -0,0 +1,838 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.*; +import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.function.python.TajoScriptEngine; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.NetUtils; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; + +public class TaskImpl implements Task { + private static final Log LOG = LogFactory.getLog(TaskImpl.class); + private static final float FETCHER_PROGRESS = 0.5f; + + private final TajoConf systemConf; + private final QueryContext queryContext; + private final ExecutionBlockContext executionBlockContext; + private final TaskRequest request; + private final Map<String, TableDesc> descs; + private final TableStats inputStats; + private final ExecutorService fetcherExecutor; + private final Path taskDir; + + private final TaskAttemptContext context; + private List<Fetcher> fetcherRunners; + private LogicalNode plan; + private PhysicalExec executor; + + private boolean interQuery; + private Path inputTableBaseDir; + + private long startTime; + private long endTime; + + private List<FileChunk> localChunks; + // TODO - to be refactored + private ShuffleType shuffleType = null; + private Schema finalSchema = null; + + private TupleComparator sortComp = null; + + public TaskImpl(final TaskRequest request, + final ExecutionBlockContext executionBlockContext, + final ExecutorService fetcherExecutor) throws IOException { + + this.request = request; + this.executionBlockContext = executionBlockContext; + this.systemConf = executionBlockContext.getConf(); + this.queryContext = request.getQueryContext(systemConf); + this.inputStats = new TableStats(); + this.fetcherRunners = Lists.newArrayList(); + this.fetcherExecutor = fetcherExecutor; + this.descs = Maps.newHashMap(); + + Path baseDirPath = executionBlockContext.createBaseDir(); + LOG.info("Task basedir is created (" + baseDirPath +")"); + TaskAttemptId taskAttemptId = request.getId(); + + this.taskDir = StorageUtil.concatPath(baseDirPath, + taskAttemptId.getTaskId().getId() + "_" + taskAttemptId.getId()); + this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskAttemptId, + request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); + this.context.setDataChannel(request.getDataChannel()); + this.context.setEnforcer(request.getEnforcer()); + this.context.setState(TaskAttemptState.TA_PENDING); + } + + public void initPlan() throws IOException { + plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); + LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); + if (scanNode != null) { + for (LogicalNode node : scanNode) { + ScanNode scan = (ScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } + + LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); + if (partitionScanNode != null) { + for (LogicalNode node : partitionScanNode) { + PartitionedTableScanNode scan = (PartitionedTableScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } + + interQuery = request.getProto().getInterQuery(); + if (interQuery) { + context.setInterQuery(); + this.shuffleType = context.getDataChannel().getShuffleType(); + + if (shuffleType == ShuffleType.RANGE_SHUFFLE) { + SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); + this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); + this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); + } + } else { + Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) + .getAppenderFilePath(getId(), queryContext.getStagingDir()); + LOG.info("Output File Path: " + outFilePath); + context.setOutputPath(outFilePath); + } + + this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); + LOG.info("=================================="); + LOG.info("* Stage " + request.getId() + " is initialized"); + LOG.info("* InterQuery: " + interQuery + + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + + ", Fragments (num: " + request.getFragments().size() + ")" + + ", Fetches (total:" + request.getFetches().size() + ") :"); + + if(LOG.isDebugEnabled()) { + for (FetchImpl f : request.getFetches()) { + LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); + } + } + LOG.info("* Local task dir: " + taskDir); + if(LOG.isDebugEnabled()) { + LOG.debug("* plan:\n"); + LOG.debug(plan.toString()); + } + LOG.info("=================================="); + } + + private void startScriptExecutors() throws IOException { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.start(systemConf); + } + } + + private void stopScriptExecutors() { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.shutdown(); + } + } + + @Override + public void init() throws IOException { + LOG.info("Initializing: " + getId()); + + initPlan(); + startScriptExecutors(); + + if (context.getState() == TaskAttemptState.TA_PENDING) { + // initialize a task temporal dir + FileSystem localFS = executionBlockContext.getLocalFS(); + localFS.mkdirs(taskDir); + + if (request.getFetches().size() > 0) { + inputTableBaseDir = localFS.makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( + getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); + localFS.mkdirs(inputTableBaseDir); + Path tableDir; + for (String inputTable : context.getInputTables()) { + tableDir = new Path(inputTableBaseDir, inputTable); + if (!localFS.exists(tableDir)) { + LOG.info("the directory is created " + tableDir.toUri()); + localFS.mkdirs(tableDir); + } + } + } + // for localizing the intermediate data + fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); + } + } + + private TaskAttemptId getId() { + return context.getTaskId(); + } + + public String toString() { + return "TaskId: " + this.getId() + " Status: " + context.getState(); + } + + @Override + public boolean isStopped() { + return context.isStopped(); + } + + @Override + public TaskAttemptContext getTaskContext() { + return context; + } + + @Override + public ExecutionBlockContext getExecutionBlockContext() { + return executionBlockContext; + } + + @Override + public boolean hasFetchPhase() { + return fetcherRunners.size() > 0; + } + + @Override + public void fetch() { + for (Fetcher f : fetcherRunners) { + fetcherExecutor.submit(new FetchRunner(context, f)); + } + } + + @Override + public void kill() { + stopScriptExecutors(); + context.setState(TaskAttemptState.TA_KILLED); + context.stop(); + } + + @Override + public void abort() { + stopScriptExecutors(); + context.setState(TaskAttemptState.TA_FAILED); + context.stop(); + } + + @Override + public TaskStatusProto getReport() { + TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); + builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); + builder.setId(context.getTaskId().getProto()) + .setProgress(context.getProgress()) + .setState(context.getState()); + + builder.setInputStats(reloadInputStats()); + + if (context.getResultStats() != null) { + builder.setResultStats(context.getResultStats().getProto()); + } + return builder.build(); + } + + @Override + public boolean isProgressChanged() { + return context.isProgressChanged(); + } + + @Override + public void updateProgress() { + if(context != null && context.isStopped()){ + return; + } + + if (executor != null && context.getProgress() < 1.0f) { + context.setExecutorProgress(executor.getProgress()); + } + } + + private CatalogProtos.TableStatsProto reloadInputStats() { + synchronized(inputStats) { + if (this.executor == null) { + return inputStats.getProto(); + } + + TableStats executorInputStats = this.executor.getInputStats(); + + if (executorInputStats != null) { + inputStats.setValues(executorInputStats); + } + return inputStats.getProto(); + } + } + + private TaskCompletionReport getTaskCompletionReport() { + TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); + builder.setId(context.getTaskId().getProto()); + + builder.setInputStats(reloadInputStats()); + + if (context.hasResultStats()) { + builder.setResultStats(context.getResultStats().getProto()); + } else { + builder.setResultStats(new TableStats().getProto()); + } + + Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); + if (it.hasNext()) { + do { + Entry<Integer, String> entry = it.next(); + ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); + part.setPartId(entry.getKey()); + + // Set output volume + if (context.getPartitionOutputVolume() != null) { + for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { + if (entry.getKey().equals(e.getKey())) { + part.setVolume(e.getValue().longValue()); + break; + } + } + } + + builder.addShuffleFileOutputs(part.build()); + } while (it.hasNext()); + } + + return builder.build(); + } + + private void waitForFetch() throws InterruptedException, IOException { + context.getFetchLatch().await(); + LOG.info(context.getTaskId() + " All fetches are done!"); + Collection<String> inputs = Lists.newArrayList(context.getInputTables()); + + // Get all broadcasted tables + Set<String> broadcastTableNames = new HashSet<String>(); + List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); + if (broadcasts != null) { + for (EnforceProperty eachBroadcast : broadcasts) { + broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); + } + } + + // localize the fetched data and skip the broadcast table + for (String inputTable: inputs) { + if (broadcastTableNames.contains(inputTable)) { + continue; + } + File tableDir = new File(context.getFetchIn(), inputTable); + FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); + context.updateAssignedFragments(inputTable, frags); + } + } + + @Override + public void run() throws Exception { + startTime = System.currentTimeMillis(); + Throwable error = null; + + try { + if(!context.isStopped()) { + context.setState(TajoProtos.TaskAttemptState.TA_RUNNING); + if (context.hasFetchPhase()) { + // If the fetch is still in progress, the query unit must wait for complete. + waitForFetch(); + context.setFetcherProgress(FETCHER_PROGRESS); + updateProgress(); + } + + this.executor = executionBlockContext.getTQueryEngine().createPlan(context, plan); + this.executor.init(); + + while(!context.isStopped() && executor.next() != null) { + } + } + } catch (Throwable e) { + error = e ; + LOG.error(e.getMessage(), e); + stopScriptExecutors(); + context.stop(); + } finally { + if (executor != null) { + try { + executor.close(); + reloadInputStats(); + } catch (IOException e) { + LOG.error(e, e); + } + this.executor = null; + } + + executionBlockContext.completedTasksNum.incrementAndGet(); + context.getHashShuffleAppenderManager().finalizeTask(getId()); + + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); + if (context.isStopped()) { + context.setExecutorProgress(0.0f); + + if (context.getState() == TaskAttemptState.TA_KILLED) { + queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); + executionBlockContext.killedTasksNum.incrementAndGet(); + } else { + context.setState(TaskAttemptState.TA_FAILED); + TaskFatalErrorReport.Builder errorBuilder = + TaskFatalErrorReport.newBuilder() + .setId(getId().getProto()); + if (error != null) { + if (error.getMessage() == null) { + errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); + } else { + errorBuilder.setErrorMessage(error.getMessage()); + } + errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); + } + + queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); + executionBlockContext.failedTasksNum.incrementAndGet(); + } + } else { + // if successful + context.stop(); + context.setProgress(1.0f); + context.setState(TaskAttemptState.TA_SUCCEEDED); + executionBlockContext.succeededTasksNum.incrementAndGet(); + + TaskCompletionReport report = getTaskCompletionReport(); + queryMasterStub.done(null, report, NullCallback.get()); + } + endTime = System.currentTimeMillis(); + LOG.info(context.getTaskId() + " completed. " + + "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + + ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() + + ", killed: " + executionBlockContext.killedTasksNum.intValue() + + ", failed: " + executionBlockContext.failedTasksNum.intValue()); + } + } + + @Override + public void cleanup() { + TaskHistory taskHistory = createTaskHistory(); + executionBlockContext.addTaskHistory(getId().getTaskId(), taskHistory); + executionBlockContext.getTasks().remove(getId()); + + fetcherRunners.clear(); + fetcherRunners = null; + try { + if(executor != null) { + executor.close(); + executor = null; + } + } catch (IOException e) { + LOG.fatal(e.getMessage(), e); + } + + executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); + stopScriptExecutors(); + } + + public TaskHistory createTaskHistory() { + TaskHistory taskHistory = null; + try { + taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(), + startTime, endTime, reloadInputStats()); + + if (context.getOutputPath() != null) { + taskHistory.setOutputPath(context.getOutputPath().toString()); + } + + if (context.getWorkDir() != null) { + taskHistory.setWorkingPath(context.getWorkDir().toString()); + } + + if (context.getResultStats() != null) { + taskHistory.setOutputStats(context.getResultStats().getProto()); + } + + if (hasFetchPhase()) { + taskHistory.setTotalFetchCount(fetcherRunners.size()); + int i = 0; + FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); + for (Fetcher fetcher : fetcherRunners) { + // TODO store the fetcher histories + if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { + builder.setStartTime(fetcher.getStartTime()); + builder.setFinishTime(fetcher.getFinishTime()); + builder.setFileLength(fetcher.getFileLen()); + builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); + builder.setState(fetcher.getState()); + + taskHistory.addFetcherHistory(builder.build()); + } + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; + } + taskHistory.setFinishedFetchCount(i); + } + } catch (Exception e) { + LOG.warn(e.getMessage(), e); + } + + return taskHistory; + } + + public int hashCode() { + return context.hashCode(); + } + + public boolean equals(Object obj) { + if (obj instanceof TaskImpl) { + TaskImpl other = (TaskImpl) obj; + return this.context.equals(other.context); + } + return false; + } + + private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) + throws IOException { + Configuration c = new Configuration(systemConf); + c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); + FileSystem fs = FileSystem.get(c); + Path tablePath = new Path(file.getAbsolutePath()); + + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus f : fileLists) { + if (f.getLen() == 0) { + continue; + } + tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); + listTablets.add(tablet); + } + + // Special treatment for locally pseudo fetched chunks + synchronized (localChunks) { + for (FileChunk chunk : localChunks) { + if (name.equals(chunk.getEbId())) { + tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); + listTablets.add(tablet); + LOG.info("One local chunk is added to listTablets"); + } + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + private class FetchRunner implements Runnable { + private final TaskAttemptContext ctx; + private final Fetcher fetcher; + private int maxRetryNum; + + public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { + this.ctx = ctx; + this.fetcher = fetcher; + this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); + } + + @Override + public void run() { + int retryNum = 0; + int retryWaitTime = 1000; //sec + + try { // for releasing fetch latch + while(!context.isStopped() && retryNum < maxRetryNum) { + if (retryNum > 0) { + try { + Thread.sleep(retryWaitTime); + retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds + } catch (InterruptedException e) { + LOG.error(e); + } + LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); + } + try { + FileChunk fetched = fetcher.get(); + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null + && fetched.getFile() != null) { + if (fetched.fromRemote() == false) { + localChunks.add(fetched); + LOG.info("Add a new FileChunk to local chunk list"); + } + break; + } + } catch (Throwable e) { + LOG.error("Fetch failed: " + fetcher.getURI(), e); + } + retryNum++; + } + } finally { + if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ + fetcherFinished(ctx); + } else { + if (retryNum == maxRetryNum) { + LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); + } + stopScriptExecutors(); + context.stop(); // retry task + ctx.getFetchLatch().countDown(); + } + } + } + } + + @VisibleForTesting + public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { + if (totalFetcher > 0) { + return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; + } else { + return 0.0f; + } + } + + private synchronized void fetcherFinished(TaskAttemptContext ctx) { + int fetcherSize = fetcherRunners.size(); + if(fetcherSize == 0) { + return; + } + + ctx.getFetchLatch().countDown(); + + int remainFetcher = (int) ctx.getFetchLatch().getCount(); + if (remainFetcher == 0) { + context.setFetcherProgress(FETCHER_PROGRESS); + } else { + context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); + } + } + + private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, + List<FetchImpl> fetches) throws IOException { + + if (fetches.size() > 0) { + Path inputDir = executionBlockContext.getLocalDirAllocator(). + getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); + + int i = 0; + File storeDir; + File defaultStoreFile; + FileChunk storeChunk = null; + List<Fetcher> runnerList = Lists.newArrayList(); + + for (FetchImpl f : fetches) { + storeDir = new File(inputDir.toString(), f.getName()); + if (!storeDir.exists()) { + if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir); + } + + for (URI uri : f.getURIs()) { + defaultStoreFile = new File(storeDir, "in_" + i); + InetAddress address = InetAddress.getByName(uri.getHost()); + + WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); + if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { + boolean hasError = false; + try { + LOG.info("Try to get local file chunk at local host"); + storeChunk = getLocalStoredFileChunk(uri, systemConf); + } catch (Throwable t) { + hasError = true; + } + + // When a range request is out of range, storeChunk will be NULL. This case is normal state. + // So, we should skip and don't need to create storeChunk. + if (storeChunk == null && !hasError) { + continue; + } + + if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 + && hasError == false) { + storeChunk.setFromRemote(false); + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); + } + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); + } + + // If we decide that intermediate data should be really fetched from a remote host, storeChunk + // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it + storeChunk.setEbId(f.getName()); + Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); + LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); + runnerList.add(fetcher); + i++; + } + } + ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); + return runnerList; + } else { + return Lists.newArrayList(); + } + } + + private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { + // Parse the URI + LOG.info("getLocalStoredFileChunk starts"); + final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); + final List<String> types = params.get("type"); + final List<String> qids = params.get("qid"); + final List<String> taskIdList = params.get("ta"); + final List<String> stageIds = params.get("sid"); + final List<String> partIds = params.get("p"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); + + if (types == null || stageIds == null || qids == null || partIds == null) { + LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); + return null; + } + + if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { + LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); + return null; + } + + String queryId = qids.get(0); + String shuffleType = types.get(0); + String sid = stageIds.get(0); + String partId = partIds.get(0); + + if (shuffleType.equals("r") && taskIdList == null) { + LOG.error("Invalid URI - For range shuffle, taskId is required"); + return null; + } + List<String> taskIds = splitMaps(taskIdList); + + FileChunk chunk; + long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; + long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; + + LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + + ", taskIds=" + taskIdList); + + // The working directory of Tajo worker for each query, including stage + String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; + + // If the stage requires a range shuffle + if (shuffleType.equals("r")) { + String ta = taskIds.get(0); + if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { + LOG.warn("Range shuffle - file not exist"); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); + String startKey = params.get("start").get(0); + String endKey = params.get("end").get(0); + boolean last = params.get("final") != null; + + try { + chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); + } catch (Throwable t) { + LOG.error("getFileChunks() throws exception"); + return null; + } + + // If the stage requires a hash shuffle or a scattered hash shuffle + } else if (shuffleType.equals("h") || shuffleType.equals("s")) { + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; + if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { + LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); + File file = new File(path.toUri()); + long startPos = (offset >= 0 && length >= 0) ? offset : 0; + long readLen = (offset >= 0 && length >= 0) ? length : file.length(); + + if (startPos >= file.length()) { + LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); + return null; + } + chunk = new FileChunk(file, startPos, readLen); + + } else { + LOG.error("Unknown shuffle type"); + return null; + } + + return chunk; + } + + private List<String> splitMaps(List<String> mapq) { + if (null == mapq) { + return null; + } + final List<String> ret = new ArrayList<String>(); + for (String s : mapq) { + Collections.addAll(ret, s.split(",")); + } + return ret; + } + + public static Path getTaskAttemptDir(TaskAttemptId quid) { + Path workDir = + StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), + String.valueOf(quid.getTaskId().getId()), + String.valueOf(quid.getId())); + return workDir; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java new file mode 100644 index 0000000..7990a72 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.worker.event.*; + +import java.io.IOException; +import java.util.*; + +/** + * A TaskManager is responsible for managing executionBlock resource and tasks. + * */ +public class TaskManager extends AbstractService implements EventHandler<TaskManagerEvent> { + private static final Log LOG = LogFactory.getLog(TaskManager.class); + + private final TajoWorker.WorkerContext workerContext; + private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap; + private final Dispatcher dispatcher; + private final EventHandler rmEventHandler; + + private TajoConf tajoConf; + + public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) { + super(TaskManager.class.getName()); + + this.dispatcher = dispatcher; + this.workerContext = workerContext; + this.executionBlockContextMap = Maps.newHashMap(); + this.rmEventHandler = rmEventHandler; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + + this.tajoConf = (TajoConf)conf; + dispatcher.register(TaskManagerEvent.EventType.class, this); + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + + for(ExecutionBlockContext context: executionBlockContextMap.values()) { + context.stop(); + } + executionBlockContextMap.clear(); + super.serviceStop(); + } + + protected Dispatcher getDispatcher() { + return dispatcher; + } + + protected TajoWorker.WorkerContext getWorkerContext() { + return workerContext; + } + + protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { + try { + ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), null, request); + + context.init(); + return context; + } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + protected void stopExecutionBlock(ExecutionBlockContext context, + TajoWorkerProtocol.ExecutionBlockListProto cleanupList) { + + if(context != null){ + try { + context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId()); + context.sendShuffleReport(); + getWorkerContext().getTaskHistoryWriter().flushTaskHistories(); + } catch (Exception e) { + LOG.fatal(e.getMessage(), e); + throw new RuntimeException(e); + } finally { + context.stop(); + + /* cleanup intermediate files */ + for (TajoIdProtos.ExecutionBlockIdProto ebId : cleanupList.getExecutionBlockIdList()) { + String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(ebId)).toString(); + workerContext.cleanup(inputDir); + String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(ebId)).toString(); + workerContext.cleanup(outputDir); + } + } + LOG.info("Stopped execution block:" + context.getExecutionBlockId()); + } + } + + @Override + public void handle(TaskManagerEvent event) { + LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType()); + + if (event instanceof ExecutionBlockStartEvent) { + + //receive event from NodeResourceManager + if(!executionBlockContextMap.containsKey(event.getExecutionBlockId())) { + ExecutionBlockContext context = createExecutionBlock(((ExecutionBlockStartEvent) event).getRequestProto()); + executionBlockContextMap.put(context.getExecutionBlockId(), context); + } else { + LOG.warn("Already initialized ExecutionBlock: " + event.getExecutionBlockId()); + } + } else if (event instanceof ExecutionBlockStopEvent) { + //receive event from QueryMaster + rmEventHandler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); + stopExecutionBlock(executionBlockContextMap.remove(event.getExecutionBlockId()), + ((ExecutionBlockStopEvent) event).getCleanupList()); + } + } + + protected ExecutionBlockContext getExecutionBlockContext(ExecutionBlockId executionBlockId) { + return executionBlockContextMap.get(executionBlockId); + } + + public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) { + ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId()); + if (context != null) { + return context.getTask(taskAttemptId); + } + return null; + } + + public List<TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) throws IOException { + List<TaskHistory> histories = new ArrayList<TaskHistory>(); + ExecutionBlockContext context = executionBlockContextMap.get(executionblockId); + if (context != null) { + histories.addAll(context.getTaskHistories().values()); + } + //TODO get List<TaskHistory> from HistoryReader + return histories; + } + + public TaskHistory getTaskHistory(TaskId taskId) { + TaskHistory history = null; + ExecutionBlockContext context = executionBlockContextMap.get(taskId.getExecutionBlockId()); + if (context != null) { + history = context.getTaskHistories().get(taskId); + } + //TODO get TaskHistory from HistoryReader + return history; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 774f358..207b47e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -34,10 +34,8 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.container.TajoContainerIdPBImpl; import org.apache.tajo.master.container.TajoConverterUtils; import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import java.net.ConnectException; import java.util.concurrent.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; @@ -45,6 +43,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.*; /** * The driver class for Tajo Task processing. */ +@Deprecated public class TaskRunner extends AbstractService { /** class logger */ private static final Log LOG = LogFactory.getLog(TaskRunner.class); @@ -256,7 +255,7 @@ public class TaskRunner extends AbstractService { LOG.info("Initializing: " + taskAttemptId); Task task = null; try { - task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, + task = new LegacyTaskImpl(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, new TaskRequestImpl(taskRequest)); getContext().getTasks().put(taskAttemptId, task); @@ -269,10 +268,11 @@ public class TaskRunner extends AbstractService { } catch (Throwable t) { LOG.error(t.getMessage(), t); fatalError(qmClientService, taskAttemptId, t.getMessage()); + } finally { if(task != null) { - task.cleanupTask(); + task.cleanup(); } - } finally { + callFuture = null; taskRequest = null; }
