Author: tjungblut
Date: Mon Sep 17 16:02:13 2012
New Revision: 1386683

URL: http://svn.apache.org/viewvc?rev=1386683&view=rev
Log:
Fixing hangups 

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java 
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Sep 
17 16:02:13 2012
@@ -233,9 +233,6 @@ public final class BSPPeerImpl<K1, V1, K
       }
     }
 
-    // init the internal state
-    initialize();
-
     doFirstSync(superstep);
 
     if (LOG.isDebugEnabled()) {
@@ -252,7 +249,7 @@ public final class BSPPeerImpl<K1, V1, K
    * 
    * @throws IOException If a DistributedCache file cannot be found.
    */
-  public final void moveLocalFiles() throws IOException {
+  public final void moveCacheFiles() throws IOException {
     StringBuilder files = new StringBuilder();
     boolean first = true;
     if (DistributedCache.getCacheFiles(conf) != null) {
@@ -285,36 +282,6 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   @SuppressWarnings("unchecked")
-  private final void initialize() throws Exception {
-
-    initInput();
-
-    String outdir = null;
-    if (conf.get("bsp.output.dir") != null) {
-      Path outputDir = new Path(conf.get("bsp.output.dir",
-          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
-      outdir = outputDir.makeQualified(fs).toString();
-    }
-    outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
-    final RecordWriter<K2, V2> finalOut = outWriter;
-
-    collector = new OutputCollector<K2, V2>() {
-      @Override
-      public void collect(K2 key, V2 value) throws IOException {
-        finalOut.write(key, value);
-      }
-    };
-
-    // Move files from DistributedCache to the local cache
-    // and set DistributedCache.LocalFiles
-    try {
-      moveLocalFiles();
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
   public final void initInput() throws IOException {
     InputSplit inputSplit = null;
     // reinstantiate the split
@@ -403,6 +370,12 @@ public final class BSPPeerImpl<K1, V1, K
       }
     };
 
+    try {
+      moveCacheFiles();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon Sep 17 
16:02:13 2012
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hama.HamaConfiguration;
 
@@ -261,25 +262,41 @@ public class TaskLog {
       mergedCmd.append(tailCommand);
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
-      mergedCmd.append(" >> ");
+      mergedCmd.append(" > ");
       mergedCmd.append(stdout);
       mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
       mergedCmd.append(tailCommand);
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
-      mergedCmd.append(" >> ");
+      mergedCmd.append(" > ");
       mergedCmd.append(stderr);
       mergedCmd.append(" ; exit $PIPESTATUS");
     } else {
-      mergedCmd.append(" 1>> ");
+      mergedCmd.append(" 1> ");
       mergedCmd.append(stdout);
-      mergedCmd.append(" 2>> ");
+      mergedCmd.append(" 2> ");
       mergedCmd.append(stderr);
     }
     result.add(mergedCmd.toString());
     return result;
   }
 
+  public static List<String> captureOutAndErrorTee(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuilder mergedCmd = new StringBuilder();
+
+    mergedCmd.append(addCommand(cmd, true));
+    mergedCmd.append(" 2>&1 | tee " + stdout);
+
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
   /**
    * Add quotes to each of the command strings and return as a single string
    * 
@@ -346,4 +363,14 @@ public class TaskLog {
     return result;
   }
 
+  /**
+   * Get the desired maximum length of task's logs.
+   * 
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at
+   */
+  public static long getTaskLogLength(Configuration conf) {
+    return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
+  }
+
 } // TaskLog

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
 Mon Sep 17 16:02:13 2012
@@ -50,7 +50,7 @@ public class BestEffortDataLocalTaskAllo
    * @param tasksInGroomMap
    * @return
    */
-  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+  private static String getAnyGroomToSchedule(Map<String, GroomServerStatus> 
grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) {
 
     Iterator<String> groomIter = grooms.keySet().iterator();


Reply via email to