Author: tommaso
Date: Tue Feb 5 07:59:39 2013
New Revision: 1442488
URL: http://svn.apache.org/viewvc?rev=1442488&view=rev
Log:
HAMA-572 - fixed unnecessary (un)boxing, old style loops, etc.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Feb
5 07:59:39 2013
@@ -1123,9 +1123,9 @@ public class BSPJobClient extends Config
out.writeInt(partitionID);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
- for (int i = 0; i < locations.length; i++) {
- Text.writeString(out, locations[i]);
- }
+ for (String location : locations) {
+ Text.writeString(out, location);
+ }
}
public long getDataLength() {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Tue Feb 5
07:59:39 2013
@@ -127,7 +127,7 @@ public class BSPMaster implements JobSub
.createImmutable((short) 0700); // rwx------
// Jobs' Meta Data
- private Integer nextJobId = Integer.valueOf(1);
+ private Integer nextJobId = 1;
private int totalSubmissions = 0; // how many jobs has been submitted by
clients
private int totalTasks = 0; // currnetly running tasks
private int totalTaskCapacity; // max tasks that groom server can run
@@ -439,17 +439,17 @@ public class BSPMaster implements JobSub
void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(conf).delete(new Path(localDirs[i]), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(conf).delete(new Path(localDir), true);
+ }
}
void deleteLocalFiles(String subdir) throws IOException {
try {
String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
+ }
} catch (NullPointerException e) {
LOG.info(e);
}
@@ -588,7 +588,7 @@ public class BSPMaster implements JobSub
int id;
synchronized (nextJobId) {
id = nextJobId;
- nextJobId = Integer.valueOf(id + 1);
+ nextJobId = id + 1;
}
return new BSPJobID(this.masterIdentifier, id);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
Tue Feb 5 07:59:39 2013
@@ -210,12 +210,12 @@ public abstract class CombineFileInputFo
// Finally, process all paths that do not belong to any pool.
ArrayList<Path> myPaths = new ArrayList<Path>();
- for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) { // already processed
- continue;
+ for (Path path : paths) {
+ if (path == null) { // already processed
+ continue;
+ }
+ myPaths.add(path);
}
- myPaths.add(paths[i]);
- }
// create splits for all files that are not in any pool.
getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
minSizeNode, minSizeRack, splits);
@@ -261,49 +261,48 @@ public abstract class CombineFileInputFo
// process all nodes and create splits that are local
// to a node.
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
- .entrySet().iterator(); iter.hasNext();) {
+ for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks
+ .entrySet()) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
+ nodes.add(one.getKey());
+ List<OneBlockInfo> blocksInNode = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ for (OneBlockInfo oneblock : blocksInNode) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, nodes, validBlocks);
+ curSplitSize = 0;
+ validBlocks.clear();
+ }
+ }
}
- }
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the same rack later
- // on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
- }
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the same rack
later
+ // on.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, nodes, validBlocks);
+ } else {
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
+ }
+ validBlocks.clear();
+ nodes.clear();
+ curSplitSize = 0;
}
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
- }
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
@@ -323,58 +322,57 @@ public abstract class CombineFileInputFo
// split size).
// iterate over all racks
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks
- .entrySet().iterator(); iter.hasNext();) {
+ for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks
+ .entrySet()) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- racks.add(one.getKey());
- List<OneBlockInfo> blocks = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- boolean createdSplit = false;
- for (OneBlockInfo oneblock : blocks) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- createdSplit = true;
- break;
+ racks.add(one.getKey());
+ List<OneBlockInfo> blocks = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ boolean createdSplit = false;
+ for (OneBlockInfo oneblock : blocks) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, getHosts(racks),
validBlocks);
+ createdSplit = true;
+ break;
+ }
+ }
}
- }
- }
- // if we created a split, then just go to the next rack
- if (createdSplit) {
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- continue;
- }
+ // if we created a split, then just go to the next rack
+ if (createdSplit) {
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ continue;
+ }
- if (!validBlocks.isEmpty()) {
- if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
- // if there is a mimimum size specified, then create a single split
- // otherwise, store these blocks into overflow data structure
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- } else {
- // There were a few blocks in this rack that remained to be
- // processed.
- // Keep them in 'overflow' block list. These will be combined
later.
- overflowBlocks.addAll(validBlocks);
- }
+ if (!validBlocks.isEmpty()) {
+ if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+ // if there is a mimimum size specified, then create a
single split
+ // otherwise, store these blocks into overflow data
structure
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+ } else {
+ // There were a few blocks in this rack that remained to be
+ // processed.
+ // Keep them in 'overflow' block list. These will be
combined later.
+ overflowBlocks.addAll(validBlocks);
+ }
+ }
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
}
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
}
assert blockToNodes.isEmpty();
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Tue
Feb 5 07:59:39 2013
@@ -187,9 +187,9 @@ public class CombineFileSplit implements
if (locations != null) {
String locs = "";
StringBuffer locsb = new StringBuffer();
- for (int i = 0; i < locations.length; i++) {
- locsb.append(locations[i]).append(":");
- }
+ for (String location : locations) {
+ locsb.append(location).append(":");
+ }
locs = locsb.toString();
sb.append(" Locations:").append(locs).append("; ");
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Tue Feb 5
07:59:39 2013
@@ -43,9 +43,9 @@ public class Directive implements Writab
public int value() {
return this.t;
}
- };
+ }
- public Directive() {
+ public Directive() {
}
public Directive(Directive.Type type) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Tue Feb
5 07:59:39 2013
@@ -96,9 +96,9 @@ public class GroomServer implements Runn
// Constants
static enum State {
NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
- };
+ }
- private HttpServer server;
+ private HttpServer server;
private ZooKeeper zk = null;
// Running States and its related things
@@ -276,20 +276,17 @@ public class GroomServer implements Runn
LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
}
- Iterator<TaskInProgress> taskIter = outOfContactTasks.iterator();
-
- while (taskIter.hasNext()) {
- TaskInProgress tip = taskIter.next();
- try {
- LOG.debug("Purging task " + tip);
- purgeTask(tip, true);
- } catch (Exception e) {
- LOG.error(
- new StringBuilder("Error while removing a timed-out task - ")
- .append(tip.toString()), e);
+ for (TaskInProgress tip : outOfContactTasks) {
+ try {
+ LOG.debug("Purging task " + tip);
+ purgeTask(tip, true);
+ } catch (Exception e) {
+ LOG.error(
+ new StringBuilder("Error while removing a timed-out
task - ")
+ .append(tip.toString()), e);
+ }
}
- }
outOfContactTasks.clear();
}
@@ -467,15 +464,15 @@ public class GroomServer implements Runn
LOG.debug(localDirs);
if (localDirs != null) {
- for (int i = 0; i < localDirs.length; i++) {
- try {
- LOG.info(localDirs[i]);
- DiskChecker.checkDir(new File(localDirs[i]));
- writable = true;
- } catch (DiskErrorException e) {
- LOG.warn("BSP Processor local " + e.getMessage());
+ for (String localDir : localDirs) {
+ try {
+ LOG.info(localDir);
+ DiskChecker.checkDir(new File(localDir));
+ writable = true;
+ } catch (DiskErrorException e) {
+ LOG.warn("BSP Processor local " + e.getMessage());
+ }
}
- }
}
if (!writable)
@@ -488,18 +485,18 @@ public class GroomServer implements Runn
public void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
+ }
}
public void deleteLocalFiles(String subdir) throws IOException {
try {
String[] localDirs = getLocalDirs();
- for (int i = 0; i < localDirs.length; i++) {
- FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
- true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir),
+ true);
+ }
} catch (NullPointerException e) {
LOG.info(e);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
Tue Feb 5 07:59:39 2013
@@ -56,9 +56,9 @@ public abstract class GroomServerAction
/** Update information on a peer. */
UPDATE_PEER
- };
+ }
- /**
+ /**
* A factory-method to create objects of given {@link ActionType}.
*
* @param actionType the {@link ActionType} of object to create.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
Tue Feb 5 07:59:39 2013
@@ -123,14 +123,13 @@ public class GroomServerStatus implement
*/
public int countTasks() {
int taskCount = 0;
- for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
- TaskStatus ts = it.next();
- TaskStatus.State state = ts.getRunState();
- if (state == TaskStatus.State.RUNNING
- || state == TaskStatus.State.UNASSIGNED) {
- taskCount++;
+ for (TaskStatus ts : taskReports) {
+ TaskStatus.State state = ts.getRunState();
+ if (state == TaskStatus.State.RUNNING
+ || state == TaskStatus.State.UNASSIGNED) {
+ taskCount++;
+ }
}
- }
return taskCount;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue
Feb 5 07:59:39 2013
@@ -340,26 +340,25 @@ public class JobInProgress {
Task result = null;
BSPResource[] resources = new BSPResource[0];
- for (int i = 0; i < tasks.length; i++) {
- if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+ for (TaskInProgress task : tasks) {
+ if (!task.isRunning() && !task.isComplete()) {
- String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
- groomStatuses, taskCountInGroomMap, resources, tasks[i]);
- GroomServerStatus groomStatus = taskAllocationStrategy
- .getGroomToAllocate(groomStatuses, selectedGrooms,
- taskCountInGroomMap, resources, tasks[i]);
- if (groomStatus != null){
- result = tasks[i].constructTask(groomStatus);
- }
- else if (LOG.isDebugEnabled()){
- LOG.debug("Could not find a groom to schedule task");
- }
- if (result != null) {
- updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
- }
- break;
+ String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+ groomStatuses, taskCountInGroomMap, resources, task);
+ GroomServerStatus groomStatus = taskAllocationStrategy
+ .getGroomToAllocate(groomStatuses, selectedGrooms,
+ taskCountInGroomMap, resources, task);
+ if (groomStatus != null) {
+ result = task.constructTask(groomStatus);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not find a groom to schedule task");
+ }
+ if (result != null) {
+ updateGroomTaskDetails(task.getGroomServerStatus(), result);
+ }
+ break;
+ }
}
- }
counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
return result;
@@ -543,9 +542,9 @@ public class JobInProgress {
//
// kill all TIPs.
//
- for (int i = 0; i < tasks.length; i++) {
- tasks[i].kill();
- }
+ for (TaskInProgress task : tasks) {
+ task.kill();
+ }
garbageCollect();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Tue Feb 5 07:59:39 2013
@@ -181,51 +181,51 @@ public class PartitioningRunner extends
// merge files into one.
// TODO if we use header info, we might able to merge files without full
// scan.
- for (int j = 0; j < status.length; j++) {
- int partitionID = Integer.parseInt(status[j].getPath().getName()
- .split("[-]")[1]);
- int denom = desiredNum / peer.getNumPeers();
- int assignedID = partitionID;
- if(denom > 1) {
- assignedID = partitionID / denom;
- }
-
- if (assignedID == peer.getNumPeers())
- assignedID = assignedID - 1;
-
- // TODO set replica factor to 1.
- // TODO and check whether we can write to specific DataNode.
- if (assignedID == peer.getPeerIndex()) {
- Path partitionFile = new Path(partitionDir + "/"
- + getPartitionName(partitionID));
-
- FileStatus[] files = fs.listStatus(status[j].getPath());
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- partitionFile, outputKeyClass, outputValueClass,
- CompressionType.NONE);
-
- for (int i = 0; i < files.length; i++) {
- LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
- + "/" + getPartitionName(partitionID));
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,
- files[i].getPath(), conf);
-
- Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
- conf);
- Writable value = (Writable) ReflectionUtils.newInstance(
- outputValueClass, conf);
-
- while (reader.next(key, value)) {
- writer.append(key, value);
+ for (FileStatus statu : status) {
+ int partitionID = Integer.parseInt(statu.getPath().getName()
+ .split("[-]")[1]);
+ int denom = desiredNum / peer.getNumPeers();
+ int assignedID = partitionID;
+ if (denom > 1) {
+ assignedID = partitionID / denom;
}
- reader.close();
- }
- writer.close();
- fs.delete(status[j].getPath(), true);
+ if (assignedID == peer.getNumPeers())
+ assignedID = assignedID - 1;
+
+ // TODO set replica factor to 1.
+ // TODO and check whether we can write to specific DataNode.
+ if (assignedID == peer.getPeerIndex()) {
+ Path partitionFile = new Path(partitionDir + "/"
+ + getPartitionName(partitionID));
+
+ FileStatus[] files = fs.listStatus(statu.getPath());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ partitionFile, outputKeyClass, outputValueClass,
+ CompressionType.NONE);
+
+ for (int i = 0; i < files.length; i++) {
+ LOG.debug("merge '" + files[i].getPath() + "' into " +
partitionDir
+ + "/" + getPartitionName(partitionID));
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ files[i].getPath(), conf);
+
+ Writable key = (Writable)
ReflectionUtils.newInstance(outputKeyClass,
+ conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(
+ outputValueClass, conf);
+
+ while (reader.next(key, value)) {
+ writer.append(key, value);
+ }
+ reader.close();
+ }
+
+ writer.close();
+ fs.delete(statu.getPath(), true);
+ }
}
- }
}
@SuppressWarnings("rawtypes")
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
Tue Feb 5 07:59:39 2013
@@ -251,18 +251,16 @@ class SimpleTaskScheduler extends TaskSc
}
// assembly into actions
- Iterator<Task> taskIter = taskSet.iterator();
- while (taskIter.hasNext()) {
- Task task = taskIter.next();
- GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
- List<GroomServerAction> taskActions = actionMap.get(groomStatus);
- if (taskActions == null) {
- taskActions = new ArrayList<GroomServerAction>(
- groomStatus.getMaxTasks());
+ for (Task task : taskSet) {
+ GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+ List<GroomServerAction> taskActions = actionMap.get(groomStatus);
+ if (taskActions == null) {
+ taskActions = new ArrayList<GroomServerAction>(
+ groomStatus.getMaxTasks());
+ }
+ taskActions.add(new LaunchTaskAction(task));
+ actionMap.put(groomStatus, taskActions);
}
- taskActions.add(new LaunchTaskAction(task));
- actionMap.put(groomStatus, taskActions);
- }
sendDirectivesToGrooms(actionMap);
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
Tue Feb 5 07:59:39 2013
@@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableUtil
public class TaskCompletionEvent implements Writable {
static public enum Status {
FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
- };
+ }
- private int eventId;
+ private int eventId;
private String groomServerInfo;
private int taskRunTime; // using int since runtime is the time difference
private TaskAttemptID taskId;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue
Feb 5 07:59:39 2013
@@ -145,18 +145,17 @@ public class TaskInProgress {
Map<GroomServerStatus, Integer> tasksInGroomMap,
String[] possibleLocations) {
- for (int i = 0; i < possibleLocations.length; ++i) {
- String location = possibleLocations[i];
- GroomServerStatus groom = grooms.get(location);
- if (groom == null)
- continue;
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()
- && location.equals(groom.getGroomHostName())) {
- return groom.getGroomHostName();
+ for (String location : possibleLocations) {
+ GroomServerStatus groom = grooms.get(location);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ return groom.getGroomHostName();
+ }
}
- }
return null;
}
@@ -169,17 +168,16 @@ public class TaskInProgress {
private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
Map<GroomServerStatus, Integer> tasksInGroomMap) {
- Iterator<String> groomIter = grooms.keySet().iterator();
- while (groomIter.hasNext()) {
- GroomServerStatus groom = grooms.get(groomIter.next());
- if (groom == null)
- continue;
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()) {
- return groom.getGroomHostName();
+ for (String s : grooms.keySet()) {
+ GroomServerStatus groom = grooms.get(s);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()) {
+ return groom.getGroomHostName();
+ }
}
- }
return null;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Feb 5
07:59:39 2013
@@ -112,9 +112,9 @@ public class TaskLog {
File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
purgeTimeStamp));
if (oldTaskLogs != null) {
- for (int i = 0; i < oldTaskLogs.length; ++i) {
- FileUtil.fullyDelete(oldTaskLogs[i]);
- }
+ for (File oldTaskLog : oldTaskLogs) {
+ FileUtil.fullyDelete(oldTaskLog);
+ }
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Tue
Feb 5 07:59:39 2013
@@ -181,12 +181,12 @@ public class TaskLogServlet extends Http
String sLogOff = request.getParameter("start");
if (sLogOff != null) {
- start = Long.valueOf(sLogOff).longValue();
+ start = Long.valueOf(sLogOff);
}
String sLogEnd = request.getParameter("end");
if (sLogEnd != null) {
- end = Long.valueOf(sLogEnd).longValue();
+ end = Long.valueOf(sLogEnd);
}
String sPlainText = request.getParameter("plaintext");
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Tue Feb
5 07:59:39 2013
@@ -181,11 +181,11 @@ public class TaskRunner extends Thread {
}
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
- for (int i = 0; i < libs.length; i++) {
- // add libs from jar to classpath
- classPath.append(SYSTEM_PATH_SEPARATOR);
- classPath.append(libs[i]);
- }
+ for (File lib : libs) {
+ // add libs from jar to classpath
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(lib);
+ }
}
classPath.append(SYSTEM_PATH_SEPARATOR);
classPath.append(new File(workDir, "classes"));
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
Tue Feb 5 07:59:39 2013
@@ -131,10 +131,10 @@ public class AsyncRcvdMsgCheckpointImpl<
Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID,
TaskInProgress>(
2 * failedTasksInProgress.length);
- for (int i = 0; i < failedTasksInProgress.length; ++i) {
- recoverySet.put(failedTasksInProgress[i].getTaskId(),
- failedTasksInProgress[i]);
- }
+ for (TaskInProgress failedTasksInProgres : failedTasksInProgress) {
+ recoverySet.put(failedTasksInProgres.getTaskId(),
+ failedTasksInProgres);
+ }
long lowestSuperstepNumber = Long.MAX_VALUE;
@@ -152,31 +152,31 @@ public class AsyncRcvdMsgCheckpointImpl<
}
if (taskProgress.length == this.tasks.length) {
- for (int i = 0; i < taskProgress.length; ++i) {
- ArrayWritable progressInformation = new ArrayWritable(
- LongWritable.class);
- boolean result = this.masterSyncClient.getInformation(
- this.masterSyncClient.constructKey(jobId, "checkpoint",
- taskProgress[i]), progressInformation);
-
- if (!result) {
- lowestSuperstepNumber = -1L;
- break;
- }
+ for (String taskProgres : taskProgress) {
+ ArrayWritable progressInformation = new ArrayWritable(
+ LongWritable.class);
+ boolean result = this.masterSyncClient.getInformation(
+ this.masterSyncClient.constructKey(jobId, "checkpoint",
+ taskProgres), progressInformation);
+
+ if (!result) {
+ lowestSuperstepNumber = -1L;
+ break;
+ }
- Writable[] progressArr = progressInformation.get();
- LongWritable superstepProgress = (LongWritable) progressArr[0];
+ Writable[] progressArr = progressInformation.get();
+ LongWritable superstepProgress = (LongWritable) progressArr[0];
- if (superstepProgress != null) {
- if (superstepProgress.get() < lowestSuperstepNumber) {
- lowestSuperstepNumber = superstepProgress.get();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got superstep number " + lowestSuperstepNumber
- + " from " + taskProgress[i]);
+ if (superstepProgress != null) {
+ if (superstepProgress.get() < lowestSuperstepNumber) {
+ lowestSuperstepNumber = superstepProgress.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got superstep number " +
lowestSuperstepNumber
+ + " from " + taskProgres);
+ }
+ }
}
- }
}
- }
clearClientForSuperstep(lowestSuperstepNumber);
restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
allTasksInProgress, taskCountInGroomMap, actionMap);
@@ -225,56 +225,56 @@ public class AsyncRcvdMsgCheckpointImpl<
if (superstep >= 0) {
FileSystem fileSystem = FileSystem.get(conf);
- for (int i = 0; i < allTasks.length; ++i) {
- String[] hosts = null;
- if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
-
- // Update task count in map.
- // TODO: This should be a responsibility of GroomServerStatus
- Integer count = taskCountInGroomMap.get(allTasks[i]
- .getGroomServerStatus());
- if (count != null) {
- count = count.intValue() - 1;
- taskCountInGroomMap
- .put(allTasks[i].getGroomServerStatus(), count);
- }
-
- StringBuffer ckptPath = new StringBuffer(path);
- ckptPath.append(this.jobId.toString());
- ckptPath.append("/").append(superstep).append("/")
- .append(allTasks[i].getTaskId().getId());
- Path checkpointPath = new Path(ckptPath.toString());
- if (fileSystem.exists(checkpointPath)) {
- FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
- BlockLocation[] blocks = fileSystem.getFileBlockLocations(
- fileStatus, 0, fileStatus.getLen());
- hosts = blocks[0].getHosts();
- } else {
- hosts = new String[groomStatuses.keySet().size()];
- groomStatuses.keySet().toArray(hosts);
- }
- GroomServerStatus serverStatus = this.allocationStrategy
- .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
- new BSPResource[0], allTasks[i]);
- Task task = allTasks[i].constructTask(serverStatus);
- populateAction(task, superstep, serverStatus, actionMap);
+ for (TaskInProgress allTask : allTasks) {
+ String[] hosts = null;
+ if (recoveryMap.containsKey(allTask.getTaskId())) {
+
+ // Update task count in map.
+ // TODO: This should be a responsibility of GroomServerStatus
+ Integer count = taskCountInGroomMap.get(allTask
+ .getGroomServerStatus());
+ if (count != null) {
+ count = count.intValue() - 1;
+ taskCountInGroomMap
+ .put(allTask.getGroomServerStatus(), count);
+ }
+
+ StringBuffer ckptPath = new StringBuffer(path);
+ ckptPath.append(this.jobId.toString());
+ ckptPath.append("/").append(superstep).append("/")
+ .append(allTask.getTaskId().getId());
+ Path checkpointPath = new Path(ckptPath.toString());
+ if (fileSystem.exists(checkpointPath)) {
+ FileStatus fileStatus =
fileSystem.getFileStatus(checkpointPath);
+ BlockLocation[] blocks =
fileSystem.getFileBlockLocations(
+ fileStatus, 0, fileStatus.getLen());
+ hosts = blocks[0].getHosts();
+ } else {
+ hosts = new String[groomStatuses.keySet().size()];
+ groomStatuses.keySet().toArray(hosts);
+ }
+ GroomServerStatus serverStatus = this.allocationStrategy
+ .getGroomToAllocate(groomStatuses, hosts,
taskCountInGroomMap,
+ new BSPResource[0], allTask);
+ Task task = allTask.constructTask(serverStatus);
+ populateAction(task, superstep, serverStatus, actionMap);
- } else {
- restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+ } else {
+ restartTask(allTask, superstep, groomStatuses, actionMap);
+ }
}
- }
} else {
// Start the task from the beginning.
- for (int i = 0; i < allTasks.length; ++i) {
- if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
- this.allocationStrategy.getGroomToAllocate(groomStatuses,
- this.allocationStrategy.selectGrooms(groomStatuses,
- taskCountInGroomMap, new BSPResource[0], allTasks[i]),
- taskCountInGroomMap, new BSPResource[0], allTasks[i]);
- } else {
- restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+ for (TaskInProgress allTask : allTasks) {
+ if (recoveryMap.containsKey(allTask.getTaskId())) {
+ this.allocationStrategy.getGroomToAllocate(groomStatuses,
+ this.allocationStrategy.selectGrooms(groomStatuses,
+ taskCountInGroomMap, new BSPResource[0],
allTask),
+ taskCountInGroomMap, new BSPResource[0], allTask);
+ } else {
+ restartTask(allTask, superstep, groomStatuses, actionMap);
+ }
}
- }
}
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Tue Feb 5 07:59:39 2013
@@ -229,35 +229,27 @@ public abstract class AbstractMessageMan
}
private void notifySentMessage(String peerName, M message) {
- Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
- .iterator();
- while (iterator.hasNext()) {
- iterator.next().onMessageSent(peerName, message);
- }
+ for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
+ aMessageListenerQueue.onMessageSent(peerName, message);
+ }
}
private void notifyReceivedMessage(M message) throws IOException {
- Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
- .iterator();
- while (iterator.hasNext()) {
- iterator.next().onMessageReceived(message);
- }
+ for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
+ aMessageListenerQueue.onMessageReceived(message);
+ }
}
private void notifyInit() {
- Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
- .iterator();
- while (iterator.hasNext()) {
- iterator.next().onInitialized();
- }
+ for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
+ aMessageListenerQueue.onInitialized();
+ }
}
private void notifyClose() {
- Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
- .iterator();
- while (iterator.hasNext()) {
- iterator.next().onClose();
- }
+ for (MessageEventListener<M> aMessageListenerQueue :
this.messageListenerQueue) {
+ aMessageListenerQueue.onClose();
+ }
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
Tue Feb 5 07:59:39 2013
@@ -247,9 +247,9 @@ public class SpillingDataOutputBuffer ex
startedSpilling_ = false;
bufferState_.clear();
- for (int i = 0; i < bufferList_.size(); ++i) {
- bufferList_.get(i).clear();
- }
+ for (ByteBuffer aBufferList_ : bufferList_) {
+ aBufferList_.clear();
+ }
currentBuffer_ = bufferList_.get(0);
bytesWritten_ = 0L;
bytesRemaining_ = bufferSize_;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
Tue Feb 5 07:59:39 2013
@@ -152,10 +152,9 @@ public class SpillingQueue<M extends Wri
@Override
public void addAll(MessageQueue<M> arg0) {
- Iterator<M> iter = arg0.iterator();
- while (iter.hasNext()) {
- add(iter.next());
- }
+ for (M anArg0 : arg0) {
+ add(anArg0);
+ }
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
Tue Feb 5 07:59:39 2013
@@ -58,9 +58,9 @@ public class ZKSyncEventFactory {
.append(getEventCount()-1)).toString());
}
- };
-
- public static int getSupportedEventCount(){
+ }
+
+ public static int getSupportedEventCount(){
return ZKEvent.getEventCount();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
Tue Feb 5 07:59:39 2013
@@ -53,17 +53,16 @@ public class BestEffortDataLocalTaskAllo
private static String getAnyGroomToSchedule(Map<String, GroomServerStatus>
grooms,
Map<GroomServerStatus, Integer> tasksInGroomMap) {
- Iterator<String> groomIter = grooms.keySet().iterator();
- while (groomIter.hasNext()) {
- GroomServerStatus groom = grooms.get(groomIter.next());
- if (groom == null)
- continue;
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()) {
- return groom.getGroomHostName();
+ for (String s : grooms.keySet()) {
+ GroomServerStatus groom = grooms.get(s);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()) {
+ return groom.getGroomHostName();
+ }
}
- }
return null;
}
@@ -80,26 +79,25 @@ public class BestEffortDataLocalTaskAllo
Map<GroomServerStatus, Integer> tasksInGroomMap,
String[] possibleLocations) {
- for (int i = 0; i < possibleLocations.length; ++i) {
- String location = possibleLocations[i];
- GroomServerStatus groom = grooms.get(location);
- if (groom == null){
- if(LOG.isDebugEnabled()){
- LOG.debug("Could not find groom for location " + location);
- }
- continue;
- }
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if(LOG.isDebugEnabled()){
- LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " +
groom.getMaxTasks()
- + " location = " + location + " groomhostname = " +
groom.getGroomHostName());
+ for (String location : possibleLocations) {
+ GroomServerStatus groom = grooms.get(location);
+ if (groom == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not find groom for location " + location);
+ }
+ continue;
+ }
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " +
groom.getMaxTasks()
+ + " location = " + location + " groomhostname = " +
groom.getGroomHostName());
+ }
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ return groom.getGroomHostName();
+ }
}
- if (taskInGroom < groom.getMaxTasks()
- && location.equals(groom.getGroomHostName())) {
- return groom.getGroomHostName();
- }
- }
if(LOG.isDebugEnabled()){
LOG.debug("Returning null");
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Tue
Feb 5 07:59:39 2013
@@ -68,7 +68,7 @@ public final class Configurator {
if(null != t) {
t.setListener(listener);
taskList.put(jarPath, t);
- repos.put(jarPath, new Long(jar.lastModified()));
+ repos.put(jarPath, jar.lastModified());
LOG.debug(jar.getName()+" is loaded.");
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Tue Feb
5 07:59:39 2013
@@ -215,8 +215,7 @@ public final class Monitor extends Threa
// znode must exists so that child (znode/name) can be created.
if (null != this.zk.exists(znode, false)) {
String suffix = suffix(value);
- ;
- if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Publish name [" + name + "] and value [" + value
+ "] to zk.");
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java Tue
Feb 5 07:59:39 2013
@@ -19,6 +19,7 @@ package org.apache.hama.monitor;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -114,7 +115,7 @@ public final class ZKCollector implement
LOG.info("metrics " + name + " value:" + lv);
record.add(new Metric<Long>(name, lv));
} else if ("b".equals(dataType)) {
- LOG.info("metrics" + name + " value:" + dataInBytes);
+ LOG.info("metrics" + name + " value:" +
Arrays.toString(dataInBytes));
record.add(new Metric<byte[]>(name, dataInBytes));
} else {
LOG.warn("Unkown data type for metrics name: " + child);
Modified:
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
Tue Feb 5 07:59:39 2013
@@ -152,7 +152,7 @@ public class UDPSupervisor implements Su
if (getSamplingWindow().size() == windowSize()) {
getSamplingWindow().remove();
}
- getSamplingWindow().add(new Double(heartbeat - getLatestHeartbeat()));
+ getSamplingWindow().add((double) (heartbeat - getLatestHeartbeat()));
}
setLatestHeartbeat(heartbeat);
}