Author: edwardyoon
Date: Tue Jun 9 08:27:29 2015
New Revision: 1684349
URL: http://svn.apache.org/r1684349
Log:
HAMA-939: Refactor code that was implemented using an out-of-date status response
Modified:
hama/trunk/conf/log4j.properties
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
Modified: hama/trunk/conf/log4j.properties
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/log4j.properties?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/conf/log4j.properties (original)
+++ hama/trunk/conf/log4j.properties Tue Jun 9 08:27:29 2015
@@ -83,4 +83,3 @@ log4j.appender.console.layout.Conversion
#log4j.logger.org.apache.hadoop.dfs=DEBUG
#log4j.logger.org.apache.hama=DEBUG
#log4j.logger.org.apache.zookeeper=DEBUG
-#log4j.logger.org.apache.avro=DEBUG
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Tue Jun 9 08:27:29 2015
@@ -237,16 +237,26 @@ public class ZooKeeperSyncClientImpl ext
@Override
public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
- try {
- String jobRegisterKey = constructKey(jobId, "peers");
- if (zk.exists(jobRegisterKey, false) == null) {
+ int count = 0;
+ String jobRegisterKey = constructKey(jobId, "peers");
+ Stat stat = null;
+
+ LOG.info("TaskAttemptID : " + taskId);
+ while (stat != null) {
+ try {
+ stat = zk.exists(jobRegisterKey, false);
zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ LOG.debug(e); // ignore it.
+ }
+ count++;
+
+ // retry 10 times.
+ if (count > 9) {
+ throw new RuntimeException("can't create root node.");
}
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
}
registerTask(jobId, hostAddress, port, taskId);
}
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java?rev=1684349&r1=1684348&r2=1684349&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java Tue Jun 9 08:27:29 2015
@@ -80,7 +80,6 @@ public class ApplicationMaster implemen
private String hostname;
private int clientPort;
private FileSystem fs;
- private static int id = 0;
private volatile long superstep;
private Counters globalCounter = new Counters();
@@ -186,6 +185,7 @@ public class ApplicationMaster implemen
LogManager.shutdown();
ExitUtil.terminate(1, t);
} finally {
+ LOG.info("Stop SyncServer and RPCServer.");
appMaster.close();
}
@@ -491,6 +491,7 @@ public class ApplicationMaster implemen
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
+
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
@@ -502,10 +503,8 @@ public class ApplicationMaster implemen
+ allocatedContainer.getResource().getMemory()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
- Thread launchThread = createLaunchContainerThread(allocatedContainer);
+ Thread launchThread = createLaunchContainerThread(allocatedContainer,
allocatedContainer.getId().getContainerId());
// launch and start the container on a separate thread to keep
// the main thread unblocked
@@ -513,7 +512,6 @@ public class ApplicationMaster implemen
launchThreads.add(launchThread);
launchedContainers.add(allocatedContainer.getId());
launchThread.start();
- id++;
}
}
@@ -621,15 +619,19 @@ public class ApplicationMaster implemen
Configuration conf;
+ long taskAttemptId;
+
/**
* @param lcontainer Allocated container
* @param containerListener Callback handler of the container
*/
public LaunchContainerRunnable(
- Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+ Container lcontainer, NMCallbackHandler containerListener,
+ Configuration conf, long taskAttemptId) {
this.container = lcontainer;
this.containerListener = containerListener;
this.conf = conf;
+ this.taskAttemptId = taskAttemptId;
}
/**
@@ -725,7 +727,7 @@ public class ApplicationMaster implemen
vargs.add(BSPRunner.class.getCanonicalName());
vargs.add(jobId.getJtIdentifier());
- vargs.add(Integer.toString(id));
+ vargs.add(Long.toString(taskAttemptId));
vargs.add(
new Path(jobFile).makeQualified(fs.getUri(),
fs.getWorkingDirectory())
.toString());
@@ -805,7 +807,6 @@ public class ApplicationMaster implemen
"ApplicationAttemptId not set in the environment");
}
- LOG.info("app attempt id!!!");
ContainerId containerId = ConverterUtils.toContainerId(envs
.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
return containerId.getApplicationAttemptId();
@@ -930,9 +931,9 @@ public class ApplicationMaster implemen
}
@VisibleForTesting
- Thread createLaunchContainerThread(Container allocatedContainer) {
+ Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
LaunchContainerRunnable runnableLaunchContainer =
- new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf);
+ new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
return new Thread(runnableLaunchContainer);
}
@@ -1001,6 +1002,7 @@ public class ApplicationMaster implemen
public void close() throws IOException {
this.clientServer.stop();
this.taskServer.stop();
+ this.syncServer.stopServer();
}
@Override