Author: edwardyoon
Date: Wed Oct  5 03:38:57 2011
New Revision: 1179045

URL: http://svn.apache.org/viewvc?rev=1179045&view=rev
Log:
Re-formatting

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1179045&r1=1179044&r2=1179045&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java 
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java 
Wed Oct  5 03:38:57 2011
@@ -46,290 +46,283 @@ import org.apache.hama.checkpoint.Checkp
  */
 public class TaskRunner extends Thread {
 
-       public static final Log LOG = LogFactory.getLog(TaskRunner.class);
-       private static final String SYSTEM_PATH_SEPARATOR = System
-                       .getProperty("path.separator");
-
-       private enum LogType {
-               STDOUT, ERROR
-       }
-
-       boolean bspKilled = false;
-       private Process bspProcess;
-
-       private final Task task;
-       private final BSPJob conf;
-       private final GroomServer groomServer;
-
-       private File logDir;
-
-       class BspChildRunner implements Callable<Object> {
-               private final List<String> commands;
-               private final File workDir;
-               private final ScheduledExecutorService sched;
-               private final AtomicReference<ScheduledFuture<Object>> future;
-
-               BspChildRunner(List<String> commands, File workDir) {
-                       this.commands = commands;
-                       this.workDir = workDir;
-                       this.sched = Executors.newScheduledThreadPool(1);
-                       this.future = new 
AtomicReference<ScheduledFuture<Object>>();
-               }
-
-               void start() {
-                       this.future.set(this.sched.schedule(this, 0, SECONDS));
-                       LOG.info("Start building BSPPeer process.");
-               }
-
-               void stop() {
-                       killBsp();
-                       this.sched.schedule(this, 0, SECONDS);
-                       LOG.info("Stop BSPPeer process.");
-               }
-
-               void join() throws InterruptedException, ExecutionException {
-                       this.future.get().get();
-               }
-
-               public Object call() throws Exception {
-                       ProcessBuilder builder = new ProcessBuilder(commands);
-                       builder.directory(workDir);
-                       try {
-                               bspProcess = builder.start();
-                               new Thread() {
-                                       public void run() {
-                                               
logStream(bspProcess.getErrorStream(), LogType.ERROR);
-                                       }
-                               }.start();
-
-                               new Thread() {
-                                       public void run() {
-                                               
logStream(bspProcess.getInputStream(), LogType.STDOUT);
-                                       }
-                               }.start();
-
-                               int exit_code = bspProcess.waitFor();
-                               if (!bspKilled && exit_code != 0) {
-                                       throw new IOException(
-                                                       "BSP task process exit 
with nonzero status of "
-                                                                       + 
exit_code + ".");
-                               }
-                       } catch (InterruptedException e) {
-                               LOG.warn(
-                                               "Thread is interrupted when 
execeuting Checkpointer process.",
-                                               e);
-                       } catch (IOException ioe) {
-                               LOG.error("Error when executing BSPPeer 
process.", ioe);
-                       } finally {
-                               killBsp();
-                       }
-                       return null;
-               }
-       }
-
-       public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
-               this.task = bspTask;
-               this.conf = conf;
-               this.groomServer = groom;
-       }
-
-       public Task getTask() {
-               return task;
-       }
-
-       /**
-        * Called to assemble this task's input. This method is run in the 
parent
-        * process before the child is spawned. It should not execute user code,
-        * only system code.
-        */
-       public boolean prepare() throws IOException {
-               return true;
-       }
-
-       private File createWorkDirectory() {
-               File workDir = new File(new 
File(task.getJobFile()).getParent(), "work");
-               boolean isCreated = workDir.mkdirs();
-               if (isCreated) {
-                       LOG.debug("TaskRunner.workDir : " + workDir);
-               }
-               return workDir;
-       }
-
-       private String assembleClasspath(BSPJob jobConf, File workDir) {
-               StringBuffer classPath = new StringBuffer();
-               // start with same classpath as parent process
-               classPath.append(System.getProperty("java.class.path"));
-               classPath.append(SYSTEM_PATH_SEPARATOR);
-
-               String jar = jobConf.getJar();
-               if (jar != null) { // if jar exists, it into workDir
-                       try {
-                               RunJar.unJar(new File(jar), workDir);
-                       } catch (IOException ioe) {
-                               LOG.error(
-                                               "Unable to uncompressing file 
to " + workDir.toString(),
-                                               ioe);
-                       }
-                       File[] libs = new File(workDir, "lib").listFiles();
-                       if (libs != null) {
-                               for (int i = 0; i < libs.length; i++) {
-                                       // add libs from jar to classpath
-                                       classPath.append(SYSTEM_PATH_SEPARATOR);
-                                       classPath.append(libs[i]);
-                               }
-                       }
-                       classPath.append(SYSTEM_PATH_SEPARATOR);
-                       classPath.append(new File(workDir, "classes"));
-                       classPath.append(SYSTEM_PATH_SEPARATOR);
-                       classPath.append(workDir);
-               }
-               return classPath.toString();
-       }
-
-       private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
-                       Class<?> child) {
-               // Build exec child jmv args.
-               List<String> vargs = new ArrayList<String>();
-               File jvm = // use same jvm as parent
-               new File(new File(System.getProperty("java.home"), "bin"), 
"java");
-               vargs.add(jvm.toString());
-
-               // bsp.child.java.opts
-               String javaOpts = jobConf.getConf().get("bsp.child.java.opts",
-                               "-Xmx200m");
-               javaOpts = javaOpts.replace("@taskid@", 
task.getTaskID().toString());
-
-               String[] javaOptsSplit = javaOpts.split(" ");
-               for (int i = 0; i < javaOptsSplit.length; i++) {
-                       vargs.add(javaOptsSplit[i]);
-               }
-
-               // Add classpath.
-               vargs.add("-classpath");
-               vargs.add(classPath.toString());
-               // Add main class and its arguments
-               LOG.debug("Executing child Process " + child.getName());
-               vargs.add(child.getName()); // main of bsp or checkpointer Child
-
-               if (GroomServer.BSPPeerChild.class.equals(child)) {
-                       InetSocketAddress addr = 
groomServer.getTaskTrackerReportAddress();
-                       vargs.add(addr.getHostName());
-                       vargs.add(Integer.toString(addr.getPort()));
-                       vargs.add(task.getTaskID().toString());
-                       vargs.add(groomServer.groomHostName);
-               }
-
-               if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", 
false)) {
-                       String ckptPort = 
jobConf.getConf().get("bsp.checkpoint.port",
-                                       CheckpointRunner.DEFAULT_PORT);
-                       LOG.debug("Checkpointer's port:" + ckptPort);
-                       vargs.add(ckptPort);
-               }
-
-               return vargs;
-       }
-
-       /**
-        * Build working environment and launch BSPPeer and Checkpointer 
processes.
-        * And transmit data from BSPPeer's inputstream to Checkpointer's
-        * OutputStream.
-        */
-       public void run() {
-               File workDir = createWorkDirectory();
-               logDir = createLogDirectory();
-               String classPath = assembleClasspath(conf, workDir);
-               LOG.debug("Spawned child's classpath " + classPath);
-               List<String> bspArgs = buildJvmArgs(conf, classPath,
-                               GroomServer.BSPPeerChild.class);
-
-               BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
-               bspPeer.start();
-               try {
-                       bspPeer.join();
-               } catch (InterruptedException ie) {
-                       LOG.error("BSPPeer child process is interrupted.", ie);
-               } catch (ExecutionException ee) {
-                       LOG.error("Failure occurs when retrieving tasks 
result.", ee);
-               }
-               LOG.info("Finishes executing BSPPeer child process.");
-       }
-
-       /**
-        * Creates the tasks log directory if needed.
-        * 
-        * @return the top directory of the tasks logging area.
-        */
-       private File createLogDirectory() {
-               // our log dir looks following: log/tasklogs/job_id/
-               File f = new File(System.getProperty("hama.log.dir") + 
File.separator
-                               + "tasklogs" + File.separator + task.jobId.id);
-               // TODO if we have attemps: + File.separator+ task.getTaskID());
-
-               if (!f.exists()) {
-                       f.mkdirs();
-               }
-
-               return f;
-       }
-
-       /**
-        * Kill bsppeer child process.
-        */
-       public void killBsp() {
-               if (bspProcess != null) {
-                       bspProcess.destroy();
-               }
-               bspKilled = true;
-       }
-
-       /**
-        * Log process's stream.
-        * 
-        * @param input
-        *            stream to be logged.
-        * @param stdout
-        *            type of the log
-        */
-       private void logStream(InputStream input, LogType type) {
-               // STDOUT file can be found under LOG_DIR/task_attempt_id.log
-               // ERROR file can be found under LOG_DIR/task_attempt_id.err
-               File taskLogFile = new File(logDir, task.getTaskAttemptId()
-                               + getFileEndingForType(type));
-               BufferedWriter writer = null;
-               try {
-                       writer = new BufferedWriter(new 
FileWriter(taskLogFile));
-                       BufferedReader in = new BufferedReader(new 
InputStreamReader(input));
-                       String line;
-                       while ((line = in.readLine()) != null) {
-                               writer.write(line);
-                               writer.newLine();
-                       }
-               } catch (IOException e) {
-                       LOG.warn(task.getTaskID() + " Error reading child 
output", e);
-               } finally {
-                       try {
-                               input.close();
-                       } catch (IOException e) {
-                               LOG.warn(task.getTaskID() + " Error closing 
child output", e);
-                       }
-                       try {
-                               writer.close();
-                       } catch (IOException e) {
-                               LOG.warn(task.getTaskID() + " Error closing log 
file", e);
-                       }
-               }
-       }
-
-       /**
-        * Returns the ending of the logfile for each LogType. e.G. ".log".
-        * 
-        * @param type
-        * @return an ending, including a dot.
-        */
-       private String getFileEndingForType(LogType type) {
-               if (type != LogType.ERROR)
-                       return ".err";
-               else
-                       return ".log";
-       }
+  public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+  private static final String SYSTEM_PATH_SEPARATOR = System
+      .getProperty("path.separator");
+
+  private enum LogType {
+    STDOUT, ERROR
+  }
+
+  boolean bspKilled = false;
+  private Process bspProcess;
+
+  private final Task task;
+  private final BSPJob conf;
+  private final GroomServer groomServer;
+
+  private File logDir;
+
+  class BspChildRunner implements Callable<Object> {
+    private final List<String> commands;
+    private final File workDir;
+    private final ScheduledExecutorService sched;
+    private final AtomicReference<ScheduledFuture<Object>> future;
+
+    BspChildRunner(List<String> commands, File workDir) {
+      this.commands = commands;
+      this.workDir = workDir;
+      this.sched = Executors.newScheduledThreadPool(1);
+      this.future = new AtomicReference<ScheduledFuture<Object>>();
+    }
+
+    void start() {
+      this.future.set(this.sched.schedule(this, 0, SECONDS));
+      LOG.info("Start building BSPPeer process.");
+    }
+
+    void stop() {
+      killBsp();
+      this.sched.schedule(this, 0, SECONDS);
+      LOG.info("Stop BSPPeer process.");
+    }
+
+    void join() throws InterruptedException, ExecutionException {
+      this.future.get().get();
+    }
+
+    public Object call() throws Exception {
+      ProcessBuilder builder = new ProcessBuilder(commands);
+      builder.directory(workDir);
+      try {
+        bspProcess = builder.start();
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getErrorStream(), LogType.ERROR);
+          }
+        }.start();
+
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getInputStream(), LogType.STDOUT);
+          }
+        }.start();
+
+        int exit_code = bspProcess.waitFor();
+        if (!bspKilled && exit_code != 0) {
+          throw new IOException("BSP task process exit with nonzero status of "
+              + exit_code + ".");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Thread is interrupted when execeuting Checkpointer process.",
+            e);
+      } catch (IOException ioe) {
+        LOG.error("Error when executing BSPPeer process.", ioe);
+      } finally {
+        killBsp();
+      }
+      return null;
+    }
+  }
+
+  public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+    this.task = bspTask;
+    this.conf = conf;
+    this.groomServer = groom;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  /**
+   * Called to assemble this task's input. This method is run in the parent
+   * process before the child is spawned. It should not execute user code, only
+   * system code.
+   */
+  public boolean prepare() throws IOException {
+    return true;
+  }
+
+  private File createWorkDirectory() {
+    File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+    boolean isCreated = workDir.mkdirs();
+    if (isCreated) {
+      LOG.debug("TaskRunner.workDir : " + workDir);
+    }
+    return workDir;
+  }
+
+  private String assembleClasspath(BSPJob jobConf, File workDir) {
+    StringBuffer classPath = new StringBuffer();
+    // start with same classpath as parent process
+    classPath.append(System.getProperty("java.class.path"));
+    classPath.append(SYSTEM_PATH_SEPARATOR);
+
+    String jar = jobConf.getJar();
+    if (jar != null) { // if jar exists, it into workDir
+      try {
+        RunJar.unJar(new File(jar), workDir);
+      } catch (IOException ioe) {
+        LOG.error("Unable to uncompressing file to " + workDir.toString(), 
ioe);
+      }
+      File[] libs = new File(workDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          // add libs from jar to classpath
+          classPath.append(SYSTEM_PATH_SEPARATOR);
+          classPath.append(libs[i]);
+        }
+      }
+      classPath.append(SYSTEM_PATH_SEPARATOR);
+      classPath.append(new File(workDir, "classes"));
+      classPath.append(SYSTEM_PATH_SEPARATOR);
+      classPath.append(workDir);
+    }
+    return classPath.toString();
+  }
+
+  private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
+      Class<?> child) {
+    // Build exec child jmv args.
+    List<String> vargs = new ArrayList<String>();
+    File jvm = // use same jvm as parent
+    new File(new File(System.getProperty("java.home"), "bin"), "java");
+    vargs.add(jvm.toString());
+
+    // bsp.child.java.opts
+    String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+    String[] javaOptsSplit = javaOpts.split(" ");
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    // Add classpath.
+    vargs.add("-classpath");
+    vargs.add(classPath.toString());
+    // Add main class and its arguments
+    LOG.debug("Executing child Process " + child.getName());
+    vargs.add(child.getName()); // main of bsp or checkpointer Child
+
+    if (GroomServer.BSPPeerChild.class.equals(child)) {
+      InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+      vargs.add(addr.getHostName());
+      vargs.add(Integer.toString(addr.getPort()));
+      vargs.add(task.getTaskID().toString());
+      vargs.add(groomServer.groomHostName);
+    }
+
+    if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
+      String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
+          CheckpointRunner.DEFAULT_PORT);
+      LOG.debug("Checkpointer's port:" + ckptPort);
+      vargs.add(ckptPort);
+    }
+
+    return vargs;
+  }
+
+  /**
+   * Build working environment and launch BSPPeer and Checkpointer processes.
+   * And transmit data from BSPPeer's inputstream to Checkpointer's
+   * OutputStream.
+   */
+  public void run() {
+    File workDir = createWorkDirectory();
+    logDir = createLogDirectory();
+    String classPath = assembleClasspath(conf, workDir);
+    LOG.debug("Spawned child's classpath " + classPath);
+    List<String> bspArgs = buildJvmArgs(conf, classPath,
+        GroomServer.BSPPeerChild.class);
+
+    BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
+    bspPeer.start();
+    try {
+      bspPeer.join();
+    } catch (InterruptedException ie) {
+      LOG.error("BSPPeer child process is interrupted.", ie);
+    } catch (ExecutionException ee) {
+      LOG.error("Failure occurs when retrieving tasks result.", ee);
+    }
+    LOG.info("Finishes executing BSPPeer child process.");
+  }
+
+  /**
+   * Creates the tasks log directory if needed.
+   * 
+   * @return the top directory of the tasks logging area.
+   */
+  private File createLogDirectory() {
+    // our log dir looks following: log/tasklogs/job_id/
+    File f = new File(System.getProperty("hama.log.dir") + File.separator
+        + "tasklogs" + File.separator + task.jobId.id);
+    // TODO if we have attemps: + File.separator+ task.getTaskID());
+
+    if (!f.exists()) {
+      f.mkdirs();
+    }
+
+    return f;
+  }
+
+  /**
+   * Kill bsppeer child process.
+   */
+  public void killBsp() {
+    if (bspProcess != null) {
+      bspProcess.destroy();
+    }
+    bspKilled = true;
+  }
+
+  /**
+   * Log process's stream.
+   * 
+   * @param input stream to be logged.
+   * @param stdout type of the log
+   */
+  private void logStream(InputStream input, LogType type) {
+    // STDOUT file can be found under LOG_DIR/task_attempt_id.log
+    // ERROR file can be found under LOG_DIR/task_attempt_id.err
+    File taskLogFile = new File(logDir, task.getTaskAttemptId()
+        + getFileEndingForType(type));
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new FileWriter(taskLogFile));
+      BufferedReader in = new BufferedReader(new InputStreamReader(input));
+      String line;
+      while ((line = in.readLine()) != null) {
+        writer.write(line);
+        writer.newLine();
+      }
+    } catch (IOException e) {
+      LOG.warn(task.getTaskID() + " Error reading child output", e);
+    } finally {
+      try {
+        input.close();
+      } catch (IOException e) {
+        LOG.warn(task.getTaskID() + " Error closing child output", e);
+      }
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.warn(task.getTaskID() + " Error closing log file", e);
+      }
+    }
+  }
+
+  /**
+   * Returns the ending of the logfile for each LogType. e.G. ".log".
+   * 
+   * @param type
+   * @return an ending, including a dot.
+   */
+  private String getFileEndingForType(LogType type) {
+    if (type != LogType.ERROR)
+      return ".err";
+    else
+      return ".log";
+  }
 }


Reply via email to