Repository: tez
Updated Branches:
  refs/heads/master e49ed3397 -> 1d7ad6f73


TEZ-717. Client changes for local mode DAG submission. Contributed by
Jonathan Eagles.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d7ad6f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d7ad6f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d7ad6f7

Branch: refs/heads/master
Commit: 1d7ad6f73b4409c5f9f13978588768cf43cccafd
Parents: e49ed33
Author: Siddharth Seth <[email protected]>
Authored: Tue Jul 29 22:27:46 2014 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Tue Jul 29 22:27:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/FrameworkClient.java  |  15 +-
 .../java/org/apache/tez/client/TezClient.java   |   4 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   2 +-
 .../java/org/apache/tez/common/TezUtils.java    |   4 +-
 .../java/org/apache/tez/client/LocalClient.java | 270 +++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  35 +--
 6 files changed, 309 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java 
b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 2f97399..eca15f6 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -27,10 +27,23 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 
 public abstract class FrameworkClient {
 
-  public static FrameworkClient createFrameworkClient() {
+  // Single LocalClient shared by every local-mode caller in this JVM. Presumably shared so
+  // that TezClient and DAGClientRPCImpl observe the same in-process DAGAppMaster.
+  private static FrameworkClient localClient = null;
+
+  /**
+   * Returns the {@link FrameworkClient} to use for the given configuration.
+   *
+   * <p>When {@link TezConfiguration#TEZ_LOCAL_MODE} is enabled, a single
+   * {@code org.apache.tez.client.LocalClient} is created on first use and returned on every
+   * subsequent call. It is instantiated by class name because LocalClient lives in the
+   * tez-dag module, not in tez-api. Otherwise a new YARN-backed client is returned on each
+   * call.
+   *
+   * <p>The method is synchronized so that concurrent callers cannot race on the lazy
+   * initialization and end up holding two different local clients.
+   *
+   * @param tezConf configuration used to decide between local and YARN mode
+   * @return the shared local client in local mode, otherwise a new YARN client
+   */
+  public static synchronized FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
+
+    boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+    if (isLocal) {
+      // Cache local client instance
+      if (localClient == null) {
+        localClient = ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
+      }
+      return localClient;
+    }
    return new TezYarnClient(YarnClient.createYarnClient());
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 454aad6..ea401b8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -596,9 +596,9 @@ public class TezClient {
   
   // for testing
   protected FrameworkClient createFrameworkClient() {
-    return FrameworkClient.createFrameworkClient();
+    // The AM's Tez configuration decides whether a local or a YARN client is returned.
+    return FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration());
   }
-  
+
   // for testing
   protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId 
appId) 
       throws TezException, IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index bf8fa39..dfe1323 100644
--- 
a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ 
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -76,7 +76,7 @@ public class DAGClientRPCImpl extends DAGClient {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
-    frameworkClient = FrameworkClient.createFrameworkClient();
+    frameworkClient = FrameworkClient.createFrameworkClient(conf);
     frameworkClient.init(new YarnConfiguration(conf));
     frameworkClient.start();
     appReport = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java 
b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index f73f223..0b292f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -18,6 +18,7 @@
 package org.apache.tez.common;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -35,7 +36,6 @@ import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -61,7 +61,7 @@ public class TezUtils {
     FileInputStream confPBBinaryStream = null;
     ConfigurationProto.Builder confProtoBuilder = 
ConfigurationProto.newBuilder();
     try {
-      confPBBinaryStream = new 
FileInputStream(TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      confPBBinaryStream = new FileInputStream(new 
File(TezConfiguration.TEZ_PB_BINARY_CONF_NAME).getAbsolutePath());
       confProtoBuilder.mergeFrom(confPBBinaryStream);
     } finally {
       if (confPBBinaryStream != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java 
b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
new file mode 100644
index 0000000..a2a2ab1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
+
+/**
+ * A {@link FrameworkClient} for Tez local mode.
+ *
+ * <p>Instead of contacting a YARN ResourceManager, this client fabricates application ids
+ * and application reports itself and runs a {@link DAGAppMaster} on a thread inside the
+ * current JVM.
+ */
+public class LocalClient extends FrameworkClient {
+  public static final Logger LOG = Logger.getLogger(LocalClient.class);
+
+  // Maximum time, in milliseconds, to wait for the in-process AM to start.
+  private static final long TIME_OUT = 60 * 1000;
+  // Polling interval, in milliseconds, while waiting for the AM to start.
+  private static final long POLL_INTERVAL = 100;
+
+  private volatile DAGAppMaster dagAppMaster = null;
+  private volatile DAGClientHandler clientHandler = null;
+  // Failure raised on the AM thread while starting the AM; read by the submitting thread.
+  private volatile Throwable amFailure = null;
+  private Thread dagAmThread;
+  private Configuration conf;
+  private final long clusterTimeStamp = System.currentTimeMillis();
+  private int appIdNumber = 1;
+  private boolean isSession;
+
+  public LocalClient() {
+  }
+
+  /**
+   * Stores the configuration and reads the session-mode flag.
+   *
+   * <p>Note that the passed-in configuration object is modified in place: its default
+   * file system is forced to the local file system.
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.conf.set("fs.defaultFS", "file:///");
+    isSession = conf.getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE,
+        TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
+  }
+
+  // There is no cluster connection to open or close in local mode.
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Creates a new application with a locally generated id. Ids are numbered sequentially
+   * from 1, using this client's creation time as the cluster timestamp.
+   */
+  @Override
+  public YarnClientApplication createApplication() throws YarnException, IOException {
+    ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
+    ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, appIdNumber++);
+    context.setApplicationId(appId);
+    GetNewApplicationResponse response = Records.newRecord(GetNewApplicationResponse.class);
+    response.setApplicationId(appId);
+    return new YarnClientApplication(response, context);
+  }
+
+  /**
+   * Starts the in-process DAGAppMaster (if one is not already running) and blocks until it
+   * has started.
+   *
+   * @throws RuntimeException if the DAGAppMaster fails to start
+   */
+  @Override
+  public ApplicationId submitApplication(ApplicationSubmissionContext appContext) {
+    ApplicationId appId = appContext.getApplicationId();
+    startDAGAppMaster(appContext);
+    return appId;
+  }
+
+  /**
+   * Asks the in-process AM to shut down. Does nothing if no AM has been created yet.
+   */
+  @Override
+  public void killApplication(ApplicationId appId) {
+    DAGClientHandler handler = clientHandler;
+    if (handler == null) {
+      LOG.warn("Ignoring kill request for " + appId + ": DAGAppMaster has not been created");
+      return;
+    }
+    handler.shutdownAM();
+  }
+
+  /**
+   * Builds an {@link ApplicationReport} from the state of the in-process AM.
+   *
+   * <p>If the AM has not been created yet, the report carries state NEW. If the AM thread
+   * failed before creating the AM, the report carries state FAILED with the startup error
+   * as its diagnostics.
+   */
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId) {
+    ApplicationReport report = Records.newRecord(ApplicationReport.class);
+    report.setApplicationId(appId);
+    report.setTrackingUrl("N/A");
+    report.setOriginalTrackingUrl("N/A");
+
+    DAGAppMaster am = dagAppMaster;
+    if (am == null) {
+      Throwable failure = amFailure;
+      if (failure != null) {
+        report.setYarnApplicationState(YarnApplicationState.FAILED);
+        report.setDiagnostics(failure.toString());
+      } else {
+        report.setYarnApplicationState(YarnApplicationState.NEW);
+      }
+      report.setProgress(0);
+      return report;
+    }
+
+    report.setCurrentApplicationAttemptId(am.getAttemptID());
+
+    AppContext runningAppContext = am.getContext();
+    if (runningAppContext != null) {
+      DAG dag = runningAppContext.getCurrentDAG();
+      if (dag != null) {
+        report.setUser(runningAppContext.getUser());
+      }
+      report.setName(runningAppContext.getApplicationName());
+      report.setStartTime(runningAppContext.getStartTime());
+    }
+
+    report.setHost(am.getAppNMHost());
+    report.setRpcPort(am.getRpcPort());
+    report.setClientToAMToken(null);
+    report.setYarnApplicationState(convertDAGAppMasterState(am.getState()));
+
+    List<String> diagnostics = am.getDiagnostics();
+    if (diagnostics != null) {
+      report.setDiagnostics(diagnostics.toString());
+    }
+    report.setFinishTime(0);
+    report.setFinalApplicationStatus(null);
+    report.setApplicationResourceUsageReport(null);
+    report.setProgress(am.getProgress());
+    report.setAMRMToken(null);
+
+    return report;
+  }
+
+  /**
+   * Maps a {@link DAGAppMasterState} onto the closest {@link YarnApplicationState}.
+   * All pre-terminal active states report RUNNING; ERROR is reported as FAILED.
+   */
+  protected YarnApplicationState convertDAGAppMasterState(DAGAppMasterState dagAppMasterState) {
+    switch (dagAppMasterState) {
+    case NEW:
+      return YarnApplicationState.NEW;
+    case INITED:
+    case RECOVERING:
+    case IDLE:
+    case RUNNING:
+      return YarnApplicationState.RUNNING;
+    case SUCCEEDED:
+      return YarnApplicationState.FINISHED;
+    case FAILED:
+      return YarnApplicationState.FAILED;
+    case KILLED:
+      return YarnApplicationState.KILLED;
+    case ERROR:
+      return YarnApplicationState.FAILED;
+    default:
+      return YarnApplicationState.SUBMITTED;
+    }
+  }
+
+  /**
+   * Starts the DAGAppMaster thread on first use and waits until the AM has started.
+   *
+   * <p>Failures are propagated to the caller as a RuntimeException, rather than exiting
+   * the JVM, because this runs inside the user's client process.
+   */
+  protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) {
+    if (dagAmThread != null) {
+      // NOTE(review): an AM thread already exists in this client, so later submissions do
+      // not start a new AM. Confirm this is intended outside session mode.
+      return;
+    }
+    try {
+      dagAmThread = createDAGAppMaster(appContext);
+      dagAmThread.start();
+      waitForDAGAppMasterToStart();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting DAGAppMaster", t);
+      if (dagAmThread != null) {
+        dagAmThread.interrupt();
+      }
+      if (t instanceof InterruptedException) {
+        // Preserve the caller's interrupt status.
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException("Error starting DAGAppMaster", t);
+    }
+  }
+
+  // Polls the in-process AM every POLL_INTERVAL ms until it leaves NEW/INITED, fails, or
+  // TIME_OUT elapses.
+  private void waitForDAGAppMasterToStart() throws TezException, InterruptedException {
+    long waitingTime = 0;
+    while (true) {
+      Throwable failure = amFailure;
+      if (failure != null) {
+        throw new TezException("DAGAppMaster failed during startup", failure);
+      }
+      DAGAppMaster am = dagAppMaster;
+      if (am == null) {
+        LOG.info("DAGAppMaster is not created wait for " + POLL_INTERVAL + "ms...");
+      } else {
+        DAGAppMasterState dagAMState = am.getState();
+        LOG.info("DAGAppMaster state: " + dagAMState);
+        switch (dagAMState) {
+        case NEW:
+        case INITED:
+          LOG.info("DAGAppMaster is not started wait for " + POLL_INTERVAL + "ms...");
+          break;
+        case ERROR:
+          throw new TezException("DAGAppMaster got an error during initialization");
+        case KILLED:
+          throw new TezException("DAGAppMaster is killed");
+        default:
+          // Any other state means the AM has started.
+          return;
+        }
+      }
+
+      if (waitingTime >= TIME_OUT) {
+        throw new TezException("Time out creating DAGAppMaster");
+      }
+      Thread.sleep(POLL_INTERVAL);
+      waitingTime += POLL_INTERVAL;
+    }
+  }
+
+  /**
+   * Creates, but does not start, the thread that builds and starts the in-process
+   * {@link DAGAppMaster} for the given application.
+   *
+   * <p>A failure on that thread is logged and recorded in {@code amFailure}, so that the
+   * thread waiting in {@link #startDAGAppMaster} can report it instead of the JVM exiting.
+   */
+  protected Thread createDAGAppMaster(final ApplicationSubmissionContext appContext) {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          ApplicationId appId = appContext.getApplicationId();
+
+          // Set up working directory for DAGAppMaster.
+          // java.io.File resolves relative paths against the JVM-wide "user.dir" property,
+          // so relative names opened via File.getAbsolutePath() land in the staging dir.
+          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
+          LOG.info("Setting current directory: " + userDir.toUri().getPath());
+          System.setProperty("user.dir", userDir.toUri().getPath());
+          FileSystem fs = FileSystem.get(conf);
+          fs.mkdirs(userDir);
+          fs.setWorkingDirectory(userDir);
+
+          // Prepare the environment variables the AM reads for its log and local dirs.
+          Path logDir = new Path(userDir, "logs");
+          Path localDir = new Path(userDir, "local");
+
+          EnvironmentUpdateUtils.put(Environment.LOG_DIRS.name(), logDir.toUri().getPath());
+          EnvironmentUpdateUtils.put(Environment.LOCAL_DIRS.name(), localDir.toUri().getPath());
+
+          // Add session specific credentials to the AM credentials.
+          UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+          ByteBuffer tokens = appContext.getAMContainerSpec().getTokens();
+
+          if (tokens != null) {
+            Credentials amCredentials = new Credentials();
+            DataInputByteBuffer dibb = new DataInputByteBuffer();
+            dibb.reset(tokens);
+            amCredentials.readTokenStorageStream(dibb);
+            // Leave the buffer readable for any later consumer of the container spec.
+            tokens.rewind();
+            currentUser.addCredentials(amCredentials);
+          }
+
+          // Construct, initialize, and start the DAGAppMaster
+          ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(appId, 0);
+          ContainerId cId = ContainerId.newInstance(applicationAttemptId, 1);
+          String currentHost = InetAddress.getLocalHost().getHostName();
+          int nmPort = YarnConfiguration.DEFAULT_NM_PORT;
+          int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT;
+          long appSubmitTime = System.currentTimeMillis();
+
+          dagAppMaster = new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort,
+              nmHttpPort, appSubmitTime, isSession);
+          clientHandler = new DAGClientHandler(dagAppMaster);
+          DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
+
+        } catch (Throwable t) {
+          LOG.fatal("Error starting DAGAppMaster", t);
+          // Hand the failure to the submitting thread instead of exiting the caller's JVM.
+          amFailure = t;
+        }
+      }
+    });
+
+    thread.setName("DAGAppMaster Thread");
+    LOG.info("DAGAppMaster thread has been created");
+
+    return thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d7ad6f7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8f4bbab..bfb974b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -403,7 +403,7 @@ public class DAGAppMaster extends AbstractService {
       FileInputStream sessionResourcesStream = null;
       try {
         sessionResourcesStream = new FileInputStream(
-          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+          new 
File(TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME).getAbsolutePath());
         PlanLocalResourcesProto sessionLocalResourcesProto =
           PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
         PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
@@ -859,6 +859,10 @@ public class DAGAppMaster extends AbstractService {
     return nmHttpPort;
   }
 
+  /**
+   * Returns the port that the client RPC server is bound to.
+   *
+   * NOTE(review): dereferences clientRpcServer and its bind address without null checks;
+   * presumably only valid once the AM's RPC server has been started. Confirm before
+   * calling it earlier.
+   */
+  public int getRpcPort() {
+    return clientRpcServer.getBindAddress().getPort();
+  }
+
+  /** Returns the current lifecycle state of this AM. */
   public DAGAppMasterState getState() {
     return state;
   }
@@ -1696,17 +1700,10 @@ public class DAGAppMaster extends AbstractService {
 
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
-      final Configuration conf = new Configuration(new YarnConfiguration());
-      TezUtils.addUserSpecifiedTezConfiguration(conf);
 
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
 
-      // Do not automatically close FileSystem objects so that in case of
-      // SIGTERM I have a chance to write out the job history. I'll be closing
-      // the objects myself.
-      conf.setBoolean("fs.automatic.close", false);
-
       // Command line options
       Options opts = new Options();
       opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
@@ -1722,9 +1719,7 @@ public class DAGAppMaster extends AbstractService {
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
-      Limits.setConfiguration(conf);
-      initAndStartAppMaster(appMaster, conf,
-          jobUserName);
+      initAndStartAppMaster(appMaster, jobUserName);
 
     } catch (Throwable t) {
       LOG.fatal("Error starting DAGAppMaster", t);
@@ -1775,8 +1770,8 @@ public class DAGAppMaster extends AbstractService {
       DAGPlan dagPlan = null;
 
       // Read the protobuf DAG
-      dagPBBinaryStream = new FileInputStream(
-          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
+      dagPBBinaryStream = new FileInputStream(new File(
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME).getAbsolutePath());
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
 
       startDAG(dagPlan, null);
@@ -1868,9 +1863,19 @@ public class DAGAppMaster extends AbstractService {
   }
 
   // TODO XXX Does this really need to be a YarnConfiguration ?
-  protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
-      final Configuration conf, String jobUserName) throws IOException,
+  public static void initAndStartAppMaster(final DAGAppMaster appMaster,
+      String jobUserName) throws IOException,
       InterruptedException {
+
+    final Configuration conf = new Configuration(new YarnConfiguration());
+    TezUtils.addUserSpecifiedTezConfiguration(conf);
+
+    // Do not automatically close FileSystem objects so that in case of
+    // SIGTERM I have a chance to write out the job history. I'll be closing
+    // the objects myself.
+    conf.setBoolean("fs.automatic.close", false);
+    Limits.setConfiguration(conf);
+
     UserGroupInformation.setConfiguration(conf);
     Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
 

Reply via email to