This is an automated email from the ASF dual-hosted git repository. dionysios pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push: new 526f561 GIRAPH-799 526f561 is described below commit 526f5619e6b115ad8db1af245fd4736125dd5c37 Author: Aanchal Dalmia <aanchaldalmia...@gmail.com> AuthorDate: Mon Dec 9 11:26:54 2019 -0800 GIRAPH-799 closes #117 --- .../org/apache/giraph/bsp/CentralizedServiceMaster.java | 5 ++++- .../java/org/apache/giraph/counters/CustomCounters.java | 10 ++++++++-- .../java/org/apache/giraph/counters/GiraphTimers.java | 17 ++++++++++------- .../java/org/apache/giraph/master/BspServiceMaster.java | 8 ++++---- .../java/org/apache/giraph/master/MasterThread.java | 4 +++- .../java/org/apache/giraph/worker/BspServiceWorker.java | 2 +- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index 6f5d459..cbf44b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -183,7 +183,10 @@ public interface CentralizedServiceMaster<I extends WritableComparable, /** * Add the Giraph Timers to thirft counter struct, and send to the job client + * Counters include the Giraph Timers for setup, initialise, shutdown, total, + * and time for the given superstep + * @param superstep superstep for which the GiraphTimer will be sent * */ - void addGiraphTimersAndSendCounters(); + void addGiraphTimersAndSendCounters(long superstep); } diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java index 482cbbc..4a4fb79 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java @@ -19,6 +19,7 @@ package org.apache.giraph.counters; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -73,11 +74,16 @@ public class CustomCounters { /** * Get the unique counter group and names + * This will also clear the counters list, to avoid duplicate + * counters from the previous superstep from being sent to the + * zookeeper again * * @return Map of unique counter names */ - public static Set<CustomCounter> getCustomCounters() { - return COUNTER_NAMES; + public static Set<CustomCounter> getAndClearCustomCounters() { + Set<CustomCounter> counterNamesCopy = new HashSet<>(COUNTER_NAMES); + COUNTER_NAMES.clear(); + return counterNamesCopy; } /** diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java index f4005c4..33875e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java @@ -184,11 +184,13 @@ public class GiraphTimers extends HadoopCountersBase { } /** - * Get a map of counter names and values - * + * Get a map of counter names and values for the given superstep + * Counters include Setup, Initialise, Shutdown, Total, and time for + * the given superstep + * @param superstep superstep for which to fetch the GiraphTimer * @return Map of counter names and values */ - public List<CustomCounter> getCounterList() { + public List<CustomCounter> getCounterList(long superstep) { List<CustomCounter> countersList = new ArrayList<>(); for (GiraphHadoopCounter counter: jobCounters) { CustomCounter customCounter = new CustomCounter( @@ -196,11 +198,12 @@ public class GiraphTimers extends HadoopCountersBase { CustomCounter.Aggregation.SUM, counter.getValue()); countersList.add(customCounter); } - for (Map.Entry<Long, GiraphHadoopCounter> entry : - superstepMsec.entrySet()) { + GiraphHadoopCounter giraphHadoopCounter = superstepMsec.get(superstep); + if (giraphHadoopCounter != null) { CustomCounter customCounter = new CustomCounter( - GROUP_NAME, entry.getValue().getName(), - CustomCounter.Aggregation.SUM, entry.getValue().getValue()); + GROUP_NAME, giraphHadoopCounter.getName(), + CustomCounter.Aggregation.SUM, + giraphHadoopCounter.getValue()); countersList.add(customCounter); } return countersList; diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 153d4cc..87a5b0c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1678,7 +1678,6 @@ public class BspServiceMaster<I extends WritableComparable, // are no more messages in the system, stop the computation GlobalStats globalStats = aggregateWorkerStats(getSuperstep()); aggregateCountersFromWorkersAndMaster(); - addGiraphTimersAndSendCounters(); if (masterCompute.isHalted() || (globalStats.getFinishedVertexCount() == globalStats.getVertexCount() && @@ -1927,7 +1926,7 @@ public class BspServiceMaster<I extends WritableComparable, // we should not add them again here. Counter counter; Set<CustomCounter> masterCounterNames = - CustomCounters.getCustomCounters(); + CustomCounters.getAndClearCustomCounters(); for (CustomCounter customCounter : masterCounterNames) { String groupName = customCounter.getGroupName(); String counterName = customCounter.getCounterName(); @@ -1961,12 +1960,13 @@ public class BspServiceMaster<I extends WritableComparable, * the time required for shutdown and cleanup * This will fetch the final Giraph Timers, and send all the counters * to the job client + * @param superstep superstep for which the GiraphTimer will be sent * */ - public void addGiraphTimersAndSendCounters() { + public void addGiraphTimersAndSendCounters(long superstep) { List<CustomCounter> giraphCounters = giraphCountersThriftStruct.getCounters(); - giraphCounters.addAll(GiraphTimers.getInstance().getCounterList()); + giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep)); giraphCountersThriftStruct.setCounters(giraphCounters); getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct); } diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java index dc86ca3..7fccc34 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java @@ -147,6 +147,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable, GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep, computationName).increment(superstepMillis); } + bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep); bspServiceMaster.postSuperstep(); @@ -191,7 +192,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable, GiraphTimers.getInstance().getTotalMs(). increment(System.currentTimeMillis() - initializeMillis); } - bspServiceMaster.addGiraphTimersAndSendCounters(); + bspServiceMaster.addGiraphTimersAndSendCounters( + bspServiceMaster.getSuperstep()); bspServiceMaster.postApplication(); // CHECKSTYLE: stop IllegalCatchCheck } catch (Exception e) { diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 5cbc4e7..b6756c9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -1251,7 +1251,7 @@ else[HADOOP_NON_SECURE]*/ */ public void storeCountersInZooKeeper(boolean allSuperstepsDone) { Set<CustomCounter> additionalCounters = - CustomCounters.getCustomCounters(); + CustomCounters.getAndClearCustomCounters(); JSONArray jsonCounters = new JSONArray(); Mapper.Context context = getContext();