Updated Branches: refs/heads/trunk 419445017 -> 763621a45
GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker doesn't check null (akila via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/763621a4 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/763621a4 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/763621a4 Branch: refs/heads/trunk Commit: 763621a45cd7812ee968f78961775f85f5e95cb2 Parents: 4194450 Author: Maja Kabiljo <[email protected]> Authored: Mon Feb 3 09:18:28 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Feb 3 09:20:08 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 + .../apache/giraph/job/JobProgressTracker.java | 91 ++++++++++---------- 2 files changed, 50 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d2f5980..e21a291 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker + doesn't check null (akila via majakabiljo) + GIRAPH-834: Metrcis missing superstep time (armax00 via claudio) GIRAPH-819: Number of Containers Required for a Job (Rafal Wojdyla via ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java index f685344..a364dc4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -66,7 +66,8 @@ public class JobProgressTracker implements Watcher { final String basePath = CounterUtils.waitAndGetCounterNameFromGroup( submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP); // Connect to ZooKeeper - zk = new ZooKeeperExt( + if (zkServer != null && basePath != null) { + zk = new ZooKeeperExt( zkServer, conf.getZooKeeperSessionTimeout(), conf.getZookeeperOpsMaxAttempts(), @@ -77,62 +78,64 @@ public class JobProgressTracker implements Watcher { public void progress() { } }); - writerThread = new Thread(new Runnable() { - @Override - public void run() { - String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES; - try { - while (!finished) { - if (zk.exists(workerProgressBasePath, false) != null) { - // Get locations of all worker progresses - List<String> workerProgressPaths = zk.getChildrenExt( + writerThread = new Thread(new Runnable() { + @Override + public void run() { + String workerProgressBasePath = basePath + + BspService.WORKER_PROGRESSES; + try { + while (!finished) { + if (zk.exists(workerProgressBasePath, false) != null) { + // Get locations of all worker progresses + List<String> workerProgressPaths = zk.getChildrenExt( workerProgressBasePath, false, false, true); - List<WorkerProgress> workerProgresses = + List<WorkerProgress> workerProgresses = new ArrayList<WorkerProgress>(workerProgressPaths.size()); - // Read all worker progresses - for (String workerProgressPath : workerProgressPaths) { - WorkerProgress workerProgress = new WorkerProgress(); - byte[] zkData = zk.getData(workerProgressPath, false, null); - WritableUtils.readFieldsFromByteArray(zkData, workerProgress); - workerProgresses.add(workerProgress); - } - // Combine and log - CombinedWorkerProgress combinedWorkerProgress = + // Read all worker progresses + for (String workerProgressPath : workerProgressPaths) { + WorkerProgress workerProgress = new WorkerProgress(); + byte[] zkData = zk.getData(workerProgressPath, false, null); + WritableUtils.readFieldsFromByteArray(zkData, workerProgress); + workerProgresses.add(workerProgress); + } + // Combine and log + CombinedWorkerProgress combinedWorkerProgress = new CombinedWorkerProgress(workerProgresses); - if (LOG.isInfoEnabled()) { - LOG.info(combinedWorkerProgress.toString()); - } - // Check if application is done - if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { - break; + if (LOG.isInfoEnabled()) { + LOG.info(combinedWorkerProgress.toString()); + } + // Check if application is done + if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { + break; + } } + Thread.sleep(UPDATE_MILLISECONDS); } - Thread.sleep(UPDATE_MILLISECONDS); - } - } catch (InterruptedException | KeeperException e) { - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); - } - } finally { - try { - // Create a node so master knows we stopped communicating with - // ZooKeeper and it's safe to cleanup - zk.createExt( + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } + } finally { + try { + // Create a node so master knows we stopped communicating with + // ZooKeeper and it's safe to cleanup + zk.createExt( basePath + BspService.CLEANED_UP_DIR + "/client", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true); - zk.close(); - } catch (InterruptedException | KeeperException e) { - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); + zk.close(); + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } } } } - } - }); - writerThread.start(); + }); + writerThread.start(); + } } /**
