TEZ-134. Untangle MR configuration in YarnTezDagChild. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/be6d4bc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/be6d4bc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/be6d4bc0 Branch: refs/heads/master Commit: be6d4bc096d9bae363cbcf1a3dc5ec67b057507d Parents: 0d5027d Author: Siddharth Seth <[email protected]> Authored: Sat May 18 20:51:06 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Sat May 18 20:51:06 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/common/TezJobConfig.java | 19 +- .../org/apache/hadoop/mapred/YarnTezDagChild.java | 291 ++++----------- .../main/java/org/apache/tez/engine/api/Task.java | 2 + .../apache/tez/common/TezEngineTaskContext.java | 5 + .../tez/engine/common/shuffle/impl/Shuffle.java | 2 +- .../tez/engine/common/sort/impl/TezMerger.java | 2 +- .../task/local/output/TezLocalTaskOutputFiles.java | 4 +- .../task/local/output/TezTaskOutputFiles.java | 2 +- .../apache/tez/engine/runtime/RuntimeUtils.java | 22 +- .../org/apache/tez/engine/task/RuntimeTask.java | 21 +- .../org/apache/hadoop/mapred/LocalJobRunner.java | 8 +- .../apache/tez/mapreduce/task/MRRuntimeTask.java | 262 +++++++++++++ .../apache/tez/mapreduce/processor/MapUtils.java | 50 +++ .../mapreduce/processor/map/TestMapProcessor.java | 23 +- .../processor/reduce/TestReduceProcessor.java | 23 +- 15 files changed, 477 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java index 4a7abea..0d6f6be 100644 --- 
a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java @@ -62,11 +62,16 @@ public class TezJobConfig { public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000; /** - * + * List of directories available to the engine. */ - public static final String LOCAL_DIR = "tez.engine.local.dir"; - public static final String DEFAULT_LOCAL_DIR = "/tmp"; - + public static final String LOCAL_DIRS = "tez.engine.local.dirs"; + public static final String DEFAULT_LOCAL_DIRS = "/tmp"; + + /** + * One local dir for the specific job. + */ + public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir"; + /** * The directory which contains the localized files for this task. */ @@ -74,6 +79,8 @@ public class TezJobConfig { public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir"; public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp"; + public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir"; + /** * */ @@ -299,4 +306,8 @@ public class TezJobConfig { * credentials. 
*/ public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary"; + + + public static final String APPLICATION_ATTEMPT_ID = + "tez.job.application.attempt.id"; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java index 2295b6a..6fd6eff 100644 --- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java +++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java @@ -20,17 +20,9 @@ package org.apache.hadoop.mapred; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintStream; import java.net.InetSocketAddress; -import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.List; - -import javax.crypto.SecretKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,16 +31,8 @@ import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.filecache.DistributedCache; -import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; -import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.net.NetUtils; 
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; @@ -72,16 +56,15 @@ import org.apache.tez.common.TezJobConfig; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.common.security.TokenCache; import org.apache.tez.engine.runtime.RuntimeUtils; -import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; -import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.engine.task.RuntimeTask; import org.apache.tez.mapreduce.input.SimpleInput; import org.apache.tez.mapreduce.output.SimpleOutput; -import org.apache.tez.mapreduce.processor.MRTask; /** - * The main() for TEZ MapReduce task processes. + * The main() for TEZ Task processes. */ public class YarnTezDagChild { @@ -91,11 +74,9 @@ public class YarnTezDagChild { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); LOG.debug("Child starting"); - DeprecatedKeys.init(); - - final JobConf defaultConf = new JobConf(); - // HACK Eventually load the DagConf for security etc setup. -// defaultConf.addResource(MRJobConfig.JOB_CONF_FILE); + final Configuration defaultConf = new Configuration(); + // Security settings will be loaded based on core-site and core-default. Don't + // depend on the jobConf for this. UserGroupInformation.setConfiguration(defaultConf); String host = args[0]; @@ -121,9 +102,10 @@ public class YarnTezDagChild { // Create TaskUmbilicalProtocol as actual task owner. 
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(appID.toString()); - Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials); - SecurityUtil.setTokenService(jt, address); - taskOwner.addToken(jt); + + Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials); + SecurityUtil.setTokenService(jobToken, address); + taskOwner.addToken(jobToken); final TezTaskUmbilicalProtocol umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { @Override @@ -142,9 +124,8 @@ public class YarnTezDagChild { ContainerTask containerTask = null; UserGroupInformation childUGI = null; TezTaskAttemptID taskAttemptId = null; - MRTask task = null; ContainerContext containerContext = new ContainerContext(containerId, pid); - + try { while (true) { // poll for new task @@ -165,19 +146,19 @@ public class YarnTezDagChild { } taskContext = (TezEngineTaskContext) containerTask .getTezEngineTaskContext(); - LOG.info("XXXX: New container task context:" + LOG.info("DEBUG: New container task context:" + taskContext.toString()); taskAttemptId = taskContext.getTaskAttemptId(); final Task t = createAndConfigureTezTask(taskContext, umbilical, - credentials, jt, + credentials, jobToken, containerId.getApplicationAttemptId().getAttemptId()); - task = (MRTask) t.getProcessor(); - final JobConf job = task.getConf(); - // Initiate Java VM metrics - JvmMetrics.initSingleton(containerId.toString(), job.getSessionId()); + final Configuration conf = ((RuntimeTask)t).getConfiguration(); + + // TODO Initiate Java VM metrics + // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId()); childUGI = UserGroupInformation.createRemoteUser(System .getenv(ApplicationConstants.Environment.USER.toString())); // Add tokens to new user so that it may execute its task correctly. 
@@ -186,7 +167,7 @@ public class YarnTezDagChild { childUGI.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - runTezTask(t, umbilical, job); // run the task + runTezTask(t, umbilical, conf); // run the task return null; } }); @@ -196,34 +177,6 @@ public class YarnTezDagChild { } catch (FSError e) { LOG.fatal("FSError from child", e); umbilical.fsError(taskAttemptId, e.getMessage()); - } catch (Exception exception) { - LOG.warn("Exception running child : " - + StringUtils.stringifyException(exception)); - try { - if (task != null) { - // do cleanup for the task - if (childUGI == null) { // no need to job into doAs block - task.taskCleanup(umbilical); - } else { - final MRTask taskFinal = task; - childUGI.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() throws Exception { - taskFinal.taskCleanup(umbilical); - return null; - } - }); - } - } - } catch (Exception e) { - LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e)); - } - // Report back any failures, for diagnostic purposes - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - exception.printStackTrace(new PrintStream(baos)); - if (taskAttemptId != null) { - umbilical.fatalError(taskAttemptId, baos.toString()); - } } catch (Throwable throwable) { LOG.fatal("Error running child : " + StringUtils.stringifyException(throwable)); @@ -249,29 +202,36 @@ public class YarnTezDagChild { * out an output directory. * @throws IOException */ - private static void configureLocalDirs(JobConf job) throws IOException { + /** + * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these + * dirs. 
+ */ + + private static void configureLocalDirs(Configuration conf) throws IOException { String[] localSysDirs = StringUtils.getTrimmedStrings( System.getenv(Environment.LOCAL_DIRS.name())); - job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs); - job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, + conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs); + conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, System.getenv(Environment.PWD.name())); - LOG.info(TezJobConfig.LOCAL_DIR + " for child: " + - job.get(TezJobConfig.LOCAL_DIR)); + + LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " + + conf.get(TezJobConfig.LOCAL_DIRS)); LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: " - + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR)); - LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR); + + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR)); + + LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); Path workDir = null; // First, try to find the JOB_LOCAL_DIR on this host. try { - workDir = lDirAlloc.getLocalPathToRead("work", job); + workDir = lDirAlloc.getLocalPathToRead("work", conf); } catch (DiskErrorException e) { // DiskErrorException means dir not found. If not found, it will // be created below. } if (workDir == null) { // JOB_LOCAL_DIR doesn't exist on this host -- Create it. - workDir = lDirAlloc.getLocalPathForWrite("work", job); - FileSystem lfs = FileSystem.getLocal(job).getRaw(); + workDir = lDirAlloc.getLocalPathForWrite("work", conf); + FileSystem lfs = FileSystem.getLocal(conf).getRaw(); boolean madeDir = false; try { madeDir = lfs.mkdirs(workDir); @@ -281,155 +241,29 @@ public class YarnTezDagChild { // at the same time. If this task loses the race, it's okay because // the directory already exists. 
madeDir = true; - workDir = lDirAlloc.getLocalPathToRead("work", job); + workDir = lDirAlloc.getLocalPathToRead("work", conf); } if (!madeDir) { throw new IOException("Mkdirs failed to create " + workDir.toString()); } } - // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc. - job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString()); - } - - private static JobConf configureTask(MRTask task, Credentials credentials, - Token<JobTokenIdentifier> jt, int appAttemptId) - throws IOException, InterruptedException { - JobConf job = task.getConf(); - - // Set it in conf, so as to be able to be used the the OutputCommitter. - job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId); - - // set tcp nodelay - job.setBoolean("ipc.client.tcpnodelay", true); - job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, - YarnOutputFiles.class, MapOutputFile.class); - // set the jobTokenFile into task - SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword()); - - task.setJobTokenSecret(sk); -// task.setJobTokenSecret( -// JobTokenSecretManager.createSecretKey(jt.getPassword())); - - // setup the child's MRConfig.LOCAL_DIR. - configureLocalDirs(job); - - // setup the child's attempt directories - // Do the task-type specific localization - task.localizeConfiguration(job); - - // Set up the DistributedCache related configs - setupDistributedCacheConfig(job); - - // Overwrite the localized task jobconf which is linked to in the current - // work-dir. - Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE); - writeLocalJobFile(localTaskFile, job); - task.setConf(job); - return job; - } - - /** - * Set up the DistributedCache related configs to make - * {@link DistributedCache#getLocalCacheFiles(Configuration)} - * and - * {@link DistributedCache#getLocalCacheArchives(Configuration)} - * working. 
- * @param job - * @throws IOException - */ - private static void setupDistributedCacheConfig(final JobConf job) - throws IOException { - - String localWorkDir = System.getenv("PWD"); - // ^ ^ all symlinks are created in the current work-dir - - // Update the configuration object with localized archives. - URI[] cacheArchives = DistributedCache.getCacheArchives(job); - if (cacheArchives != null) { - List<String> localArchives = new ArrayList<String>(); - for (int i = 0; i < cacheArchives.length; ++i) { - URI u = cacheArchives[i]; - Path p = new Path(u); - Path name = - new Path((null == u.getFragment()) ? p.getName() - : u.getFragment()); - String linkName = name.toUri().getPath(); - localArchives.add(new Path(localWorkDir, linkName).toUri().getPath()); - } - if (!localArchives.isEmpty()) { - job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils - .arrayToString(localArchives.toArray(new String[localArchives - .size()]))); - } - } - - // Update the configuration object with localized files. - URI[] cacheFiles = DistributedCache.getCacheFiles(job); - if (cacheFiles != null) { - List<String> localFiles = new ArrayList<String>(); - for (int i = 0; i < cacheFiles.length; ++i) { - URI u = cacheFiles[i]; - Path p = new Path(u); - Path name = - new Path((null == u.getFragment()) ? p.getName() - : u.getFragment()); - String linkName = name.toUri().getPath(); - localFiles.add(new Path(localWorkDir, linkName).toUri().getPath()); - } - if (!localFiles.isEmpty()) { - job.set(MRJobConfig.CACHE_LOCALFILES, - StringUtils.arrayToString(localFiles - .toArray(new String[localFiles.size()]))); - } - } - } - - private static final FsPermission urw_gr = - FsPermission.createImmutable((short) 0640); - - /** - * Write the task specific job-configuration file. 
- * @throws IOException - */ - private static void writeLocalJobFile(Path jobFile, JobConf conf) - throws IOException { - FileSystem localFs = FileSystem.getLocal(conf); - localFs.delete(jobFile); - OutputStream out = null; - try { - out = FileSystem.create(localFs, jobFile, urw_gr); - conf.writeXml(out); - } finally { - IOUtils.cleanup(LOG, out); - } + conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString()); } - + private static Task createAndConfigureTezTask( - TezEngineTaskContext taskContext, - TezTaskUmbilicalProtocol master, - Credentials credentials, Token<JobTokenIdentifier> jt, - int appAttemptId) - throws IOException, InterruptedException { + TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master, + Credentials cxredentials, Token<JobTokenIdentifier> jobToken, + int appAttemptId) throws IOException, InterruptedException { - Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE); - Configuration conf = MultiStageMRConfigUtil.getConfForVertex(jConf, - taskContext.getVertexName()); + Configuration conf = new Configuration(); + // set tcp nodelay + conf.setBoolean("ipc.client.tcpnodelay", true); + conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId); + + configureLocalDirs(conf); + - // TOOD Post MRR - // A single file per vertex will likely be a better solution. Does not - // require translation - client can take care of this. Will work independent - // of whether the configuration is for intermediate tasks or not. Has the - // overhead of localizing multiple files per job - i.e. the client would - // need to write these files to hdfs, add them as local resources per - // vertex. A solution like this may be more practical once it's possible to - // submit configuration parameters to the AM and effectively tasks via RPC. - - // TODO Avoid all this extra config manipulation. 
- // FIXME we need I/O/p level configs to be used in init below - final JobConf job = new JobConf(conf); - job.setCredentials(credentials); - // FIXME need Input/Output vertices else we have this hack if (taskContext.getInputSpecList().isEmpty()) { taskContext.getInputSpecList().add( @@ -440,26 +274,37 @@ public class YarnTezDagChild { new OutputSpec("null", 0, SimpleOutput.class.getName())); } Task t = RuntimeUtils.createRuntimeTask(taskContext); - + t.initialize(conf, master); + // FIXME wrapper should initialize all of processor, inputs and outputs // Currently, processor is inited via task init // and processor then inits inputs and outputs - t.initialize(job, master); - - MRTask task = (MRTask)t.getProcessor(); - configureTask(task, credentials, jt, appAttemptId); - return t; } private static void runTezTask( - Task t, TezTaskUmbilicalProtocol master, JobConf job) + Task t, TezTaskUmbilicalProtocol master, Configuration conf) throws IOException, InterruptedException { // use job-specified working directory - FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); + FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf)); // Run! 
t.run(); t.close(); } + + private static Path getWorkingDirectory(Configuration conf) { + String name = conf.get(JobContext.WORKING_DIR); + if (name != null) { + return new Path(name); + } else { + try { + Path dir = FileSystem.get(conf).getWorkingDirectory(); + conf.set(JobContext.WORKING_DIR, dir.toString()); + return dir; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java ---------------------------------------------------------------------- diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java index ea50a06..c08eda6 100644 --- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java +++ b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java @@ -74,4 +74,6 @@ public interface Task { */ public void close() throws IOException, InterruptedException; + + public Configuration getConfiguration(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java index 5823de6..93aeb0b 100644 --- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java +++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java @@ -55,6 +55,11 @@ public class TezEngineTaskContext extends TezTaskContext { this.processorName = processorName; } + public String getRuntimeName() { + // FIXME. Add this to the DAG configuration, and fetch from there. 
+ return "org.apache.tez.mapreduce.task.MRRuntimeTask"; + } + public String getProcessorName() { return processorName; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java index 6a065de..202abd6 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java @@ -80,7 +80,7 @@ public class Shuffle implements ExceptionReporter { FileSystem localFS = FileSystem.getLocal(this.conf); LocalDirAllocator localDirAllocator = - new LocalDirAllocator(TezJobConfig.LOCAL_DIR); + new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); copyPhase = this.runningTaskContext.getProgress().addPhase("copy", 0.33f); mergePhase = this.runningTaskContext.getProgress().addPhase("merge", 0.66f); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java index 829495b..eeea764 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java @@ -57,7 +57,7 @@ public class TezMerger { // Local directories private static LocalDirAllocator lDirAlloc = - new LocalDirAllocator(TezJobConfig.LOCAL_DIR); + new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); public static TezRawKeyValueIterator merge(Configuration conf, FileSystem 
fs, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java index ccfbd78..69484af 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java @@ -42,7 +42,7 @@ import org.apache.tez.dag.records.TezTaskID; public class TezLocalTaskOutputFiles extends TezTaskOutput { private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(TezJobConfig.LOCAL_DIR); + new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); public TezLocalTaskOutputFiles() { } @@ -223,7 +223,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput { } private String[] getLocalDirs() throws IOException { - return getConf().getStrings(TezJobConfig.LOCAL_DIR); + return getConf().getStrings(TezJobConfig.LOCAL_DIRS); } @SuppressWarnings("deprecation") http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java index 1bd65e2..5fb6519 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java @@ -56,7 +56,7 @@ public class 
TezTaskOutputFiles extends TezTaskOutput { // assume configured to $localdir/usercache/$user/appcache/$appId private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(TezJobConfig.LOCAL_DIR); + new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); private Path getAttemptOutputDir() { LOG.info("DEBUG: getAttemptOutputDir: " http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java index e04d405..900c2f0 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java @@ -66,6 +66,7 @@ public class RuntimeUtils { + ", Processor: " + taskContext.getProcessorName() + ", InputCount=" + taskContext.getInputSpecList().size() + ", OutputCount=" + taskContext.getOutputSpecList().size()); + RuntimeTask t = null; try { Class<?> processorClazz = @@ -108,11 +109,30 @@ public class RuntimeUtils { outputs[i] = output; } } - t = new RuntimeTask(processor, inputs, outputs); + // t = new RuntimeTask(taskContext, processor, inputs, outputs); + t = createRuntime(taskContext, processor, inputs, outputs); } catch (ClassNotFoundException e) { throw new YarnException("Unable to initialize RuntimeTask, context=" + taskContext, e); } return t; } + + private static RuntimeTask createRuntime(TezEngineTaskContext taskContext, + Processor processor, Input[] inputs, Output[] outputs) { + try { + // TODO Change this to use getNewInstance + Class<?> runtimeClazz = Class.forName(taskContext.getRuntimeName()); + Constructor<?> ctor = runtimeClazz.getConstructor( + TezEngineTaskContext.class, Processor.class, Input[].class, + Output[].class); + ctor.setAccessible(true); + return (RuntimeTask) 
ctor.newInstance(taskContext, processor, inputs, outputs); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to load runtimeClass: " + + taskContext.getRuntimeName(), e); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java index 3e2c6f2..871f3ba 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java @@ -20,6 +20,7 @@ package org.apache.tez.engine.task; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezEngineTaskContext; import org.apache.tez.engine.api.Input; import org.apache.tez.engine.api.Master; import org.apache.tez.engine.api.Output; @@ -28,17 +29,19 @@ import org.apache.tez.engine.api.Task; public class RuntimeTask implements Task { - private final Input[] inputs; - private final Output[] outputs; - private final Processor processor; + protected final Input[] inputs; + protected final Output[] outputs; + protected final Processor processor; - private Configuration conf; - private Master master; + protected TezEngineTaskContext taskContext; + protected Configuration conf; + protected Master master; - public RuntimeTask( + public RuntimeTask(TezEngineTaskContext taskContext, Processor processor, Input[] inputs, Output[] outputs) { + this.taskContext = taskContext; this.inputs = inputs; this.processor = processor; this.outputs = outputs; @@ -74,7 +77,13 @@ public class RuntimeTask implements Task { public void close() throws IOException, InterruptedException { // NOTE: Allow processor to close input/output + // 
This can be changed to close input/output since MRRuntimeTask is used for + // MR jobs, which changes the order. processor.close(); } + @Override + public Configuration getConfiguration() { + return this.conf; + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 63472d2..5c181e2 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -465,7 +465,7 @@ public class LocalJobRunner implements ClientProtocol { LOG.info("XXX mapId: " + i + " LOCAL_DIR = " + mapOutputFiles.get(mapId).getConf().get( - TezJobConfig.LOCAL_DIR)); + TezJobConfig.LOCAL_DIRS)); Path mapOut = mapOutputFiles.get(mapId).getOutputFile(); TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles(); localOutputFile.setConf(localConf); @@ -902,7 +902,7 @@ public class LocalJobRunner implements ClientProtocol { TaskAttemptID taskAttemptID, String user, JobConf conf) { String[] localDirs = conf.getTrimmedStrings( - TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR); + TezJobConfig.LOCAL_DIRS, TezJobConfig.DEFAULT_LOCAL_DIRS); String jobId = taskAttemptID.getJobID().toString(); String taskId = taskAttemptID.getTaskID().toString(); boolean isCleanup = false; @@ -913,9 +913,9 @@ public class LocalJobRunner implements ClientProtocol { childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR + getLocalTaskDir(user, jobId, taskId, isCleanup)); } - LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID + + LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID + " is " + childMapredLocalDir); - conf.set(TezJobConfig.LOCAL_DIR, 
childMapredLocalDir.toString()); + conf.set(TezJobConfig.LOCAL_DIRS, childMapredLocalDir.toString()); conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, TezTaskOutput.class); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java new file mode 100644 index 0000000..6091d4b --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.task; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.crypto.SecretKey; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.tez.common.TezEngineTaskContext; +import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Master; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.common.security.TokenCache; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.tez.mapreduce.processor.MRTask; +import org.apache.tez.mapreduce.task.impl.YarnOutputFiles; + +@SuppressWarnings("deprecation") +public class 
MRRuntimeTask extends RuntimeTask { + + private static final Log LOG = LogFactory.getLog(MRRuntimeTask.class); + + private MRTask mrTask; + + public MRRuntimeTask(TezEngineTaskContext taskContext, Processor processor, + Input[] inputs, Output[] outputs) { + super(taskContext, processor, inputs, outputs); + } + + @Override + public void initialize(Configuration conf, Master master) throws IOException, + InterruptedException { + + DeprecatedKeys.init(); + + Configuration mrConf = new Configuration(conf); + mrConf.addResource(MRJobConfig.JOB_CONF_FILE); + Configuration taskConf = MultiStageMRConfigUtil.getConfForVertex(mrConf, + taskContext.getVertexName()); + + // TODO Avoid all this extra config manipulation. + // FIXME we need I/O/p level configs to be used in init below + + // TOOD Post MRR + // A single file per vertex will likely be a better solution. Does not + // require translation - client can take care of this. Will work independent + // of whether the configuration is for intermediate tasks or not. Has the + // overhead of localizing multiple files per job - i.e. the client would + // need to write these files to hdfs, add them as local resources per + // vertex. A solution like this may be more practical once it's possible to + // submit configuration parameters to the AM and effectively tasks via RPC. 
+ + final JobConf job = new JobConf(taskConf); + + MRTask mrTask = (MRTask) getProcessor(); + this.mrTask = mrTask; + configureMRTask(job, mrTask); + + this.conf = job; + this.master = master; + + // NOTE: Allow processor to initialize input/output + processor.initialize(this.conf, this.master); + } + + @Override + public void run() throws IOException, InterruptedException { + TezTaskUmbilicalProtocol umbilical = (TezTaskUmbilicalProtocol) master; + try { + super.run(); + } catch (FSError e) { + throw e; + } catch (Exception exception) { + LOG.warn("Exception running child : " + + StringUtils.stringifyException(exception)); + try { + if (mrTask != null) { + mrTask.taskCleanup(umbilical); + } + } catch (Exception e) { + LOG.info("Exception cleanup up: " + StringUtils.stringifyException(e)); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + exception.printStackTrace(new PrintStream(baos)); + if (taskContext.getTaskAttemptId() != null) { + umbilical.fatalError(taskContext.getTaskAttemptId(), baos.toString()); + } + } + } + + @Override + public void close() throws IOException, InterruptedException { + // NOTE: Allow processor to close input/output + processor.close(); + } + + private static void configureMRTask(JobConf job, MRTask task) + throws IOException, InterruptedException { + + Credentials credentials = UserGroupInformation.getCurrentUser() + .getCredentials(); + job.setCredentials(credentials); + // TODO Can this be avoided all together. Have the MRTezOutputCommitter use + // the Tez parameter. + // TODO This could be fetched from the env if YARN is setting it for all + // Containers. + // Set it in conf, so as to be able to be used the the OutputCommitter. 
+ job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, + job.getInt(TezJobConfig.APPLICATION_ATTEMPT_ID, -1)); + + job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class, + MapOutputFile.class); // MR + + Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials); + if (jobToken != null) { + // Will MR ever run without a job token. + SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken + .getPassword()); + task.setJobTokenSecret(sk); + } else { + LOG.warn("No job token set"); + } + + job.set(MRJobConfig.JOB_LOCAL_DIR, job.get(TezJobConfig.JOB_LOCAL_DIR)); + + // setup the child's attempt directories + // Do the task-type specific localization + task.localizeConfiguration(job); + + // Set up the DistributedCache related configs + setupDistributedCacheConfig(job); + + // Overwrite the localized task jobconf which is linked to in the current + // work-dir. + Path localTaskFile = new Path( + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), + MRJobConfig.JOB_CONF_FILE); + writeLocalJobFile(localTaskFile, job); + + task.setConf(job); + } + + /** + * Set up the DistributedCache related configs to make + * {@link DistributedCache#getLocalCacheFiles(Configuration)} and + * {@link DistributedCache#getLocalCacheArchives(Configuration)} working. + * + * @param job + * @throws IOException + */ + private static void setupDistributedCacheConfig(final JobConf job) + throws IOException { + + String localWorkDir = System.getenv(System.getenv(Environment.PWD.name())); + // ^ ^ all symlinks are created in the current work-dir + + // Update the configuration object with localized archives. + URI[] cacheArchives = DistributedCache.getCacheArchives(job); + if (cacheArchives != null) { + List<String> localArchives = new ArrayList<String>(); + for (int i = 0; i < cacheArchives.length; ++i) { + URI u = cacheArchives[i]; + Path p = new Path(u); + Path name = new Path((null == u.getFragment()) ? 
p.getName() + : u.getFragment()); + String linkName = name.toUri().getPath(); + localArchives.add(new Path(localWorkDir, linkName).toUri().getPath()); + } + if (!localArchives.isEmpty()) { + job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils + .arrayToString(localArchives.toArray(new String[localArchives + .size()]))); + } + } + + // Update the configuration object with localized files. + URI[] cacheFiles = DistributedCache.getCacheFiles(job); + if (cacheFiles != null) { + List<String> localFiles = new ArrayList<String>(); + for (int i = 0; i < cacheFiles.length; ++i) { + URI u = cacheFiles[i]; + Path p = new Path(u); + Path name = new Path((null == u.getFragment()) ? p.getName() + : u.getFragment()); + String linkName = name.toUri().getPath(); + localFiles.add(new Path(localWorkDir, linkName).toUri().getPath()); + } + if (!localFiles.isEmpty()) { + job.set(MRJobConfig.CACHE_LOCALFILES, StringUtils + .arrayToString(localFiles.toArray(new String[localFiles.size()]))); + } + } + } + + private static final FsPermission urw_gr = FsPermission + .createImmutable((short) 0640); + + /** + * Write the task specific job-configuration file. 
+ * + * @throws IOException + */ + private static void writeLocalJobFile(Path jobFile, JobConf conf) + throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + localFs.delete(jobFile); + OutputStream out = null; + try { + out = FileSystem.create(localFs, jobFile, urw_gr); + conf.writeXml(out); + } finally { + IOUtils.cleanup(LOG, out); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 0b6bc5f..48a1113 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -24,14 +24,17 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -40,6 +43,8 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; +import 
org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.tez.common.InputSpec; import org.apache.tez.common.OutputSpec; import org.apache.tez.common.TezEngineTaskContext; @@ -55,6 +60,51 @@ public class MapUtils { private static final Log LOG = LogFactory.getLog(MapUtils.class); + public static void configureLocalDirs(Configuration conf, String localDir) + throws IOException { + String[] localSysDirs = new String[1]; + localSysDirs[0] = localDir; + + conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs); + conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, + System.getenv(Environment.PWD.name())); + + LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " + + conf.get(TezJobConfig.LOCAL_DIRS)); + LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: " + + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR)); + + LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); + Path workDir = null; + // First, try to find the JOB_LOCAL_DIR on this host. + try { + workDir = lDirAlloc.getLocalPathToRead("work", conf); + } catch (DiskErrorException e) { + // DiskErrorException means dir not found. If not found, it will + // be created below. + } + if (workDir == null) { + // JOB_LOCAL_DIR doesn't exist on this host -- Create it. + workDir = lDirAlloc.getLocalPathForWrite("work", conf); + FileSystem lfs = FileSystem.getLocal(conf).getRaw(); + boolean madeDir = false; + try { + madeDir = lfs.mkdirs(workDir); + } catch (FileAlreadyExistsException e) { + // Since all tasks will be running in their own JVM, the race condition + // exists where multiple tasks could be trying to create this directory + // at the same time. If this task loses the race, it's okay because + // the directory already exists. 
+ madeDir = true; + workDir = lDirAlloc.getLocalPathToRead("work", conf); + } + if (!madeDir) { + throw new IOException("Mkdirs failed to create " + workDir.toString()); + } + } + conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString()); + } + private static InputSplit createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index cda15fb..1e5fe9b 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -55,36 +55,40 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +@SuppressWarnings("deprecation") public class TestMapProcessor { private static final Log LOG = LogFactory.getLog(TestMapProcessor.class); private static JobConf defaultConf = new JobConf(); private static FileSystem localFs = null; + private static Path workDir = null; static { try { defaultConf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(defaultConf); + workDir = + new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestMapProcessor").makeQualified(localFs); + MapUtils.configureLocalDirs(defaultConf, workDir.toString()); } catch (IOException e) { throw new RuntimeException("init failure", e); } } - @SuppressWarnings("deprecation") - private static Path workDir = - new Path(new Path(System.getProperty("test.build.data", "/tmp")), - "TestMapProcessor").makeQualified(localFs); + + TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); public void 
setUpJobConf(JobConf job) { - job.set(TezJobConfig.LOCAL_DIR, workDir.toString()); + job.set(TezJobConfig.LOCAL_DIRS, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, TezTaskOutput.class); job.setNumReduceTasks(1); } - + @Before @After public void cleanup() throws Exception { @@ -98,15 +102,18 @@ public class TestMapProcessor { setUpJobConf(jobConf); TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(); mapOutputs.setConf(jobConf); - + Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); + conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0); + Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf, vertexName); JobConf job = new JobConf(stageConf); - + job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); + MapUtils.runMapProcessor(localFs, workDir, job, 0, new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 69571e1..d17b0be 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -56,27 +56,30 @@ import org.junit.Before; import org.junit.Test; +@SuppressWarnings("deprecation") public class TestReduceProcessor { private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class); private static JobConf defaultConf = new JobConf(); - private static FileSystem localFs = null; + private static 
FileSystem localFs = null; + private static Path workDir = null; static { try { defaultConf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(defaultConf); + workDir = + new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestReduceProcessor").makeQualified(localFs); + + MapUtils.configureLocalDirs(defaultConf, workDir.toString()); } catch (IOException e) { throw new RuntimeException("init failure", e); } } - @SuppressWarnings("deprecation") - private static Path workDir = - new Path(new Path(System.getProperty("test.build.data", "/tmp")), - "TestReduceProcessor").makeQualified(localFs); public void setUpJobConf(JobConf job) { - job.set(TezJobConfig.LOCAL_DIR, workDir.toString()); + job.set(TezJobConfig.LOCAL_DIRS, workDir.toString()); job.setClass( Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezLocalTaskOutputFiles.class, @@ -101,6 +104,7 @@ public class TestReduceProcessor { mapOutputs.setConf(jobConf); Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf); + conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0); Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf, mapVertexName); @@ -109,6 +113,7 @@ public class TestReduceProcessor { mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, "localized-resources").toUri().toString()); + // Run a map MapUtils.runMapProcessor(localFs, workDir, mapConf, 0, new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName, @@ -124,6 +129,8 @@ public class TestReduceProcessor { reduceVertexName); JobConf reduceConf = new JobConf(reduceStageConf); reduceConf.setOutputFormat(SequenceFileOutputFormat.class); + reduceConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + "localized-resources").toUri().toString()); FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output")); // Now run a reduce @@ -147,8 +154,8 @@ public class TestReduceProcessor { .toMRTaskId(taskContext.getTaskAttemptId().getTaskID())); Path 
reduceOutputFile = new Path(reduceOutputDir, "part-00000"); - @SuppressWarnings("deprecation") - SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf); + SequenceFile.Reader reader = new SequenceFile.Reader(localFs, + reduceOutputFile, reduceConf); LongWritable key = new LongWritable(); Text value = new Text();
