Author: edwardyoon
Date: Tue Dec 27 00:37:41 2011
New Revision: 1224844
URL: http://svn.apache.org/viewvc?rev=1224844&view=rev
Log:
merge -r 1222197:1222075
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1224844&r1=1224843&r2=1224844&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Dec 27 00:37:41 2011
@@ -45,7 +45,7 @@ import org.apache.hama.util.KeyValuePair
/**
* This class represents a BSP peer.
*/
-public final class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
+public class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
@@ -146,7 +146,7 @@ public final class BSPPeerImpl<KEYIN, VA
}
@SuppressWarnings("unchecked")
- public final void initialize() throws Exception {
+ public void initialize() throws Exception {
syncClient = SyncServiceFactory.getSyncClient(conf);
syncClient.init(conf, taskId.getJobID(), taskId);
@@ -154,8 +154,8 @@ public final class BSPPeerImpl<KEYIN, VA
// just output something when the user configured it
if (conf.get("bsp.output.dir") != null) {
- Path outdir = new Path(conf.get("bsp.output.dir"),
- Task.getOutputName(partition));
+ Path outdir = new Path(conf.get("bsp.output.dir"), Task
+ .getOutputName(partition));
outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
outdir.makeQualified(fs).toString());
final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
@@ -170,7 +170,7 @@ public final class BSPPeerImpl<KEYIN, VA
}
@SuppressWarnings("unchecked")
- public final void initInput() throws IOException {
+ public void initInput() throws IOException {
// just read input if the user defined one
if (conf.get("bsp.input.dir") != null) {
InputSplit inputSplit = null;
@@ -196,16 +196,16 @@ public final class BSPPeerImpl<KEYIN, VA
}
@Override
- public final BSPMessage getCurrentMessage() throws IOException {
+ public BSPMessage getCurrentMessage() throws IOException {
return messenger.getCurrentMessage();
}
@Override
- public final void send(String peerName, BSPMessage msg) throws IOException {
+ public void send(String peerName, BSPMessage msg) throws IOException {
messenger.send(peerName, msg);
}
- private final String checkpointedPath() {
+ private String checkpointedPath() {
String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
String ckptPath = backup + bspJob.getJobID().toString() + "/"
+ getSuperstepCount() + "/" + this.taskId.toString();
@@ -214,7 +214,7 @@ public final class BSPPeerImpl<KEYIN, VA
return ckptPath;
}
- final void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
+ void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
FSDataOutputStream out = null;
try {
out = this.fs.create(new Path(checkpointedPath));
@@ -236,8 +236,7 @@ public final class BSPPeerImpl<KEYIN, VA
* @see org.apache.hama.bsp.BSPPeerInterface#sync()
*/
@Override
- public final void sync() throws IOException, SyncException,
- InterruptedException {
+ public void sync() throws IOException, SyncException, InterruptedException {
enterBarrier();
Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> it = messenger
.getMessageIterator();
@@ -260,7 +259,7 @@ public final class BSPPeerImpl<KEYIN, VA
}
leaveBarrier();
-
+
incrCounter(PeerCounter.SUPERSTEPS, 1);
currentTaskStatus.setCounters(counters);
@@ -269,11 +268,11 @@ public final class BSPPeerImpl<KEYIN, VA
messenger.clearOutgoingQueues();
}
- private final BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
+ private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
Combiner.class)) {
- Combiner combiner = (Combiner) ReflectionUtils.newInstance(
- conf.getClass("bsp.combiner.class", Combiner.class), conf);
+ Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
+ "bsp.combiner.class", Combiner.class), conf);
return combiner.combine(messages);
} else {
@@ -285,18 +284,17 @@ public final class BSPPeerImpl<KEYIN, VA
}
}
- protected final void enterBarrier() throws SyncException {
- syncClient.enterBarrier(taskId.getJobID(), taskId,
- currentTaskStatus.getSuperstepCount());
+ protected void enterBarrier() throws SyncException {
+ syncClient.enterBarrier(taskId.getJobID(), taskId, currentTaskStatus
+ .getSuperstepCount());
}
- protected final void leaveBarrier() throws SyncException {
- syncClient.leaveBarrier(taskId.getJobID(), taskId,
- currentTaskStatus.getSuperstepCount());
+ protected void leaveBarrier() throws SyncException {
+ syncClient.leaveBarrier(taskId.getJobID(), taskId, currentTaskStatus
+ .getSuperstepCount());
}
- public final void close() throws SyncException, IOException,
- InterruptedException {
+ public void close() throws SyncException, IOException, InterruptedException {
if (in != null) {
in.close();
}
@@ -310,36 +308,36 @@ public final class BSPPeerImpl<KEYIN, VA
}
@Override
- public final void clear() {
+ public void clear() {
messenger.clearOutgoingQueues();
}
/**
* @return the string as host:port of this Peer
*/
- public final String getPeerName() {
+ public String getPeerName() {
return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
@Override
- public final String[] getAllPeerNames() {
+ public String[] getAllPeerNames() {
initPeerNames();
return allPeers;
}
@Override
- public final String getPeerName(int index) {
+ public String getPeerName(int index) {
initPeerNames();
return allPeers[index];
}
@Override
- public final int getNumPeers() {
+ public int getNumPeers() {
initPeerNames();
return allPeers.length;
}
- private final void initPeerNames() {
+ private void initPeerNames() {
if (allPeers == null) {
allPeers = syncClient.getAllPeerNames(taskId);
}
@@ -349,7 +347,7 @@ public final class BSPPeerImpl<KEYIN, VA
* @return the number of messages
*/
@Override
- public final int getNumCurrentMessages() {
+ public int getNumCurrentMessages() {
return messenger.getNumCurrentMessages();
}
@@ -358,14 +356,14 @@ public final class BSPPeerImpl<KEYIN, VA
*
* @param currentTaskStatus
*/
- public final void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+ public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
this.currentTaskStatus = currentTaskStatus;
}
/**
* @return the count of current super-step
*/
- public final long getSuperstepCount() {
+ public long getSuperstepCount() {
return currentTaskStatus.getSuperstepCount();
}
@@ -374,7 +372,7 @@ public final class BSPPeerImpl<KEYIN, VA
*
* @return the conf
*/
- public final Configuration getConfiguration() {
+ public Configuration getConfiguration() {
return conf;
}
@@ -383,17 +381,17 @@ public final class BSPPeerImpl<KEYIN, VA
*/
@Override
- public final void write(KEYOUT key, VALUEOUT value) throws IOException {
+ public void write(KEYOUT key, VALUEOUT value) throws IOException {
collector.collect(key, value);
}
@Override
- public final boolean readNext(KEYIN key, VALUEIN value) throws IOException {
+ public boolean readNext(KEYIN key, VALUEIN value) throws IOException {
return in.next(key, value);
}
@Override
- public final KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
+ public KeyValuePair<KEYIN, VALUEIN> readNext() throws IOException {
KEYIN k = in.createKey();
VALUEIN v = in.createValue();
if (in.next(k, v)) {
@@ -404,17 +402,17 @@ public final class BSPPeerImpl<KEYIN, VA
}
@Override
- public final void reopenInput() throws IOException {
+ public void reopenInput() throws IOException {
initInput();
}
@Override
- public final Counter getCounter(Enum<?> name) {
+ public Counter getCounter(Enum<?> name) {
return counters == null ? null : counters.findCounter(name);
}
@Override
- public final Counter getCounter(String group, String name) {
+ public Counter getCounter(String group, String name) {
Counters.Counter counter = null;
if (counters != null) {
counter = counters.findCounter(group, name);
@@ -423,14 +421,14 @@ public final class BSPPeerImpl<KEYIN, VA
}
@Override
- public final void incrCounter(Enum<?> key, long amount) {
+ public void incrCounter(Enum<?> key, long amount) {
if (counters != null) {
counters.incrCounter(key, amount);
}
}
@Override
- public final void incrCounter(String group, String counter, long amount) {
+ public void incrCounter(String group, String counter, long amount) {
if (counters != null) {
counters.incrCounter(group, counter, amount);
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1224844&r1=1224843&r2=1224844&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Dec 27 00:37:41 2011
@@ -32,7 +32,7 @@ import org.apache.hama.ipc.BSPPeerProtoc
/**
* Base class for tasks.
*/
-public final class BSPTask extends Task {
+public class BSPTask extends Task {
public static final Log LOG = LogFactory.getLog(BSPTask.class);
@@ -55,12 +55,12 @@ public final class BSPTask extends Task
}
@Override
- public final BSPTaskRunner createRunner(GroomServer groom) {
+ public BSPTaskRunner createRunner(GroomServer groom) {
return new BSPTaskRunner(this, groom, this.conf);
}
@Override
- public final void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
+ public void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
BSPPeerProtocol umbilical) throws IOException, SyncException,
ClassNotFoundException, InterruptedException {
runBSP(job, bspPeer, split, umbilical);
@@ -69,15 +69,15 @@ public final class BSPTask extends Task
}
@SuppressWarnings("unchecked")
- private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(
- final BSPJob job, BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
+ private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runBSP(final BSPJob job,
+ BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bspPeer,
final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
throws IOException, SyncException, ClassNotFoundException,
InterruptedException {
BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) ReflectionUtils
- .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
- job.getConf());
+ .newInstance(job.getConf().getClass("bsp.work.class", BSP.class), job
+ .getConf());
bsp.setup(bspPeer);
bsp.bsp(bspPeer);
@@ -86,16 +86,16 @@ public final class BSPTask extends Task
bspPeer.close();
}
- public final BSPJob getConf() {
+ public BSPJob getConf() {
return conf;
}
- public final void setConf(BSPJob conf) {
+ public void setConf(BSPJob conf) {
this.conf = conf;
}
@Override
- public final void write(DataOutput out) throws IOException {
+ public void write(DataOutput out) throws IOException {
super.write(out);
if (split != null) {
out.writeBoolean(true);
@@ -108,7 +108,7 @@ public final class BSPTask extends Task
}
@Override
- public final void readFields(DataInput in) throws IOException {
+ public void readFields(DataInput in) throws IOException {
super.readFields(in);
if (in.readBoolean()) {
splitClass = Text.readString(in);
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1224844&r1=1224843&r2=1224844&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Tue Dec 27 00:37:41 2011
@@ -227,8 +227,8 @@ public class GroomServer implements Runn
// this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
try {
- zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
- conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+ zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+ .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
} catch (IOException e) {
LOG.error("Exception during reinitialization!", e);
}
@@ -240,9 +240,8 @@ public class GroomServer implements Runn
}
if (localHostname == null) {
- this.localHostname = DNS.getDefaultHost(
- conf.get("bsp.dns.interface", "default"),
- conf.get("bsp.dns.nameserver", "default"));
+ this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+ "default"), conf.get("bsp.dns.nameserver", "default"));
}
// check local disk
checkLocalDirs(getLocalDirs());
@@ -518,17 +517,17 @@ public class GroomServer implements Runn
synchronized (rjob) {
if (!rjob.localized) {
-
+
FileSystem localFs = FileSystem.getLocal(conf);
Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)) {
+ if (localFs.exists(jobDir)){
localFs.delete(jobDir, true);
boolean b = localFs.mkdirs(jobDir);
if (!b)
throw new IOException("Not able to create job directory "
- + jobDir.toString());
+ + jobDir.toString());
}
-
+
Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/" + "job.jar");
systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
@@ -931,13 +930,13 @@ public class GroomServer implements Runn
/**
* The main() for BSPPeer child processes.
*/
- public static final class BSPPeerChild {
+ public static class BSPPeerChild {
public static void main(String[] args) throws Throwable {
if (LOG.isDebugEnabled())
LOG.debug("BSPPeerChild starting");
- final HamaConfiguration defaultConf = new HamaConfiguration();
+ HamaConfiguration defaultConf = new HamaConfiguration();
// report address
String host = args[0];
int port = Integer.parseInt(args[1]);
@@ -949,7 +948,7 @@ public class GroomServer implements Runn
BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
defaultConf);
- final BSPTask task = (BSPTask) umbilical.getTask(taskid);
+ BSPTask task = (BSPTask) umbilical.getTask(taskid);
int peerPort = umbilical.getAssignedPortNum(taskid);
defaultConf.addResource(new Path(task.getJobFile()));
@@ -968,9 +967,9 @@ public class GroomServer implements Runn
// instantiate and init our peer
@SuppressWarnings("rawtypes")
- final BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
- defaultConf, taskid, umbilical, task.partition, task.splitClass,
- task.split, task.getCounters());
+ BSPPeerImpl<?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, defaultConf,
+ taskid, umbilical, task.partition, task.splitClass, task.split,
+ task.getCounters());
task.run(job, bspPeer, umbilical); // run the task
@@ -980,7 +979,7 @@ public class GroomServer implements Runn
} catch (SyncException e) {
LOG.fatal("SyncError from child", e);
umbilical.fatalError(taskid, e.toString());
-
+
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(baos));
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1224844&r1=1224843&r2=1224844&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Tue Dec 27 00:37:41 2011
@@ -39,7 +39,7 @@ import org.apache.hama.util.BSPNetUtils;
* Implementation of the {@link HadoopMessageManager}.
*
*/
-public final class HadoopMessageManagerImpl implements MessageManager,
+public class HadoopMessageManagerImpl implements MessageManager,
HadoopMessageManager {
private static final Log LOG = LogFactory
@@ -57,13 +57,12 @@ public final class HadoopMessageManagerI
private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration =
new ConcurrentLinkedQueue<BSPMessage>();
@Override
- public final void init(Configuration conf, InetSocketAddress peerAddress) {
+ public void init(Configuration conf, InetSocketAddress peerAddress) {
this.conf = conf;
startRPCServer(conf, peerAddress);
}
- private final void startRPCServer(Configuration conf,
- InetSocketAddress peerAddress) {
+ private void startRPCServer(Configuration conf, InetSocketAddress peerAddress) {
try {
this.server = RPC.getServer(this, peerAddress.getHostName(),
peerAddress.getPort(), conf);
@@ -77,19 +76,19 @@ public final class HadoopMessageManagerI
}
@Override
- public final void close() {
+ public void close() {
if (server != null) {
server.stop();
}
}
@Override
- public final BSPMessage getCurrentMessage() throws IOException {
+ public BSPMessage getCurrentMessage() throws IOException {
return localQueue.poll();
}
@Override
- public final void send(String peerName, BSPMessage msg) throws IOException {
+ public void send(String peerName, BSPMessage msg) throws IOException {
LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
InetSocketAddress targetPeerAddress = null;
// Get socket for target peer.
@@ -108,12 +107,12 @@ public final class HadoopMessageManagerI
}
@Override
- public final Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+ public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
return this.outgoingQueues.entrySet().iterator();
}
- protected final HadoopMessageManager getBSPPeerConnection(
- InetSocketAddress addr) throws IOException {
+ protected HadoopMessageManager getBSPPeerConnection(InetSocketAddress addr)
+ throws IOException {
HadoopMessageManager peer = peers.get(addr);
if (peer == null) {
peer = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class,
@@ -124,7 +123,7 @@ public final class HadoopMessageManagerI
}
@Override
- public final void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+ public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
throws IOException {
HadoopMessageManager bspPeerConnection = this.getBSPPeerConnection(addr);
@@ -138,32 +137,31 @@ public final class HadoopMessageManagerI
}
@Override
- public final void clearOutgoingQueues() {
+ public void clearOutgoingQueues() {
this.outgoingQueues.clear();
localQueue.addAll(localQueueForNextIteration);
localQueueForNextIteration.clear();
}
@Override
- public final void put(BSPMessage msg) {
+ public void put(BSPMessage msg) {
this.localQueueForNextIteration.add(msg);
}
@Override
- public final void put(BSPMessageBundle messages) {
+ public void put(BSPMessageBundle messages) {
for (BSPMessage message : messages.getMessages()) {
this.localQueueForNextIteration.add(message);
}
}
@Override
- public final int getNumCurrentMessages() {
+ public int getNumCurrentMessages() {
return localQueue.size();
}
@Override
- public final long getProtocolVersion(String arg0, long arg1)
- throws IOException {
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
return versionID;
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1224844&r1=1224843&r2=1224844&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Tue Dec 27 00:37:41 2011
@@ -25,8 +25,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
-import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,15 +39,15 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* This client class abstracts the use of our zookeeper sync code.
*
*/
-public final class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
/*
* TODO maybe extract an abstract class and let the subclasses implement
@@ -69,8 +69,8 @@ public final class ZooKeeperSyncClientIm
private String[] allPeers;
@Override
- public final void init(Configuration conf, BSPJobID jobId,
- TaskAttemptID taskId) throws Exception {
+ public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+ throws Exception {
quorumServers = QuorumPeer.getZKQuorumServersString(conf);
this.zk = new ZooKeeper(quorumServers, conf.getInt(
Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
@@ -86,8 +86,8 @@ public final class ZooKeeperSyncClientIm
}
@Override
- public final void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
- long superstep) throws SyncException {
+ public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+ throws SyncException {
LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + superstep);
try {
@@ -100,9 +100,8 @@ public final class ZooKeeperSyncClientIm
createZnode(pathToSuperstepZnode);
BarrierWatcher barrierWatcher = new BarrierWatcher();
- // TODO not used?
- // Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
- // barrierWatcher);
+ Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+ barrierWatcher);
zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
@@ -141,15 +140,16 @@ public final class ZooKeeperSyncClientIm
}
@Override
- public final void leaveBarrier(final BSPJobID jobId,
- final TaskAttemptID taskId, final long superstep) throws SyncException {
+ public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
+ final long superstep) throws SyncException {
try {
final String pathToSuperstepZnode = bspRoot + "/"
+ taskId.getJobID().toString() + "/" + superstep;
while (true) {
List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
- LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or
not: at superstep:"
- + superstep + " znode:" + znodes);
+ LOG
+ .debug("leaveBarrier() !!! checking znodes contnains /ready node
or not: at superstep:"
+ + superstep + " znode:" + znodes);
if (znodes.contains("ready")) {
znodes.remove("ready");
}
@@ -234,7 +234,7 @@ public final class ZooKeeperSyncClientIm
}
@Override
- public final void register(BSPJobID jobId, TaskAttemptID taskId,
+ public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
try {
if (zk.exists("/" + jobId.toString(), false) == null) {
@@ -260,7 +260,7 @@ public final class ZooKeeperSyncClientIm
* @param port
* @param taskId
*/
- public final static void registerTask(ZooKeeper zk, BSPJobID jobId,
+ public static void registerTask(ZooKeeper zk, BSPJobID jobId,
String hostAddress, long port, TaskAttemptID taskId) {
byte[] taskIdBytes = serializeTaskId(taskId);
@@ -275,7 +275,7 @@ public final class ZooKeeperSyncClientIm
}
}
- private final static byte[] serializeTaskId(TaskAttemptID taskId) {
+ private static byte[] serializeTaskId(TaskAttemptID taskId) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bos);
try {
@@ -292,7 +292,7 @@ public final class ZooKeeperSyncClientIm
return bos.toByteArray();
}
- public final static TaskAttemptID deserializeTaskId(byte[] arr) {
+ public static TaskAttemptID deserializeTaskId(byte[] arr) {
ByteArrayInputStream bis = new ByteArrayInputStream(arr);
DataInputStream in = new DataInputStream(bis);
TaskAttemptID id = new TaskAttemptID();
@@ -311,7 +311,7 @@ public final class ZooKeeperSyncClientIm
}
@Override
- public final String[] getAllPeerNames(TaskAttemptID taskId) {
+ public String[] getAllPeerNames(TaskAttemptID taskId) {
if (allPeers == null) {
TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
try {
@@ -345,25 +345,25 @@ public final class ZooKeeperSyncClientIm
}
@Override
- public final void close() throws InterruptedException {
- zk.close();
+ public void close() throws InterruptedException {
+ zk.close();
}
@Override
- public final void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+ public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
// TODO
throw new UnsupportedOperationException();
}
@Override
- public final void stopServer() {
+ public void stopServer() {
// TODO
throw new UnsupportedOperationException();
}
@Override
- public final void process(WatchedEvent event) {
+ public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
@@ -376,26 +376,26 @@ public final class ZooKeeperSyncClientIm
/**
* @return the string as host:port of this Peer
*/
- public final String getPeerName() {
+ public String getPeerName() {
return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
- private final String getNodeName(TaskAttemptID taskId, long superstep) {
+ private String getNodeName(TaskAttemptID taskId, long superstep) {
return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
+ taskId.toString();
}
- private final void createZnode(final String path) throws KeeperException,
+ private void createZnode(final String path) throws KeeperException,
InterruptedException {
createZnode(path, CreateMode.PERSISTENT);
}
- private final void createEphemeralZnode(final String path)
- throws KeeperException, InterruptedException {
+ private void createEphemeralZnode(final String path) throws KeeperException,
+ InterruptedException {
createZnode(path, CreateMode.EPHEMERAL);
}
- private final void createZnode(final String path, final CreateMode mode)
+ private void createZnode(final String path, final CreateMode mode)
throws KeeperException, InterruptedException {
synchronized (zk) {
Stat s = zk.exists(path, false);