Repository: giraph Updated Branches: refs/heads/trunk 17fac7292 -> 6d603ec7d
GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6d603ec7 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6d603ec7 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6d603ec7 Branch: refs/heads/trunk Commit: 6d603ec7ddbb78cfa1b0bf14e3efa22a0676f5c8 Parents: 17fac72 Author: Maja Kabiljo <[email protected]> Authored: Fri Mar 7 13:38:56 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Fri Mar 7 13:38:56 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 1 + .../org/apache/giraph/worker/BspServiceWorker.java | 1 - .../apache/giraph/worker/WorkerProgressWriter.java | 16 ++++++++++++---- 3 files changed, 13 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index fa93d13..c241971 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,7 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo) GIRAPH-867: Fix comments in PageRankComputation (ssc) http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index c0b28dd..6b721b3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -1180,7 +1180,6 @@ public class BspServiceWorker<I extends WritableComparable, saveEdges(); WorkerProgress.get().finishStoring(); if (workerProgressWriter != null) { - WorkerProgress.writeToZnode(getZkExt(), myProgressPath); workerProgressWriter.stop(); } getPartitionStore().shutdown(); http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java index 95e46e4..4ff5bb1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java @@ -35,6 +35,10 @@ public class WorkerProgressWriter { private final Thread writerThread; /** Whether worker finished application */ private volatile boolean finished = false; + /** Path where this worker's progress should be stored */ + private final String myProgressPath; + /** ZooKeeperExt */ + private final ZooKeeperExt zk; /** * Constructor, starts separate thread to periodically update worker's @@ -43,15 +47,17 @@ public class WorkerProgressWriter { * @param myProgressPath Path where this worker's progress should be stored * @param zk ZooKeeperExt */ - public WorkerProgressWriter(final String myProgressPath, - final ZooKeeperExt zk) { + public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) { + this.myProgressPath = myProgressPath; + this.zk = zk; writerThread = new Thread(new Runnable() { @Override public void run() { try { while (!finished) { WorkerProgress.get().updateMemory(); - WorkerProgress.writeToZnode(zk, myProgressPath); + WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk, + WorkerProgressWriter.this.myProgressPath); double factor = 1 + Math.random(); Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor)); } @@ -69,8 +75,10 @@ public class WorkerProgressWriter { /** * Stop the thread which writes worker's progress */ - public void stop() { + public void stop() throws InterruptedException { finished = true; writerThread.interrupt(); + writerThread.join(); + WorkerProgress.writeToZnode(zk, myProgressPath); } }
