Author: apresta
Date: Fri Aug 17 09:36:02 2012
New Revision: 1374184
URL: http://svn.apache.org/viewvc?rev=1374184&view=rev
Log:
GIRAPH-303: Regression: cleanup phase happens earlier than it should.
(majakabiljo via apresta)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1374184&r1=1374183&r2=1374184&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Aug 17 09:36:02 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-303: Regression: cleanup phase happens earlier than it
+ should. (majakabiljo via apresta)
+
GIRAPH-278: Website still tries to load incubator logo (ekoontz)
GIRAPH-300) Improve netty reliability with retrying failed
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1374184&r1=1374183&r2=1374184&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Fri Aug 17 09:36:02 2012
@@ -1384,6 +1384,40 @@ public class BspServiceMaster<I extends
return true;
}
+ /**
+ * Clean up old superstep data from ZooKeeper unless configured to keep it
+ *
+ * @param removeableSuperstep Superstep to clean up (ignored if negative)
+ * @throws InterruptedException
+ */
+ private void cleanUpOldSuperstep(long removeableSuperstep) throws
+ InterruptedException {
+ if (!(getConfiguration().getBoolean(
+ GiraphJob.KEEP_ZOOKEEPER_DATA,
+ GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+ (removeableSuperstep >= 0)) {
+ String oldSuperstepPath =
+ getSuperstepPath(getApplicationAttempt()) + "/" +
+ removeableSuperstep;
+ try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
+ oldSuperstepPath);
+ }
+ getZkExt().deleteExt(oldSuperstepPath,
+ -1,
+ true);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("coordinateBarrier: Already cleaned up " +
+ oldSuperstepPath);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "coordinateSuperstep: KeeperException on " +
+ "finalizing checkpoint", e);
+ }
+ }
+ }
+
@Override
public SuperstepState coordinateSuperstep() throws
KeeperException, InterruptedException {
@@ -1441,33 +1475,6 @@ public class BspServiceMaster<I extends
}
}
- // Clean up the old supersteps (always keep this one)
- long removeableSuperstep = getSuperstep() - 1;
- if (!(getConfiguration().getBoolean(
- GiraphJob.KEEP_ZOOKEEPER_DATA,
- GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
- (removeableSuperstep >= 0)) {
- String oldSuperstepPath =
- getSuperstepPath(getApplicationAttempt()) + "/" +
- removeableSuperstep;
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
- oldSuperstepPath);
- }
- getZkExt().deleteExt(oldSuperstepPath,
- -1,
- true);
- } catch (KeeperException.NoNodeException e) {
- LOG.warn("coordinateBarrier: Already cleaned up " +
- oldSuperstepPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: KeeperException on " +
- "finalizing checkpoint", e);
- }
- }
-
if (getSuperstep() == INPUT_SUPERSTEP) {
// Coordinate the workers finishing sending their vertices to the
// correct workers and signal when everything is done.
@@ -1527,6 +1534,7 @@ public class BspServiceMaster<I extends
getZkExt(), superstepFinishedNode, -1, globalStats);
updateCounters(globalStats);
+ cleanUpOldSuperstep(getSuperstep() - 1);
incrCachedSuperstep();
// Counter starts at zero, so no need to increment
if (getSuperstep() > 0) {