Repository: tez Updated Branches: refs/heads/master c5f2eac8d -> db6d9b29e
TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db6d9b29 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db6d9b29 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db6d9b29 Branch: refs/heads/master Commit: db6d9b29e07b0337509024cc5ce9bdf9fdc6627b Parents: c5f2eac Author: Bikas Saha <[email protected]> Authored: Wed Aug 20 18:46:10 2014 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Aug 20 18:46:10 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/client/AMConfiguration.java | 27 +++-- .../java/org/apache/tez/client/TezClient.java | 78 ++++++-------- .../org/apache/tez/client/TezClientUtils.java | 106 +++++++------------ .../org/apache/tez/common/TezCommonUtils.java | 19 +++- .../main/java/org/apache/tez/dag/api/DAG.java | 26 ++++- .../tez/dag/api/DataSourceDescriptor.java | 22 ++-- .../org/apache/tez/dag/api/TezConstants.java | 9 +- .../java/org/apache/tez/dag/api/Vertex.java | 17 +-- .../org/apache/tez/client/TestTezClient.java | 22 ++-- .../apache/tez/common/TestTezCommonUtils.java | 4 +- .../org/apache/tez/dag/api/TestDAGPlan.java | 18 ++-- .../org/apache/tez/dag/api/TestDAGVerify.java | 81 +++++++++++++- .../java/org/apache/tez/dag/app/AppContext.java | 3 - .../org/apache/tez/dag/app/DAGAppMaster.java | 14 +-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 - .../apache/tez/mapreduce/client/YARNRunner.java | 2 +- .../mapreduce/hadoop/TestMRInputHelpers.java | 10 +- .../mapreduce/examples/FilterLinesByWord.java | 4 +- .../examples/FilterLinesByWordOneToOne.java | 4 +- .../tez/mapreduce/examples/MRRSleepJob.java | 6 +- .../examples/TestOrderedWordCount.java | 8 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 2 +- 23 files changed, 274 insertions(+), 210 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c611668..0deb1ca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -79,6 +79,7 @@ INCOMPATIBLE CHANGES TEZ-1455. Replace deprecated junit.framework.Assert with org.junit.Assert TEZ-1465. Update and document IntersectExample. Change name to JoinExample TEZ-1449. Change user payloads to work with a byte buffer + TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java index 9206e72..d8cc2bf 100644 --- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java @@ -31,18 +31,18 @@ import com.google.common.collect.Maps; @Private class AMConfiguration { - private Map<String, LocalResource> localResources; + private Map<String, LocalResource> amLocalResources = Maps.newHashMap(); private TezConfiguration tezConf; private Credentials credentials; private YarnConfiguration yarnConfig; private Map<String, String> env; + private LocalResource binaryConfLRsrc; AMConfiguration(TezConfiguration tezConf, Map<String, LocalResource> localResources, Credentials credentials) { - this.localResources = Maps.newHashMap(); this.tezConf = tezConf; if (localResources != null) { - addLocalResources(localResources); + addAMLocalResources(localResources); } if (credentials != null) { setCredentials(credentials); @@ -50,12 +50,12 @@ class AMConfiguration { } - void 
addLocalResources(Map<String, LocalResource> localResources) { - this.localResources.putAll(localResources); + void addAMLocalResources(Map<String, LocalResource> localResources) { + this.amLocalResources.putAll(localResources); } - void clearLocalResources() { - this.localResources.clear(); + void clearAMLocalResources() { + this.amLocalResources.clear(); } void setCredentials(Credentials credentials) { @@ -74,8 +74,8 @@ class AMConfiguration { return this.tezConf.get(TezConfiguration.TEZ_QUEUE_NAME); } - Map<String, LocalResource> getLocalResources() { - return localResources; + Map<String, LocalResource> getAMLocalResources() { + return amLocalResources; } TezConfiguration getTezConfiguration() { @@ -93,4 +93,13 @@ class AMConfiguration { Map<String, String> getEnv() { return env; } + + void setBinaryConfLR(LocalResource binaryConfLRsrc) { + this.binaryConfLRsrc = binaryConfLRsrc; + } + + LocalResource getBinaryConfLR() { + return binaryConfLRsrc; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 7250379..4b1f221 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.tez.common.TezYARNUtils; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DAGSubmissionTimedOut; @@ -48,7 +47,6 @@ import org.apache.tez.dag.api.SessionNotRunning; import 
org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; @@ -159,8 +157,8 @@ public class TezClient { * as app master name is session mode * @param tezConf * Configuration for the framework - * @param localResources - * resources for the App Master + * @param localFiles + * local files for the App Master * @param credentials * Set security credentials to be used inside the app master, if * needed. Tez App Master needs credentials to access the staging @@ -174,9 +172,9 @@ public class TezClient { * calling start() */ public static TezClient create(String name, TezConfiguration tezConf, - @Nullable Map<String, LocalResource> localResources, + @Nullable Map<String, LocalResource> localFiles, @Nullable Credentials credentials) { - return new TezClient(name, tezConf, localResources, credentials); + return new TezClient(name, tezConf, localFiles, credentials); } /** @@ -195,58 +193,59 @@ public class TezClient { /** * Create a new TezClient with AM session mode set explicitly. This overrides * the setting from configuration. - * Set the initial resources and security credentials for the App Master. + * Set the initial files and security credentials for the App Master. * @param name * Name of the client. Used for logging etc. 
This will also be used * as app master name is session mode * @param tezConf Configuration for the framework * @param isSession The AM will run in session mode or not - * @param localResources resources for the App Master + * @param localFiles local files for the App Master * @param credentials credentials for the App Master */ public static TezClient create(String name, TezConfiguration tezConf, boolean isSession, - @Nullable Map<String, LocalResource> localResources, + @Nullable Map<String, LocalResource> localFiles, @Nullable Credentials credentials) { - return new TezClient(name, tezConf, isSession, localResources, credentials); + return new TezClient(name, tezConf, isSession, localFiles, credentials); } /** - * Add local resources for the DAG App Master. <br> + * Add local files for the DAG App Master. These may be files, archives, + * jars etc.<br> * <p> - * In non-session mode these will be added to the resources of the App Master - * to be launched for the next DAG. Resources added via this method will + * In non-session mode these will be added to the files of the App Master + * to be launched for the next DAG. Files added via this method will * accumulate and be used for every new App Master until - * clearAppMasterLocalResource() is invoked. <br> + * {@link #clearAppMasterLocalFiles()} is invoked. <br> * <p> - * In session mode, the recommended usage is to add all resources before - * calling start() so that all needed resources are available to the app - * master before it starts. When called after start(), these local resources + * In session mode, the recommended usage is to add all files before + * calling start() so that all needed files are available to the app + * master before it starts. When called after start(), these local files * will be re-localized to the running session DAG App Master and will be * added to its classpath for execution of this DAG. 
* <p> - * Caveats for invoking this method after start() in session mode: Resources + * Caveats for invoking this method after start() in session mode: files * accumulate across DAG submissions and are never removed from the classpath. - * Only LocalResourceType.FILE is supported. All resources will be treated as + * Only LocalResourceType.FILE is supported. All files will be treated as * private. * - * @param localResources + * @param localFiles */ - public synchronized void addAppMasterLocalResources(Map<String, LocalResource> localResources) { - Preconditions.checkNotNull(localResources); + public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) { + Preconditions.checkNotNull(localFiles); if (isSession && sessionStarted) { - additionalLocalResources.putAll(localResources); + additionalLocalResources.putAll(localFiles); } - amConfig.addLocalResources(localResources); + amConfig.addAMLocalResources(localFiles); } /** - * If the next DAG App Master needs different local resources, then use this - * method to clear the local resources and then add the new local resources - * using addAppMasterLocalResources(). This method is a no-op in session mode, + * If the next DAG App Master needs different local files, then use this + * method to clear the local files and then add the new local files + * using {@link #addAppMasterLocalFiles(Map)}. This method is a no-op in session mode, * after start() is called. 
*/ - public synchronized void clearAppMasterLocalResource() { - amConfig.clearLocalResources(); + public synchronized void clearAppMasterLocalFiles() { + amConfig.clearAMLocalResources(); } /** @@ -304,7 +303,7 @@ public class TezClient { ApplicationSubmissionContext appContext = TezClientUtils.createApplicationSubmissionContext( - amConfig.getTezConfiguration(), sessionAppId, + sessionAppId, null, clientName, amConfig, tezJarResources, sessionCredentials); @@ -365,19 +364,10 @@ public class TezClient { + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported"); } } - - // Obtain DAG specific credentials. - TezClientUtils.setupDAGCredentials(dag, sessionCredentials, amConfig.getTezConfiguration()); - - // TODO TEZ-1229 - fix jar resources - // setup env - for (Vertex v : dag.getVertices()) { - Map<String, String> taskEnv = v.getTaskEnvironment(); - TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(), - TezConfiguration.TEZ_TASK_LAUNCH_ENV, TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, - TezClientUtils.usingTezLibsFromArchive(getTezJarResources(sessionCredentials))); - TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration()); - } + + Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); + TezClientUtils.updateDAGVertices(dag, amConfig, tezJarResources, + TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials); DAGPlan dagPlan = dag.createDag(amConfig.getTezConfiguration()); SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder(); @@ -684,7 +674,7 @@ public class TezClient { // Add credentials for tez-local resources. 
Map<String, LocalResource> tezJarResources = getTezJarResources(credentials); ApplicationSubmissionContext appContext = TezClientUtils - .createApplicationSubmissionContext(amConfig.getTezConfiguration(), + .createApplicationSubmissionContext( appId, dag, dag.getName(), amConfig, tezJarResources, credentials); LOG.info("Submitting DAG to YARN" + ", applicationId=" + appId http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 566a2d8..d931519 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -387,13 +387,13 @@ public class TezClientUtils { * @throws YarnException */ static ApplicationSubmissionContext createApplicationSubmissionContext( - TezConfiguration conf, ApplicationId appId, DAG dag, String amName, + ApplicationId appId, DAG dag, String amName, AMConfiguration amConfig, Map<String, LocalResource> tezJarResources, Credentials sessionCreds) throws IOException, YarnException{ Preconditions.checkNotNull(sessionCreds); - + TezConfiguration conf = amConfig.getTezConfiguration(); boolean tezLrsAsArchive = usingTezLibsFromArchive(tezJarResources); FileSystem fs = TezClientUtils.ensureStagingDirExists(conf, @@ -434,12 +434,6 @@ public class TezClientUtils { amLaunchCredentials.writeTokenStorageToStream(dob); securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - // Need to set credentials based on DAG and the URIs which have been set for the DAG. 
- - if (dag != null) { - setupDAGCredentials(dag, sessionCreds, conf); - } - // Setup the command to run the AM List<String> vargs = new ArrayList<String>(8); vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); @@ -497,18 +491,17 @@ public class TezClientUtils { } } - Map<String, LocalResource> localResources = + Map<String, LocalResource> amLocalResources = new TreeMap<String, LocalResource>(); // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials. - if (amConfig.getLocalResources() != null) { - localResources.putAll(amConfig.getLocalResources()); + if (amConfig.getAMLocalResources() != null) { + amLocalResources.putAll(amConfig.getAMLocalResources()); } - localResources.putAll(tezJarResources); + amLocalResources.putAll(tezJarResources); // emit conf as PB file - Configuration finalTezConf = createFinalTezConfForApp(conf, - amConfig.getTezConfiguration()); + Configuration finalTezConf = createFinalTezConfForApp(amConfig.getTezConfiguration()); FSDataOutputStream amConfPBOutBinaryStream = null; try { @@ -535,27 +528,19 @@ public class TezClientUtils { TezClientUtils.createLocalResource(fs, binaryConfPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION); - localResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, + amConfig.setBinaryConfLR(binaryConfLRsrc); + amLocalResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfLRsrc); // Create Session Jars definition to be sent to AM as a local resource - Path sessionJarsPath = TezCommonUtils.getTezSessionJarStagingPath(tezSysStagingPath); + Path sessionJarsPath = TezCommonUtils.getTezAMJarStagingPath(tezSysStagingPath); FSDataOutputStream sessionJarsPBOutStream = null; try { - Map<String, LocalResource> sessionJars = - new HashMap<String, LocalResource>(tezJarResources.size() + 1); - sessionJars.putAll(tezJarResources); - sessionJars.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, - binaryConfLRsrc); - DAGProtos.PlanLocalResourcesProto proto = - 
DagTypeConverters.convertFromLocalResources(sessionJars); sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath); - proto.writeDelimitedTo(sessionJarsPBOutStream); - // Write out the initial list of resources which will be available in the AM DAGProtos.PlanLocalResourcesProto amResourceProto; - if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) { - amResourceProto = DagTypeConverters.convertFromLocalResources(localResources); + if (amLocalResources != null && !amLocalResources.isEmpty()) { + amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources); } else { amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance(); } @@ -570,8 +555,8 @@ public class TezClientUtils { TezClientUtils.createLocalResource(fs, sessionJarsPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION); - localResources.put( - TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME, + amLocalResources.put( + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME, sessionJarsPBLRsrc); String user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -579,21 +564,8 @@ public class TezClientUtils { Map<ApplicationAccessType, String> acls = aclManager.toYARNACls(); if(dag != null) { - - for (Vertex v : dag.getVertices()) { - if (tezJarResources != null) { - v.getTaskLocalFiles().putAll(tezJarResources); - } - v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME, - binaryConfLRsrc); - - Map<String, String> taskEnv = v.getTaskEnvironment(); - TezYARNUtils.setupDefaultEnv(taskEnv, conf, - TezConfiguration.TEZ_TASK_LAUNCH_ENV, - TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive); - - TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration()); - } + + updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds); // emit protobuf DAG file style Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath); @@ -602,8 +574,6 @@ public 
class TezClientUtils { + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :" + sessionJarsPath + " binaryPlanPath :" + binaryPath); } - amConfig.getTezConfiguration().set(TezConstants.TEZ_AM_PLAN_REMOTE_PATH, - binaryPath.toUri().toString()); DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration()); @@ -619,14 +589,14 @@ public class TezClientUtils { } } - localResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME, + amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME, TezClientUtils.createLocalResource(fs, binaryPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) { Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath); - localResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME, + amLocalResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME, TezClientUtils.createLocalResource(fs, textPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); @@ -635,7 +605,7 @@ public class TezClientUtils { // Setup ContainerLaunchContext for AM container ContainerLaunchContext amContainer = - ContainerLaunchContext.newInstance(localResources, environment, + ContainerLaunchContext.newInstance(amLocalResources, environment, vargsFinal, null, securityTokens, acls); // Set up the ApplicationSubmissionContext @@ -662,6 +632,25 @@ public class TezClientUtils { } + static void updateDAGVertices(DAG dag, AMConfiguration amConfig, + Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive, + Credentials credentials) throws IOException { + setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration()); + for (Vertex v : dag.getVertices()) { + if (tezJarResources != null) { + v.getTaskLocalFiles().putAll(tezJarResources); + } + v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME, + amConfig.getBinaryConfLR()); + + Map<String, String> taskEnv = v.getTaskEnvironment(); + TezYARNUtils.setupDefaultEnv(taskEnv, 
amConfig.getTezConfiguration(), + TezConfiguration.TEZ_TASK_LAUNCH_ENV, + TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive); + + setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration()); + } + } static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) { if (vargs != null && !vargs.isEmpty()) { @@ -714,29 +703,14 @@ public class TezClientUtils { + "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME); } - static Configuration createFinalTezConfForApp(TezConfiguration tezConf, - TezConfiguration amConf) { + static Configuration createFinalTezConfForApp(TezConfiguration amConf) { Configuration conf = new Configuration(false); conf.setQuietMode(true); - assert tezConf != null; assert amConf != null; Entry<String, String> entry; - Iterator<Entry<String, String>> iter = tezConf.iterator(); - while (iter.hasNext()) { - entry = iter.next(); - // Copy all tez config parameters. - if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) { - conf.set(entry.getKey(), entry.getValue()); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding tez dag am parameter from conf: " + entry.getKey() - + ", with value: " + entry.getValue()); - } - } - } - - iter = amConf.iterator(); + Iterator<Entry<String, String>> iter = amConf.iterator(); while (iter.hasNext()) { entry = iter.next(); // Copy all tez config parameters. 
http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index 15878c3..5148770 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.StringTokenizer; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -165,8 +167,8 @@ public class TezCommonUtils { * @return path to store the session jars */ @Private - public static Path getTezSessionJarStagingPath(Path tezSysStagingPath) { - return new Path(tezSysStagingPath, TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME); + public static Path getTezAMJarStagingPath(Path tezSysStagingPath) { + return new Path(tezSysStagingPath, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME); } /** @@ -295,6 +297,19 @@ public class TezCommonUtils { public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException { return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION)); } + + public static void addAdditionalLocalResources(Map<String, LocalResource> additionalLrs, + Map<String, LocalResource> originalLRs) { + if (additionalLrs != null && 
!additionalLrs.isEmpty()) { + for (Map.Entry<String, LocalResource> lr : additionalLrs.entrySet()) { + if (originalLRs.containsKey(lr.getKey())) { + throw new TezUncheckedException("Attempting to add duplicate resource: " + lr.getKey()); + } else { + originalLRs.put(lr.getKey(), lr.getValue()); + } + } + } + } @Private public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 5ad724c..fe73c47 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -63,6 +63,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** @@ -85,13 +86,28 @@ public class DAG { Set<VertexGroup> vertexGroups = Sets.newHashSet(); Set<GroupInputEdge> groupInputEdges = Sets.newHashSet(); private DAGAccessControls dagAccessControls; - + Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap(); + private Stack<String> topologicalVertexStack = new Stack<String>(); public DAG(String name) { this.name = name; } + /** + * Set the files etc that must be provided to the tasks of this DAG + * @param localFiles + * files that must be available locally for each task. These files + * may be regular files, archives etc. as specified by the value + * elements of the map. 
+ * @return {@link DAG} + */ + public DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) { + Preconditions.checkNotNull(localFiles); + TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles); + return this; + } + public synchronized DAG addVertex(Vertex vertex) { if (vertices.containsKey(vertex.getName())) { throw new IllegalStateException( @@ -597,7 +613,7 @@ public class DAG { if (dataSource.getCredentials() != null) { credentials.addAll(dataSource.getCredentials()); } - vertex.addAdditionalLocalResources(dataSource.getAdditionalLocalResources()); + vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles()); } if (dataSources.size() == 1) { DataSourceDescriptor dataSource = dataSources.get(0); @@ -614,6 +630,9 @@ public class DAG { } } + // add common task files for this DAG + vertex.addTaskLocalFiles(commonTaskLocalFiles); + VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder(); vertexBuilder.setName(vertex.getName()); vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46. @@ -752,7 +771,8 @@ public class DAG { confProtoBuilder.addConfKeyValues(kvp); } } - dagBuilder.setDagKeyValues(confProtoBuilder); + dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere + // should this replace BINARY_PB_CONF??? 
if (credentials != null) { dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials)); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java index fc9cf23..f8e6072 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java @@ -42,7 +42,7 @@ public class DataSourceDescriptor { private final Credentials credentials; private final int numShards; private final VertexLocationHint locationHint; - private final Map<String, LocalResource> additionalLocalResources; + private final Map<String, LocalResource> additionalLocalFiles; private DataSourceDescriptor(InputDescriptor inputDescriptor, @Nullable InputInitializerDescriptor initializerDescriptor, @@ -55,13 +55,13 @@ public class DataSourceDescriptor { int numShards, @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint, - @Nullable Map<String, LocalResource> additionalLocalResources) { + @Nullable Map<String, LocalResource> additionalLocalFiles) { this.inputDescriptor = inputDescriptor; this.initializerDescriptor = initializerDescriptor; this.numShards = numShards; this.credentials = credentials; this.locationHint = locationHint; - this.additionalLocalResources = additionalLocalResources; + this.additionalLocalFiles = additionalLocalFiles; } /** @@ -104,19 +104,19 @@ public class DataSourceDescriptor { * @param numShards Number of shards of data * @param credentials Credentials needed to access the data * @param locationHint Location hints for the vertex tasks - * @param additionalLocalResources additional local resources required by this Input. 
An attempt - * will be made to add these resources to the Vertex as Private + * @param additionalLocalFiles additional local files required by this Input. An attempt + * will be made to add these files to the Vertex as Private * resources. If a name conflict occurs, a {@link - * org.apache.tez.dag.api.TezException} will be thrown + * org.apache.tez.dag.api.TezUncheckedException} will be thrown */ public static DataSourceDescriptor create(InputDescriptor inputDescriptor, @Nullable InputInitializerDescriptor initializerDescriptor, int numShards, @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint, - @Nullable Map<String, LocalResource> additionalLocalResources) { + @Nullable Map<String, LocalResource> additionalLocalFiles) { return new DataSourceDescriptor(inputDescriptor, initializerDescriptor, numShards, credentials, - locationHint, additionalLocalResources); + locationHint, additionalLocalFiles); } public InputDescriptor getInputDescriptor() { @@ -160,12 +160,12 @@ public class DataSourceDescriptor { } /** - * Get the list of additional local resources which were specified during creation. + * Get the list of additional local files which were specified during creation. 
* @return */ @InterfaceAudience.Private - public @Nullable Map<String, LocalResource> getAdditionalLocalResources() { - return additionalLocalResources; + public @Nullable Map<String, LocalResource> getAdditionalLocalFiles() { + return additionalLocalFiles; } http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index ba634af..8af7e84 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -63,7 +63,7 @@ public class TezConstants { public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr"; public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout"; - public static final String TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME = + public static final String TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME = TezConfiguration.TEZ_SESSION_PREFIX + "local-resources.pb"; public static final String TEZ_APPLICATION_TYPE = "TEZ"; @@ -83,13 +83,6 @@ public class TezConstants { // Configuration keys used internally and not set by the users - /** - * The complete path to the serialized dag plan file - * <code>TEZ_AM_PLAN_PB_BINARY</code>. Used to make the plan available to - * individual tasks if needed. This will be inside the staging dir - */ - public static final String TEZ_AM_PLAN_REMOTE_PATH = "dag-am-plan.remote.path"; - // These are session specific DAG ACL's. Currently here because these can only be specified // via code in the API. 
/** http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index 5e15e16..6397c82 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.VertexGroup.GroupInfo; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.runtime.api.LogicalIOProcessor; @@ -247,9 +248,9 @@ public class Vertex { * elements of the map. 
* @return this Vertex */ - public Vertex setTaskLocalFiles(Map<String, LocalResource> localFiles) { + public Vertex addTaskLocalFiles(Map<String, LocalResource> localFiles) { if (localFiles != null) { - this.taskLocalResources.putAll(localFiles); + TezCommonUtils.addAdditionalLocalResources(localFiles, taskLocalResources); } return this; } @@ -428,18 +429,6 @@ public class Vertex { return Collections.unmodifiableList(outputVertices); } - void addAdditionalLocalResources(Map<String, LocalResource> additionalLrs) { - if (additionalLrs != null && !additionalLrs.isEmpty()) { - for (Map.Entry<String, LocalResource> lr : additionalLrs.entrySet()) { - if (taskLocalResources.containsKey(lr.getKey())) { - throw new TezUncheckedException("Attempting to add duplicate resource: " + lr.getKey()); - } else { - taskLocalResources.put(lr.getKey(), lr.getValue()); - } - } - } - } - /** * Set the cpu/memory etc resources used by tasks of this vertex * @param resource {@link Resource} for the tasks of this vertex http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index ddddb16..e05b004 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -19,6 +19,7 @@ package org.apache.tez.client; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -134,7 +135,7 @@ public class TestTezClient { ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size()); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( - 
TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME)); + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( @@ -143,10 +144,19 @@ public class TestTezClient { verify(client.mockYarnClient, times(0)).submitApplication(captor.capture()); } - DAG dag = new DAG("DAG").addVertex( - Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1))); + String mockLR1Name = "LR1"; + Map<String, LocalResource> lrDAG = Collections.singletonMap(mockLR1Name, LocalResource + .newInstance(URL.newInstance("file:///", "localhost", 0, "test"), LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, + Resource.newInstance(1, 1)); + DAG dag = new DAG("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); DAGClient dagClient = client.submitDAG(dag); + // verify that both DAG and TezClient localResources are added to the vertex + Map<String, LocalResource> vertexLR = vertex.getTaskLocalFiles(); + Assert.assertTrue(vertexLR.containsKey(mockLR1Name)); + Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString())); if (isSession) { @@ -157,7 +167,7 @@ public class TestTezClient { ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size()); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( - TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME)); + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( @@ -171,7 +181,7 @@ public class TestTezClient { 
lrs.clear(); lrs.put(lrName2, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); - client.addAppMasterLocalResources(lrs); + client.addAppMasterLocalFiles(lrs); ApplicationId appId2 = ApplicationId.newInstance(0, 2); when(client.mockYarnClient.createApplication().getNewApplicationResponse().getApplicationId()) @@ -199,7 +209,7 @@ public class TestTezClient { ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size()); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( - TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME)); + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java index 27f798d..e0975ab 100644 --- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java +++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java @@ -136,10 +136,10 @@ public class TestTezCommonUtils { public void testTezSessionJarStagingPath() throws Exception { String strAppId = "testAppId"; Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId); - Path confStageDir = TezCommonUtils.getTezSessionJarStagingPath(stageDir); + Path confStageDir = TezCommonUtils.getTezAMJarStagingPath(stageDir); String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + 
strAppId + File.separator - + TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME; + + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME; Assert.assertEquals(confStageDir.toString(), expectedDir); } http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index 6a74553..fbea24b 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -108,9 +108,9 @@ public class TestDAGPlan { Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); InputDescriptor inputDescriptor = InputDescriptor.create("input") .setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes()))); @@ -144,9 +144,9 @@ public class TestDAGPlan { Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, 
LocalResource>()); InputDescriptor inputDescriptor = InputDescriptor.create("input"). setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes()))); @@ -208,11 +208,11 @@ public class TestDAGPlan { Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); Vertex v3 = Vertex.create("v3", pd3, 1, Resource.newInstance(1024, 1)); v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); v3.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); InputDescriptor inputDescriptor = InputDescriptor.create("input"). setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes()))); @@ -278,9 +278,9 @@ public class TestDAGPlan { Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>()) - .setTaskLocalFiles(new HashMap<String, LocalResource>()); + .addTaskLocalFiles(new HashMap<String, LocalResource>()); InputDescriptor inputDescriptor = InputDescriptor.create("input"). 
setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes()))); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index 9885272..033d08b 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -19,10 +19,15 @@ package org.apache.tez.dag.api; import java.util.Arrays; +import java.util.Map; import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; @@ -33,6 +38,8 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Maps; + public class TestDAGVerify { private final String dummyProcessorClassName = TestDAGVerify.class.getName(); @@ -896,8 +903,80 @@ public class TestDAGVerify { dag.createDag(new TezConfiguration()); } + + + @Test(timeout = 5000) + public void testVerifyCommonFiles() { + Vertex v1 = Vertex.create("v1", + ProcessorDescriptor.create(dummyProcessorClassName), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", + ProcessorDescriptor.create("MapProcessor"), + dummyTaskCount, dummyTaskResource); + Edge e1 = Edge.create(v1, v2, + EdgeProperty.create(DataMovementType.SCATTER_GATHER, + 
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create(dummyOutputClassName), + InputDescriptor.create(dummyInputClassName))); + Map<String, LocalResource> lrs = Maps.newHashMap(); + String lrName1 = "LR1"; + lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); + + DAG dag = new DAG("testDag"); + dag.addVertex(v1); + dag.addVertex(v2); + dag.addEdge(e1); + dag.addTaskLocalFiles(lrs); + dag.createDag(new TezConfiguration()); + Assert.assertTrue(v1.getTaskLocalFiles().containsKey(lrName1)); + Assert.assertTrue(v2.getTaskLocalFiles().containsKey(lrName1)); + } - + @Test(timeout = 5000) + public void testVerifyCommonFilesFail() { + Vertex v1 = Vertex.create("v1", + ProcessorDescriptor.create(dummyProcessorClassName), + dummyTaskCount, dummyTaskResource); + Vertex v2 = Vertex.create("v2", + ProcessorDescriptor.create("MapProcessor"), + dummyTaskCount, dummyTaskResource); + Edge e1 = Edge.create(v1, v2, + EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create(dummyOutputClassName), + InputDescriptor.create(dummyInputClassName))); + Map<String, LocalResource> lrs = Maps.newHashMap(); + String lrName1 = "LR1"; + lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); + v1.addTaskLocalFiles(lrs); + try { + v1.addTaskLocalFiles(lrs); + Assert.fail(); + } catch (TezUncheckedException e) { + Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource")); + } + DAG dag = new DAG("testDag"); + dag.addVertex(v1); + dag.addVertex(v2); + dag.addEdge(e1); + dag.addTaskLocalFiles(lrs); + try { + dag.addTaskLocalFiles(lrs); + Assert.fail(); + } catch (TezUncheckedException e) { + Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate 
resource")); + } + try { + // dag will add duplicate common files to vertex + dag.createDag(new TezConfiguration()); + Assert.fail(); + } catch (TezUncheckedException e) { + Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource")); + } + } + @Test(timeout = 5000) public void testDAGAccessControls() { DAG dag = new DAG("testDag"); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 89c468b..9cc28cb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.app.dag.DAG; @@ -79,8 +78,6 @@ public interface AppContext { TaskSchedulerEventHandler getTaskScheduler(); - Map<String, LocalResource> getSessionResources(); - boolean isSession(); DAGAppMasterState getAMState(); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index fa4b629..62004d2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -216,8 +216,6 @@ 
public class DAGAppMaster extends AbstractService { private HistoryEventHandler historyEventHandler; private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>(); private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>(); - private final Map<String, LocalResource> sessionResources = - new HashMap<String, LocalResource>(); private boolean isLocal = false; //Local mode flag @@ -398,15 +396,10 @@ public class DAGAppMaster extends AbstractService { FileInputStream sessionResourcesStream = null; try { sessionResourcesStream = new FileInputStream( - new File(workingDirectory, TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME)); - PlanLocalResourcesProto sessionLocalResourcesProto = - PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream); + new File(workingDirectory, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto .parseDelimitedFrom(sessionResourcesStream); - sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources( - sessionLocalResourcesProto)); amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto)); - amResources.putAll(sessionResources); } finally { if (sessionResourcesStream != null) { sessionResourcesStream.close(); @@ -1192,11 +1185,6 @@ public class DAGAppMaster extends AbstractService { } @Override - public Map<String, LocalResource> getSessionResources() { - return sessionResources; - } - - @Override public boolean isSession() { return isSession; } http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 6387b8f..1fb40a0 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -648,7 +648,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.localResources = DagTypeConverters .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig() .getLocalResourceList()); - this.localResources.putAll(appContext.getSessionResources()); this.environment = DagTypeConverters .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig() .getEnvironmentSettingList()); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 94d5818..a966316 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -444,7 +444,7 @@ public class YARNRunner implements ClientProtocol { : MRHelpers.getJavaOptsForMRReducer(stageConf); vertex.setTaskEnvironment(taskEnv) - .setTaskLocalFiles(taskLocalResources) + .addTaskLocalFiles(taskLocalResources) .setLocationHint(VertexLocationHint.create(locations)) .setTaskLaunchCmdOpts(taskJavaOpts); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 3f9368f..c5f81a9 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -104,9 +104,9 @@ public class TestMRInputHelpers { DataSourceDescriptor dataSource = generateDataSourceDescriptorMapReduce(newSplitsDir); - Assert.assertTrue(dataSource.getAdditionalLocalResources() + Assert.assertTrue(dataSource.getAdditionalLocalFiles() .containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); - Assert.assertTrue(dataSource.getAdditionalLocalResources() + Assert.assertTrue(dataSource.getAdditionalLocalFiles() .containsKey(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); RemoteIterator<LocatedFileStatus> files = @@ -141,8 +141,8 @@ public class TestMRInputHelpers { public void testOldSplitsGen() throws Exception { DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(oldSplitsDir); Assert.assertTrue( - dataSource.getAdditionalLocalResources().containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); - Assert.assertTrue(dataSource.getAdditionalLocalResources() + dataSource.getAdditionalLocalFiles().containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); + Assert.assertTrue(dataSource.getAdditionalLocalFiles() .containsKey(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); RemoteIterator<LocatedFileStatus> files = @@ -177,7 +177,7 @@ public class TestMRInputHelpers { public void testInputSplitLocalResourceCreation() throws Exception { DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(oldSplitsDir); - Map<String, LocalResource> localResources = dataSource.getAdditionalLocalResources(); + Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles(); Assert.assertEquals(2, localResources.size()); Assert.assertTrue(localResources.containsKey( http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java ---------------------------------------------------------------------- diff --git 
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java index 875bfcb..d9304e4 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java @@ -171,7 +171,7 @@ public class FilterLinesByWord extends Configured implements Tool { // Setup stage1 Vertex Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create( FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload)) - .setTaskLocalFiles(commonLocalResources); + .addTaskLocalFiles(commonLocalResources); DataSourceDescriptor dsd; if (generateSplitsInClient) { @@ -189,7 +189,7 @@ public class FilterLinesByWord extends Configured implements Tool { Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create( FilterByWordOutputProcessor.class.getName()).setUserPayload( TezUtils.createUserPayloadFromConf(stage2Conf)), 1); - stage2Vertex.setTaskLocalFiles(commonLocalResources); + stage2Vertex.addTaskLocalFiles(commonLocalResources); // Configure the Output for stage2 OutputDescriptor od = OutputDescriptor.create(MROutput.class.getName()) http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java index 61d827e..ca2a288 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java @@ -162,7 +162,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { // 
Setup stage1 Vertex Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create( FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload)) - .setTaskLocalFiles(commonLocalResources); + .addTaskLocalFiles(commonLocalResources); DataSourceDescriptor dsd; if (generateSplitsInClient) { @@ -180,7 +180,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create( FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards()); - stage2Vertex.setTaskLocalFiles(commonLocalResources); + stage2Vertex.addTaskLocalFiles(commonLocalResources); // Configure the Output for stage2 stage2Vertex.addDataSink( http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index e4c515b..38a1045 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -526,7 +526,7 @@ public class MRRSleepJob extends Configured implements Tool { Vertex mapVertex = Vertex.create("map", ProcessorDescriptor.create( MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks) - .setTaskLocalFiles(commonLocalResources); + .addTaskLocalFiles(commonLocalResources); mapVertex.addDataSource("MRInput", dataSource); vertices.add(mapVertex); @@ -539,7 +539,7 @@ public class MRRSleepJob extends Configured implements Tool { Vertex ivertex = Vertex.create("ireduce" + (i + 1), ProcessorDescriptor.create(ReduceProcessor.class.getName()). 
setUserPayload(iReduceUserPayload), numIReducer); - ivertex.setTaskLocalFiles(commonLocalResources); + ivertex.addTaskLocalFiles(commonLocalResources); vertices.add(ivertex); } } @@ -549,7 +549,7 @@ public class MRRSleepJob extends Configured implements Tool { UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf); finalReduceVertex = Vertex.create("reduce", ProcessorDescriptor.create( ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer); - finalReduceVertex.setTaskLocalFiles(commonLocalResources); + finalReduceVertex.addTaskLocalFiles(commonLocalResources); finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf, NullOutputFormat.class).build()); vertices.add(finalReduceVertex); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index 3c4e66f..6157cbd 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -199,7 +199,7 @@ public class TestOrderedWordCount extends Configured implements Tool { Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create( MapProcessor.class.getName()).setUserPayload( TezUtils.createUserPayloadFromConf(mapStageConf)) - .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources); + .setHistoryText(mapStageHistoryText)).addTaskLocalFiles(commonLocalResources); mapVertex.addDataSource("MRInput", dsd); vertices.add(mapVertex); @@ -210,7 +210,7 @@ public class TestOrderedWordCount extends Configured implements Tool { ReduceProcessor.class.getName()) 
.setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf)) .setHistoryText(iReduceStageHistoryText), 2); - ivertex.setTaskLocalFiles(commonLocalResources); + ivertex.addTaskLocalFiles(commonLocalResources); vertices.add(ivertex); ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096); @@ -222,7 +222,7 @@ public class TestOrderedWordCount extends Configured implements Tool { ReduceProcessor.class.getName()) .setUserPayload(finalReducePayload) .setHistoryText(finalReduceStageHistoryText), 1); - finalReduceVertex.setTaskLocalFiles(commonLocalResources); + finalReduceVertex.addTaskLocalFiles(commonLocalResources); finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath) .build()); @@ -386,7 +386,7 @@ public class TestOrderedWordCount extends Configured implements Tool { LOG.info("Pre-warming Session"); PreWarmVertex preWarmVertex = PreWarmVertex.create("PreWarm", preWarmNumContainers, dag .getVertex("initialmap").getTaskResource()); - preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles()); + preWarmVertex.addTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles()); preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment()); preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts()); http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 49de6c1..48faefe 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -785,7 +785,7 @@ public class TestMRRJobsDAGApi { 
LOG.info("Submitting dag to tez session with appId=" + tezSession.getAppMasterApplicationId() + " and Dag Name=" + dag.getName()); if (additionalLocalResources != null) { - tezSession.addAppMasterLocalResources(additionalLocalResources); + tezSession.addAppMasterLocalFiles(additionalLocalResources); } dagClient = tezSession.submitDAG(dag); Assert.assertEquals(TezAppMasterStatus.RUNNING,
