Repository: tez Updated Branches: refs/heads/master e49ed3397 -> 1d7ad6f73
TEZ-717. Client changes for local mode DAG submission. Contributed by Jonathan Eagles.

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d7ad6f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d7ad6f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d7ad6f7

Branch: refs/heads/master
Commit: 1d7ad6f73b4409c5f9f13978588768cf43cccafd
Parents: e49ed33
Author: Siddharth Seth <[email protected]>
Authored: Tue Jul 29 22:27:46 2014 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Tue Jul 29 22:27:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/FrameworkClient.java  |  15 +-
 .../java/org/apache/tez/client/TezClient.java   |   4 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   2 +-
 .../java/org/apache/tez/common/TezUtils.java    |   4 +-
 .../java/org/apache/tez/client/LocalClient.java | 270 +++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  35 +--
 6 files changed, 309 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 2f97399..eca15f6 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -27,10 +27,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 
 public abstract class FrameworkClient {
 
-  public static FrameworkClient createFrameworkClient() {
+  // Cached so that every local-mode caller (e.g. TezClient and DAGClientRPCImpl)
+  // gets the same LocalClient and hence the same in-process DAGAppMaster.
+  private static FrameworkClient localClient = null;
+
+  /**
+   * Returns the client used to submit and monitor applications: the shared
+   * LocalClient when {@link TezConfiguration#TEZ_LOCAL_MODE} is enabled (loaded
+   * by class name, as it lives in tez-dag rather than tez-api), otherwise a new
+   * YARN-backed {@link TezYarnClient}. Synchronized so that concurrent callers
+   * cannot race to create two LocalClient instances.
+   */
+  public static synchronized FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
+
+    boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+    if (isLocal) {
+      // Create the LocalClient lazily on first use and reuse it afterwards.
+      if (localClient == null) {
+        localClient = ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+      }
+      return localClient;
+    }
     return new TezYarnClient(YarnClient.createYarnClient());
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 454aad6..ea401b8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -596,9 +596,9 @@ public class TezClient {
 
   // for testing
   protected FrameworkClient createFrameworkClient() {
-    return FrameworkClient.createFrameworkClient();
+    return FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration());
   }
-  
+
   // for testing
   protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId)
       throws TezException, IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index bf8fa39..dfe1323 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -76,7 +76,7 @@ public class DAGClientRPCImpl extends DAGClient {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
-    frameworkClient = FrameworkClient.createFrameworkClient();
+    frameworkClient = FrameworkClient.createFrameworkClient(conf);
     frameworkClient.init(new YarnConfiguration(conf));
     frameworkClient.start();
     appReport = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index f73f223..0b292f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -18,6 +18,7 @@ package org.apache.tez.common;
 
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -35,7 +36,6 @@ import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -61,7 +61,9 @@ public class TezUtils {
     FileInputStream confPBBinaryStream = null;
     ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
     try {
-      confPBBinaryStream = new FileInputStream(TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      // File#getAbsolutePath resolves the relative name against the user.dir system
+      // property (which LocalClient sets), not against the process working directory.
+      confPBBinaryStream = new FileInputStream(new File(TezConfiguration.TEZ_PB_BINARY_CONF_NAME).getAbsolutePath());
       confProtoBuilder.mergeFrom(confPBBinaryStream);
     } finally {
       if (confPBBinaryStream != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
new file mode 100644
index 0000000..a2a2ab1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
+
+/**
+ * A {@link FrameworkClient} for Tez local mode: instead of submitting to a YARN
+ * cluster, it runs a {@link DAGAppMaster} on a thread inside the client JVM and
+ * serves application reports from that in-process AM.
+ *
+ * <p>An instance starts at most one DAGAppMaster; once one has been started,
+ * further {@link #submitApplication} calls return their application id without
+ * starting another AM.
+ */
+public class LocalClient extends FrameworkClient {
+  public static final Logger LOG = Logger.getLogger(LocalClient.class);
+
+  // Maximum time, in milliseconds, to wait for the DAGAppMaster to start.
+  private static final long TIME_OUT = 60 * 1000;
+
+  // Written by the DAGAppMaster thread, read by client threads.
+  private volatile DAGAppMaster dagAppMaster = null;
+  private volatile DAGClientHandler clientHandler = null;
+  // Set by the DAGAppMaster thread if creating or starting the AM throws.
+  private volatile boolean amFailed = false;
+  private Thread dagAmThread;
+  private Configuration conf;
+  // Cluster-timestamp component of the application ids this client hands out.
+  private final long clusterTimeStamp = System.currentTimeMillis();
+  private int appIdNumber = 1;
+  private boolean isSession;
+
+  public LocalClient() {
+  }
+
+  /**
+   * Remembers {@code conf} and reads session mode from it. Note that this sets
+   * {@code fs.defaultFS} to the local file system on the passed-in object itself.
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.conf.set("fs.defaultFS", "file:///");
+    isSession = conf.getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE,
+        TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
+  }
+
+  // start/stop/close are no-ops: the DAGAppMaster is started by submitApplication.
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Creates an application locally (no ResourceManager involved), numbering
+   * application ids sequentially under this client's cluster timestamp.
+   */
+  @Override
+  public YarnClientApplication createApplication() throws YarnException, IOException {
+    ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
+    ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, appIdNumber++);
+    context.setApplicationId(appId);
+    GetNewApplicationResponse response = Records.newRecord(GetNewApplicationResponse.class);
+    response.setApplicationId(appId);
+    return new YarnClientApplication(response, context);
+  }
+
+  /**
+   * Starts the in-process DAGAppMaster (if this client has not already started
+   * one) and returns the application id from {@code appContext}.
+   *
+   * @throws TezUncheckedException if the DAGAppMaster fails to start
+   */
+  @Override
+  public ApplicationId submitApplication(ApplicationSubmissionContext appContext) {
+    ApplicationId appId = appContext.getApplicationId();
+    startDAGAppMaster(appContext);
+    return appId;
+  }
+
+  /** Asks the in-process DAGAppMaster to shut down; does nothing if none was created. */
+  @Override
+  public void killApplication(ApplicationId appId) {
+    DAGClientHandler handler = clientHandler;
+    if (handler != null) {
+      handler.shutdownAM();
+    }
+  }
+
+  /**
+   * Builds an ApplicationReport from the in-process DAGAppMaster. Fields with no
+   * local equivalent (tokens, tracking URLs, resource usage, final status) are
+   * left null or "N/A". Assumes a DAGAppMaster has already been created.
+   */
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId) {
+    ApplicationReport report = Records.newRecord(ApplicationReport.class);
+    report.setApplicationId(appId);
+    report.setCurrentApplicationAttemptId(dagAppMaster.getAttemptID());
+
+    AppContext runningAppContext = dagAppMaster.getContext();
+    if (runningAppContext != null) {
+      DAG dag = runningAppContext.getCurrentDAG();
+      if (dag != null) {
+        report.setUser(runningAppContext.getUser());
+      }
+      report.setName(runningAppContext.getApplicationName());
+      report.setStartTime(runningAppContext.getStartTime());
+    }
+
+    report.setHost(dagAppMaster.getAppNMHost());
+    report.setRpcPort(dagAppMaster.getRpcPort());
+    report.setClientToAMToken(null);
+    report.setYarnApplicationState(convertDAGAppMasterState(dagAppMaster.getState()));
+
+    List<String> diagnostics = dagAppMaster.getDiagnostics();
+    if (diagnostics != null) {
+      report.setDiagnostics(diagnostics.toString());
+    }
+    report.setTrackingUrl("N/A");
+    report.setFinishTime(0);
+    report.setFinalApplicationStatus(null);
+    report.setApplicationResourceUsageReport(null);
+    report.setOriginalTrackingUrl("N/A");
+    report.setProgress(dagAppMaster.getProgress());
+    report.setAMRMToken(null);
+
+    return report;
+  }
+
+  /** Maps a DAGAppMaster state onto the closest YARN application state. */
+  protected YarnApplicationState convertDAGAppMasterState(DAGAppMasterState dagAppMasterState) {
+    switch (dagAppMasterState) {
+    case NEW:
+      return YarnApplicationState.NEW;
+    case INITED:
+    case RECOVERING:
+    case IDLE:
+    case RUNNING:
+      return YarnApplicationState.RUNNING;
+    case SUCCEEDED:
+      return YarnApplicationState.FINISHED;
+    case FAILED:
+      return YarnApplicationState.FAILED;
+    case KILLED:
+      return YarnApplicationState.KILLED;
+    case ERROR:
+      return YarnApplicationState.FAILED;
+    default:
+      return YarnApplicationState.SUBMITTED;
+    }
+  }
+
+  /**
+   * Starts the DAGAppMaster thread and blocks until the AM has moved past the
+   * NEW/INITED states. Does nothing if this client already started an AM thread.
+   *
+   * @throws TezUncheckedException if the AM thread fails, the AM reaches the
+   *         ERROR or KILLED state, it does not start within TIME_OUT ms, or the
+   *         calling thread is interrupted while waiting
+   */
+  protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) {
+    if (dagAmThread != null) {
+      return;
+    }
+    try {
+      dagAmThread = createDAGAppMaster(appContext);
+      dagAmThread.start();
+      waitForDAGAppMasterStart();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting DAGAppMaster", t);
+      if (dagAmThread != null) {
+        dagAmThread.interrupt();
+      }
+      if (t instanceof InterruptedException) {
+        // Restore the interrupt status that Thread.sleep cleared.
+        Thread.currentThread().interrupt();
+      }
+      // This runs inside the user's JVM: report the failure to the caller
+      // instead of exiting the process.
+      throw new TezUncheckedException("Error starting DAGAppMaster", t);
+    }
+  }
+
+  /**
+   * Polls the AM every 100ms until it leaves the NEW/INITED states.
+   *
+   * @throws TezException if the AM thread failed, the AM is in the ERROR or
+   *         KILLED state, or TIME_OUT ms elapse first
+   */
+  private void waitForDAGAppMasterStart() throws TezException, InterruptedException {
+    long waitingTime = 0;
+    while (true) {
+      if (amFailed) {
+        throw new TezException("DAGAppMaster failed to start");
+      }
+      DAGAppMaster am = dagAppMaster;
+      if (am == null) {
+        LOG.info("DAGAppMaster is not created wait for 100ms...");
+      } else {
+        DAGAppMasterState dagAMState = am.getState();
+        LOG.info("DAGAppMaster state: " + dagAMState);
+        switch (dagAMState) {
+        case NEW:
+        case INITED:
+          LOG.info("DAGAppMaster is not started wait for 100ms...");
+          break;
+        case ERROR:
+          throw new TezException("DAGAppMaster got an error during initialization");
+        case KILLED:
+          throw new TezException("DAGAppMaster is killed");
+        default:
+          return;
+        }
+      }
+      if (waitingTime >= TIME_OUT) {
+        throw new TezException("Time out creating DAGAppMaster");
+      }
+      Thread.sleep(100);
+      waitingTime += 100;
+    }
+  }
+
+  /**
+   * Creates (but does not start) the thread that prepares a working directory
+   * under the Tez staging path for this application and then constructs,
+   * initializes and starts the DAGAppMaster. On failure the thread records it
+   * in {@code amFailed} instead of exiting the JVM.
+   */
+  protected Thread createDAGAppMaster(final ApplicationSubmissionContext appContext) {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          ApplicationId appId = appContext.getApplicationId();
+
+          // Set up working directory for DAGAppMaster
+          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
+          LOG.info("Setting current directory: " + userDir.toUri().getPath());
+          // Changes user.dir for the whole client JVM so that relative file names
+          // resolved via File#getAbsolutePath land in this directory.
+          // NOTE(review): JDK 11+ caches user.dir at startup; confirm this still works there.
+          System.setProperty("user.dir", userDir.toUri().getPath());
+          FileSystem fs = FileSystem.get(conf);
+          fs.mkdirs(userDir);
+          fs.setWorkingDirectory(userDir);
+
+          // Prepare Environment
+          Path logDir = new Path(userDir, "logs");
+          Path localDir = new Path(userDir, "local");
+
+          EnvironmentUpdateUtils.put(Environment.LOG_DIRS.name(), logDir.toUri().getPath());
+          EnvironmentUpdateUtils.put(Environment.LOCAL_DIRS.name(), localDir.toUri().getPath());
+
+          // Add session specific credentials to the AM credentials.
+          UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+          ByteBuffer tokens = appContext.getAMContainerSpec().getTokens();
+
+          if (tokens != null) {
+            Credentials amCredentials = new Credentials();
+            DataInputByteBuffer dibb = new DataInputByteBuffer();
+            dibb.reset(tokens);
+            amCredentials.readTokenStorageStream(dibb);
+            // Rewind so the token buffer is left readable from the start.
+            tokens.rewind();
+            currentUser.addCredentials(amCredentials);
+          }
+
+          // Construct, initialize, and start the DAGAppMaster
+          ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(appId, 0);
+          ContainerId cId = ContainerId.newInstance(applicationAttemptId, 1);
+          String currentHost = InetAddress.getLocalHost().getHostName();
+          int nmPort = YarnConfiguration.DEFAULT_NM_PORT;
+          int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT;
+          long appSubmitTime = System.currentTimeMillis();
+
+          dagAppMaster = new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, appSubmitTime, isSession);
+          clientHandler = new DAGClientHandler(dagAppMaster);
+          DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
+
+        } catch (Throwable t) {
+          LOG.fatal("Error starting DAGAppMaster", t);
+          // Do not System.exit(): this thread runs inside the user's JVM. Flag the
+          // failure for startDAGAppMaster (set before stop() in case stop() throws).
+          amFailed = true;
+          DAGAppMaster am = dagAppMaster;
+          if (am != null) {
+            am.stop();
+          }
+        }
+      }
+    });
+
+    thread.setName("DAGAppMaster Thread");
+    LOG.info("DAGAppMaster thread has been created");
+
+    return thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8f4bbab..bfb974b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -403,7 +403,7 @@ public class DAGAppMaster extends AbstractService {
       FileInputStream sessionResourcesStream = null;
       try {
         sessionResourcesStream = new FileInputStream(
-            TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+            new File(TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME).getAbsolutePath());
         PlanLocalResourcesProto sessionLocalResourcesProto =
             PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
         PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
@@ -859,6 +859,14 @@ public class DAGAppMaster extends AbstractService {
     return nmHttpPort;
   }
 
+  /**
+   * Returns the port the client RPC server ({@code clientRpcServer}) is bound to.
+   * LocalClient reports it as the application's RPC port.
+   */
+  public int getRpcPort() {
+    return clientRpcServer.getBindAddress().getPort();
+  }
+
   public DAGAppMasterState getState() {
     return state;
   }
@@ -1696,17 +1704,10 @@ public class DAGAppMaster extends AbstractService {
 
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
-      final Configuration conf = new Configuration(new YarnConfiguration());
-      TezUtils.addUserSpecifiedTezConfiguration(conf);
 
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
 
-      // Do not automatically close FileSystem objects so that in case of
-      // SIGTERM I have a chance to write out the job history. I'll be closing
-      // the objects myself.
-      conf.setBoolean("fs.automatic.close", false);
-
       // Command line options
       Options opts = new Options();
       opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
@@ -1722,9 +1723,7 @@ public class DAGAppMaster extends AbstractService {
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
-      Limits.setConfiguration(conf);
-      initAndStartAppMaster(appMaster, conf,
-          jobUserName);
+      initAndStartAppMaster(appMaster, jobUserName);
 
     } catch (Throwable t) {
       LOG.fatal("Error starting DAGAppMaster", t);
@@ -1775,8 +1774,8 @@ public class DAGAppMaster extends AbstractService {
       DAGPlan dagPlan = null;
 
       // Read the protobuf DAG
-      dagPBBinaryStream = new FileInputStream(
-          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
+      dagPBBinaryStream = new FileInputStream(new File(
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME).getAbsolutePath());
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
 
       startDAG(dagPlan, null);
@@ -1868,9 +1867,25 @@ public class DAGAppMaster extends AbstractService {
   }
 
   // TODO XXX Does this really need to be a YarnConfiguration ?
-  protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
-      final Configuration conf, String jobUserName) throws IOException,
+  /**
+   * Initializes and starts {@code appMaster}. Builds the AM's Configuration
+   * itself (YARN defaults plus the user-specified Tez configuration, with
+   * {@code fs.automatic.close} disabled), so callers such as LocalClient only
+   * supply the AM and the job user name.
+   */
+  public static void initAndStartAppMaster(final DAGAppMaster appMaster,
+      String jobUserName) throws IOException,
       InterruptedException {
+
+    final Configuration conf = new Configuration(new YarnConfiguration());
+    TezUtils.addUserSpecifiedTezConfiguration(conf);
+
+    // Do not automatically close FileSystem objects so that in case of
+    // SIGTERM I have a chance to write out the job history. I'll be closing
+    // the objects myself.
+    conf.setBoolean("fs.automatic.close", false);
+    Limits.setConfiguration(conf);
+
     UserGroupInformation.setConfiguration(conf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
 
