Author: aching
Date: Thu Jul 26 04:06:35 2012
New Revision: 1365878
URL: http://svn.apache.org/viewvc?rev=1365878&view=rev
Log:
GIRAPH-267: Jobs can get killed for not reporting status during INPUT
SUPERSTEP (netj via aching).
Removed:
giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jul 26 04:06:35 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-267: Jobs can get killed for not reporting status during
+ INPUT SUPERSTEP (netj via aching).
+
GIRAPH-266: Average aggregators don't calculate real average
(majakabiljo via aching).
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Jul
26 04:06:35 2012
@@ -208,37 +208,27 @@ public abstract class BspService<I exten
/** Private ZooKeeper instance that implements the service */
private final ZooKeeperExt zk;
/** Has the Connection occurred? */
- private final BspEvent connectedEvent = new PredicateLock();
+ private final BspEvent connectedEvent;
/** Has worker registration changed (either healthy or unhealthy) */
- private final BspEvent workerHealthRegistrationChanged =
- new PredicateLock();
+ private final BspEvent workerHealthRegistrationChanged;
/** InputSplits are ready for consumption by workers */
- private final BspEvent inputSplitsAllReadyChanged =
- new PredicateLock();
+ private final BspEvent inputSplitsAllReadyChanged;
/** InputSplit reservation or finished notification and synchronization */
- private final BspEvent inputSplitsStateChanged =
- new PredicateLock();
+ private final BspEvent inputSplitsStateChanged;
/** InputSplits are done being processed by workers */
- private final BspEvent inputSplitsAllDoneChanged =
- new PredicateLock();
+ private final BspEvent inputSplitsAllDoneChanged;
/** InputSplit done by a worker finished notification and synchronization */
- private final BspEvent inputSplitsDoneStateChanged =
- new PredicateLock();
+ private final BspEvent inputSplitsDoneStateChanged;
/** Are the partition assignments to workers ready? */
- private final BspEvent partitionAssignmentsReadyChanged =
- new PredicateLock();
+ private final BspEvent partitionAssignmentsReadyChanged;
/** Application attempt changed */
- private final BspEvent applicationAttemptChanged =
- new PredicateLock();
+ private final BspEvent applicationAttemptChanged;
/** Superstep finished synchronization */
- private final BspEvent superstepFinished =
- new PredicateLock();
+ private final BspEvent superstepFinished;
/** Master election changed for any waited on attempt */
- private final BspEvent masterElectionChildrenChanged =
- new PredicateLock();
+ private final BspEvent masterElectionChildrenChanged;
/** Cleaned up directory children changed*/
- private final BspEvent cleanedUpChildrenChanged =
- new PredicateLock();
+ private final BspEvent cleanedUpChildrenChanged;
/** Registered list of BspEvents */
private final List<BspEvent> registeredBspEvents =
new ArrayList<BspEvent>();
@@ -284,6 +274,18 @@ public abstract class BspService<I exten
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphMapper<I, V, E, M> graphMapper) {
+ this.connectedEvent = new PredicateLock(context);
+ this.workerHealthRegistrationChanged = new PredicateLock(context);
+ this.inputSplitsAllReadyChanged = new PredicateLock(context);
+ this.inputSplitsStateChanged = new PredicateLock(context);
+ this.inputSplitsAllDoneChanged = new PredicateLock(context);
+ this.inputSplitsDoneStateChanged = new PredicateLock(context);
+ this.partitionAssignmentsReadyChanged = new PredicateLock(context);
+ this.applicationAttemptChanged = new PredicateLock(context);
+ this.superstepFinished = new PredicateLock(context);
+ this.masterElectionChildrenChanged = new PredicateLock(context);
+ this.cleanedUpChildrenChanged = new PredicateLock(context);
+
registerBspEvent(connectedEvent);
registerBspEvent(workerHealthRegistrationChanged);
registerBspEvent(inputSplitsAllReadyChanged);
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Thu Jul 26 04:06:35 2012
@@ -121,8 +121,7 @@ public class BspServiceMaster<I extends
/** Last finalized checkpoint */
private long lastCheckpointedSuperstep = -1;
/** State of the superstep changed */
- private final BspEvent superstepStateChanged =
- new PredicateLock();
+ private final BspEvent superstepStateChanged;
/** Master graph partitioner */
private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
/** All the partition stats from the last superstep */
@@ -147,6 +146,7 @@ public class BspServiceMaster<I extends
Mapper<?, ?, ?, ?>.Context context,
GraphMapper<I, V, E, M> graphMapper) {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
+ superstepStateChanged = new PredicateLock(context);
registerBspEvent(superstepStateChanged);
maxWorkers =
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Thu Jul 26 04:06:35 2012
@@ -106,8 +106,7 @@ public class BspServiceWorker<I extends
private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
new HashMap<Integer, Partition<I, V, E, M>>();
/** Have the partition exchange children (workers) changed? */
- private final BspEvent partitionExchangeChildrenChanged =
- new PredicateLock();
+ private final BspEvent partitionExchangeChildrenChanged;
/** Max vertices per partition before sending */
private final int maxVerticesPerPartition;
/** Worker Context */
@@ -139,6 +138,7 @@ public class BspServiceWorker<I extends
GraphState<I, V, E, M> graphState)
throws IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
+ partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
maxVerticesPerPartition =
getConfiguration().getInt(
@@ -654,7 +654,7 @@ public class BspServiceWorker<I extends
}
// At this point all vertices have been sent to their destinations.
- // Move them to the worker, creating creating the empty partitions
+ // Move them to the worker, creating the empty partitions
movePartitionsToWorker(commService);
for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Jul
26 04:06:35 2012
@@ -23,14 +23,20 @@ import java.util.concurrent.locks.Condit
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
/**
- * A lock with a predicate that was be used to synchronize events.
+ * A lock with a predicate that can be used to synchronize events and keep the
+ * job context updated while waiting.
*/
public class PredicateLock implements BspEvent {
/** Class logger */
private static final Logger LOG = Logger.getLogger(PredicateLock.class);
+ /** Msecs to refresh the progress meter */
+ private static final int MSEC_PERIOD = 10000;
+ /** Progressable for reporting progress (Job context) */
+ protected final Progressable progressable;
/** Lock */
private Lock lock = new ReentrantLock();
/** Condition associated with lock */
@@ -38,6 +44,15 @@ public class PredicateLock implements Bs
/** Predicate */
private boolean eventOccurred = false;
+ /**
+ * Constructor.
+ *
+ * @param progressable used to report progress() (usually a Mapper.Context)
+ */
+ public PredicateLock(Progressable progressable) {
+ this.progressable = progressable;
+ }
+
@Override
public void reset() {
lock.lock();
@@ -111,6 +126,8 @@ public class PredicateLock implements Bs
@Override
public void waitForever() {
- waitMsecs(-1);
+ while (!waitMsecs(MSEC_PERIOD)) {
+ progressable.progress();
+ }
}
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java?rev=1365878&r1=1365877&r2=1365878&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestPredicateLock.java Thu Jul
26 04:06:35 2012
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.util.Progressable;
import org.junit.Test;
/**
@@ -43,12 +44,25 @@ public class TestPredicateLock {
}
}
+ private Progressable stubContext;
+
+ private Progressable getStubProgressable() {
+ if (stubContext == null)
+ stubContext = new Progressable() {
+ @Override
+ public void progress() {
+ System.out.println("progress received");
+ }
+ };
+ return stubContext;
+ }
+
/**
* Make sure the the event is not signaled.
*/
@Test
public void testWaitMsecsNoEvent() {
- BspEvent event = new PredicateLock();
+ BspEvent event = new PredicateLock(getStubProgressable());
boolean gotPredicate = event.waitMsecs(50);
assertFalse(gotPredicate);
}
@@ -58,7 +72,7 @@ public class TestPredicateLock {
*/
@Test
public void testEvent() {
- BspEvent event = new PredicateLock();
+ BspEvent event = new PredicateLock(getStubProgressable());
event.signal();
boolean gotPredicate = event.waitMsecs(-1);
assertTrue(gotPredicate );
@@ -74,7 +88,7 @@ public class TestPredicateLock {
@Test
public void testWaitMsecs() {
System.out.println("testWaitMsecs:");
- BspEvent event = new PredicateLock();
+ BspEvent event = new PredicateLock(getStubProgressable());
Thread signalThread = new SignalThread(event);
signalThread.start();
boolean gotPredicate = event.waitMsecs(2000);