Author: tjungblut
Date: Mon Sep 17 16:02:13 2012
New Revision: 1386683
URL: http://svn.apache.org/viewvc?rev=1386683&view=rev
Log:
Fixing hangups
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Sep 17 16:02:13 2012
@@ -233,9 +233,6 @@ public final class BSPPeerImpl<K1, V1, K
}
}
- // init the internal state
- initialize();
-
doFirstSync(superstep);
if (LOG.isDebugEnabled()) {
@@ -252,7 +249,7 @@ public final class BSPPeerImpl<K1, V1, K
*
* @throws IOException If a DistributedCache file cannot be found.
*/
- public final void moveLocalFiles() throws IOException {
+ public final void moveCacheFiles() throws IOException {
StringBuilder files = new StringBuilder();
boolean first = true;
if (DistributedCache.getCacheFiles(conf) != null) {
@@ -285,36 +282,6 @@ public final class BSPPeerImpl<K1, V1, K
}
@SuppressWarnings("unchecked")
- private final void initialize() throws Exception {
-
- initInput();
-
- String outdir = null;
- if (conf.get("bsp.output.dir") != null) {
- Path outputDir = new Path(conf.get("bsp.output.dir",
- "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
- outdir = outputDir.makeQualified(fs).toString();
- }
- outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
- final RecordWriter<K2, V2> finalOut = outWriter;
-
- collector = new OutputCollector<K2, V2>() {
- @Override
- public void collect(K2 key, V2 value) throws IOException {
- finalOut.write(key, value);
- }
- };
-
- // Move files from DistributedCache to the local cache
- // and set DistributedCache.LocalFiles
- try {
- moveLocalFiles();
- } catch (Exception e) {
- LOG.error(e);
- }
- }
-
- @SuppressWarnings("unchecked")
public final void initInput() throws IOException {
InputSplit inputSplit = null;
// reinstantiate the split
@@ -403,6 +370,12 @@ public final class BSPPeerImpl<K1, V1, K
}
};
+ try {
+ moveCacheFiles();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon Sep 17 16:02:13 2012
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hama.HamaConfiguration;
@@ -261,25 +262,41 @@ public class TaskLog {
mergedCmd.append(tailCommand);
mergedCmd.append(" -c ");
mergedCmd.append(tailLength);
- mergedCmd.append(" >> ");
+ mergedCmd.append(" > ");
mergedCmd.append(stdout);
mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
mergedCmd.append(tailCommand);
mergedCmd.append(" -c ");
mergedCmd.append(tailLength);
- mergedCmd.append(" >> ");
+ mergedCmd.append(" > ");
mergedCmd.append(stderr);
mergedCmd.append(" ; exit $PIPESTATUS");
} else {
- mergedCmd.append(" 1>> ");
+ mergedCmd.append(" 1> ");
mergedCmd.append(stdout);
- mergedCmd.append(" 2>> ");
+ mergedCmd.append(" 2> ");
mergedCmd.append(stderr);
}
result.add(mergedCmd.toString());
return result;
}
+ public static List<String> captureOutAndErrorTee(List<String> setup,
+ List<String> cmd, File stdoutFilename, File stderrFilename,
+ long tailLength) throws IOException {
+ String stdout = FileUtil.makeShellPath(stdoutFilename);
+ List<String> result = new ArrayList<String>(3);
+ result.add(bashCommand);
+ result.add("-c");
+ StringBuilder mergedCmd = new StringBuilder();
+
+ mergedCmd.append(addCommand(cmd, true));
+ mergedCmd.append(" 2>&1 | tee " + stdout);
+
+ result.add(mergedCmd.toString());
+ return result;
+ }
+
/**
* Add quotes to each of the command strings and return as a single string
*
@@ -346,4 +363,14 @@ public class TaskLog {
return result;
}
+ /**
+ * Get the desired maximum length of task's logs.
+ *
+ * @param conf the job to look in
+ * @return the number of bytes to cap the log files at
+ */
+ public static long getTaskLogLength(Configuration conf) {
+ return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
+ }
+
} // TaskLog
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1386683&r1=1386682&r2=1386683&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java Mon Sep 17 16:02:13 2012
@@ -50,7 +50,7 @@ public class BestEffortDataLocalTaskAllo
* @param tasksInGroomMap
* @return
*/
- private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+ private static String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
Map<GroomServerStatus, Integer> tasksInGroomMap) {
Iterator<String> groomIter = grooms.keySet().iterator();