Author: aching
Date: Mon Dec 19 18:47:07 2011
New Revision: 1220891
URL: http://svn.apache.org/viewvc?rev=1220891&view=rev
Log:
GIRAPH-73: A little refactoring. (ssc via aching)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1220891&r1=1220890&r2=1220891&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Dec 19 18:47:07 2011
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-73: A little refactoring. (ssc via aching)
+
GIRAPH-106: Change prepareSuperstep() to make
setMessages(Iterable<M> messages) package-private. (aching)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1220891&r1=1220890&r2=1220891&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
Mon Dec 19 18:47:07 2011
@@ -18,6 +18,7 @@
package org.apache.giraph.examples;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
@@ -242,11 +243,10 @@ public class SimpleShortestPathsVertex e
@Override
public int run(String[] argArray) throws Exception {
- if (argArray.length != 4) {
- throw new IllegalArgumentException(
- "run: Must have 4 arguments <input path> <output path> " +
- "<source vertex id> <# of workers>");
- }
+ Preconditions.checkArgument(argArray.length == 4,
+ "run: Must have 4 arguments <input path> <output path> " +
+ "<source vertex id> <# of workers>");
+
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
job.setVertexClass(getClass());
job.setVertexInputFormatClass(
@@ -260,11 +260,8 @@ public class SimpleShortestPathsVertex e
job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
Integer.parseInt(argArray[3]),
100.0f);
- if (job.run(true) == true) {
- return 0;
- } else {
- return -1;
- }
+
+ return job.run(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1220891&r1=1220890&r2=1220891&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
Mon Dec 19 18:47:07 2011
@@ -24,7 +24,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -34,8 +34,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.graph.GiraphJob;
import org.apache.hadoop.conf.Configuration;
@@ -88,8 +90,7 @@ public class ZooKeeperManager {
/** ZooKeeper config file path */
private String configFilePath = null;
/** ZooKeeper server list */
- private final Map<String, Integer> zkServerPortMap =
- new TreeMap<String, Integer>();
+ private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
/** ZooKeeper base port */
private int zkBasePort = -1;
/** Final ZooKeeper server port list (for clients) */
@@ -151,7 +152,7 @@ public class ZooKeeperManager {
public void run() {
InputStreamReader streamReader = new InputStreamReader(is);
BufferedReader bufferedReader = new BufferedReader(streamReader);
- String line = null;
+ String line;
try {
while ((line = bufferedReader.readLine()) != null) {
if (LOG.isDebugEnabled()) {
@@ -298,22 +299,20 @@ public class ZooKeeperManager {
private void createZooKeeperServerList()
throws IOException, InterruptedException {
int candidateRetrievalAttempt = 0;
- Map<String, Integer> hostnameTaskMap =
- new TreeMap<String, Integer>();
+ Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
while (true) {
- FileStatus [] fileStatusArray =
- fs.listStatus(taskDirectory);
+ FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
hostnameTaskMap.clear();
if (fileStatusArray.length > 0) {
- for (int i = 0; i < fileStatusArray.length; ++i) {
+ for (FileStatus fileStatus : fileStatusArray) {
String[] hostnameTaskArray =
- fileStatusArray[i].getPath().getName().split(
+ fileStatus.getPath().getName().split(
HOSTNAME_TASK_SEPARATOR);
if (hostnameTaskArray.length != 2) {
throw new RuntimeException(
"getZooKeeperServerList: Task 0 failed " +
"to parse " +
- fileStatusArray[i].getPath().getName());
+ fileStatus.getPath().getName());
}
if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
hostnameTaskMap.put(hostnameTaskArray[0],
@@ -367,8 +366,7 @@ public class ZooKeeperManager {
*/
private String getServerListFile() throws IOException {
String serverListFile = null;
- FileStatus [] fileStatusArray =
- fs.listStatus(baseDirectory);
+ FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
for (FileStatus fileStatus : fileStatusArray) {
if (fileStatus.getPath().getName().startsWith(
ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
@@ -388,7 +386,7 @@ public class ZooKeeperManager {
*/
private void getZooKeeperServerList()
throws IOException, InterruptedException {
- String serverListFile = null;
+ String serverListFile;
if (taskPartition == 0) {
serverListFile = getServerListFile();
@@ -477,7 +475,7 @@ public class ZooKeeperManager {
LOG.info("generateZooKeeperConfigFile: Delete of " +
configFile.getName() + " = " + deletedRet);
}
- if (configFile.createNewFile() == false) {
+ if (!configFile.createNewFile()) {
throw new IllegalStateException(
"generateZooKeeperConfigFile: Failed to " +
"create config file " + configFile.getName());
@@ -488,39 +486,49 @@ public class ZooKeeperManager {
"generateZooKeeperConfigFile: Failed to make writable " +
configFile.getName());
}
- OutputStreamWriter writer = new FileWriter(configFilePath);
- writer.write("tickTime=" +
- GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
- writer.write("dataDir=" + this.zkDir + "\n");
- writer.write("clientPort=" + zkBasePort + "\n");
- writer.write("maxClientCnxns=" +
- GiraphJob.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +"\n");
- writer.write("minSessionTimeout=" +
- GiraphJob.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT
+"\n");
- writer.write("maxSessionTimeout=" +
- GiraphJob.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT
+"\n");
- writer.write("initLimit=" +
- GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
- writer.write("syncLimit=" +
- GiraphJob.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
- writer.write("snapCount=" +
- GiraphJob.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
- if (serverList.size() != 1) {
- writer.write("electionAlg=0\n");
- for (int i = 0; i < serverList.size(); ++i) {
- writer.write("server." + i + "=" + serverList.get(i) +
- ":" + (zkBasePort + 1) +
- ":" + (zkBasePort + 2) + "\n");
- if (myHostname.equals(serverList.get(i))) {
- OutputStreamWriter myidWriter = new FileWriter(
- zkDir + "/myid");
- myidWriter.write(i + "\n");
- myidWriter.close();
+
+ Writer writer = null;
+ try {
+ writer = new FileWriter(configFilePath);
+ writer.write("tickTime=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
+ writer.write("dataDir=" + this.zkDir + "\n");
+ writer.write("clientPort=" + zkBasePort + "\n");
+ writer.write("maxClientCnxns=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
+ "\n");
+ writer.write("minSessionTimeout=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT +
+ "\n");
+ writer.write("maxSessionTimeout=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT +
+ "\n");
+ writer.write("initLimit=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
+ writer.write("syncLimit=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
+ writer.write("snapCount=" +
+ GiraphJob.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
+ if (serverList.size() != 1) {
+ writer.write("electionAlg=0\n");
+ for (int i = 0; i < serverList.size(); ++i) {
+ writer.write("server." + i + "=" + serverList.get(i) +
+ ":" + (zkBasePort + 1) +
+ ":" + (zkBasePort + 2) + "\n");
+ if (myHostname.equals(serverList.get(i))) {
+ Writer myidWriter = null;
+ try {
+ myidWriter = new FileWriter(zkDir + "/myid");
+ myidWriter.write(i + "\n");
+ } finally {
+ Closeables.closeQuietly(myidWriter);
+ }
+ }
}
}
+ } finally {
+ Closeables.closeQuietly(writer);
}
- writer.flush();
- writer.close();
} catch (IOException e) {
throw new IllegalStateException(
"generateZooKeeperConfigFile: Failed to write file", e);
@@ -543,12 +551,12 @@ public class ZooKeeperManager {
FileUtils.deleteDirectory(zkDirFile);
} catch (IOException e) {
LOG.warn("onlineZooKeeperServers: Failed to delete " +
- "directory " + this.zkDir);
+ "directory " + this.zkDir, e);
}
generateZooKeeperConfigFile(
new ArrayList<String>(zkServerPortMap.keySet()));
ProcessBuilder processBuilder = new ProcessBuilder();
- List<String> commandList = new ArrayList<String>();
+ List<String> commandList = Lists.newArrayList();
String javaHome = System.getProperty("java.home");
if (javaHome == null) {
throw new IllegalArgumentException(
@@ -600,7 +608,7 @@ public class ZooKeeperManager {
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
} catch (IOException e) {
LOG.error("onlineZooKeeperServers: Failed to start " +
- "ZooKeeper process");
+ "ZooKeeper process", e);
throw new RuntimeException(e);
}
@@ -662,7 +670,7 @@ public class ZooKeeperManager {
fs.createNewFile(myReadyPath);
} catch (IOException e) {
LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
- "task failed) to create filestamp " + myReadyPath);
+ "task failed) to create filestamp " + myReadyPath,
e);
}
}
else {
@@ -714,7 +722,7 @@ public class ZooKeeperManager {
throw new RuntimeException(e);
} catch (InterruptedException e) {
LOG.warn("onlineZooKeeperServers: Strange interrupt from "
+
- e.getMessage());
+ e.getMessage(), e);
}
}
}
@@ -756,7 +764,7 @@ public class ZooKeeperManager {
} catch (IOException e) {
LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
} catch (InterruptedException e) {
- LOG.warn("waitUntilAllTasksDone: Got InterruptedException" +
e);
+ LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
}
}
}
@@ -779,7 +787,7 @@ public class ZooKeeperManager {
waitUntilAllTasksDone(totalMapTasks);
zkProcess.destroy();
int exitValue = -1;
- File zkDirFile = null;
+ File zkDirFile;
try {
zkProcessCollector.join();
exitValue = zkProcess.waitFor();