This is an automated email from the ASF dual-hosted git repository.

dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 526f561  GIRAPH-799
526f561 is described below

commit 526f5619e6b115ad8db1af245fd4736125dd5c37
Author: Aanchal Dalmia <aanchaldalmia...@gmail.com>
AuthorDate: Mon Dec 9 11:26:54 2019 -0800

    GIRAPH-799
    
    closes #117
---
 .../org/apache/giraph/bsp/CentralizedServiceMaster.java |  5 ++++-
 .../java/org/apache/giraph/counters/CustomCounters.java | 10 ++++++++--
 .../java/org/apache/giraph/counters/GiraphTimers.java   | 17 ++++++++++-------
 .../java/org/apache/giraph/master/BspServiceMaster.java |  8 ++++----
 .../java/org/apache/giraph/master/MasterThread.java     |  4 +++-
 .../java/org/apache/giraph/worker/BspServiceWorker.java |  2 +-
 6 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 6f5d459..cbf44b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -183,7 +183,10 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
 
   /**
    * Add the Giraph Timers to thirft counter struct, and send to the job client
+   * Counters include the Giraph Timers for setup, initialise, shutdown, total,
+   * and time for the given superstep
+   * @param superstep superstep for which the GiraphTimer will be sent
    *
    */
-  void addGiraphTimersAndSendCounters();
+  void addGiraphTimersAndSendCounters(long superstep);
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
index 482cbbc..4a4fb79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/CustomCounters.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.counters;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -73,11 +74,16 @@ public class CustomCounters {
 
   /**
    * Get the unique counter group and names
+   * This will also clear the counters list, to avoid duplicate
+   * counters from the previous superstep from being sent to the
+   * zookeeper again
    *
    * @return Map of unique counter names
    */
-  public static Set<CustomCounter> getCustomCounters() {
-    return COUNTER_NAMES;
+  public static Set<CustomCounter> getAndClearCustomCounters() {
+    Set<CustomCounter> counterNamesCopy = new HashSet<>(COUNTER_NAMES);
+    COUNTER_NAMES.clear();
+    return counterNamesCopy;
   }
 
   /**
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
index f4005c4..33875e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
@@ -184,11 +184,13 @@ public class GiraphTimers extends HadoopCountersBase {
   }
 
   /**
-   * Get a map of counter names and values
-   *
+   * Get a map of counter names and values for the given superstep
+   * Counters include Setup, Initialise, Shutdown, Total, and time for
+   * the given superstep
+   * @param superstep superstep for which to fetch the GiraphTimer
    * @return Map of counter names and values
    */
-  public List<CustomCounter> getCounterList() {
+  public List<CustomCounter> getCounterList(long superstep) {
     List<CustomCounter> countersList = new ArrayList<>();
     for (GiraphHadoopCounter counter: jobCounters) {
       CustomCounter customCounter = new CustomCounter(
@@ -196,11 +198,12 @@ public class GiraphTimers extends HadoopCountersBase {
               CustomCounter.Aggregation.SUM, counter.getValue());
       countersList.add(customCounter);
     }
-    for (Map.Entry<Long, GiraphHadoopCounter> entry :
-            superstepMsec.entrySet()) {
+    GiraphHadoopCounter giraphHadoopCounter = superstepMsec.get(superstep);
+    if (giraphHadoopCounter != null) {
       CustomCounter customCounter = new CustomCounter(
-              GROUP_NAME, entry.getValue().getName(),
-              CustomCounter.Aggregation.SUM, entry.getValue().getValue());
+              GROUP_NAME, giraphHadoopCounter.getName(),
+              CustomCounter.Aggregation.SUM,
+              giraphHadoopCounter.getValue());
       countersList.add(customCounter);
     }
     return countersList;
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 153d4cc..87a5b0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1678,7 +1678,6 @@ public class BspServiceMaster<I extends WritableComparable,
     // are no more messages in the system, stop the computation
     GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
     aggregateCountersFromWorkersAndMaster();
-    addGiraphTimersAndSendCounters();
     if (masterCompute.isHalted() ||
         (globalStats.getFinishedVertexCount() ==
         globalStats.getVertexCount() &&
@@ -1927,7 +1926,7 @@ public class BspServiceMaster<I extends WritableComparable,
       // we should not add them again here.
       Counter counter;
       Set<CustomCounter> masterCounterNames =
-              CustomCounters.getCustomCounters();
+              CustomCounters.getAndClearCustomCounters();
       for (CustomCounter customCounter : masterCounterNames) {
         String groupName = customCounter.getGroupName();
         String counterName = customCounter.getCounterName();
@@ -1961,12 +1960,13 @@ public class BspServiceMaster<I extends WritableComparable,
    * the time required for shutdown and cleanup
    * This will fetch the final Giraph Timers, and send all the counters
    * to the job client
+   * @param superstep superstep for which the GiraphTimer will be sent
    *
    */
-  public void addGiraphTimersAndSendCounters() {
+  public void addGiraphTimersAndSendCounters(long superstep) {
     List<CustomCounter> giraphCounters =
             giraphCountersThriftStruct.getCounters();
-    giraphCounters.addAll(GiraphTimers.getInstance().getCounterList());
+    giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
     giraphCountersThriftStruct.setCounters(giraphCounters);
     getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
   }
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index dc86ca3..7fccc34 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -147,6 +147,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
               GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
                   computationName).increment(superstepMillis);
             }
+            bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);
 
             bspServiceMaster.postSuperstep();
 
@@ -191,7 +192,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
         GiraphTimers.getInstance().getTotalMs().
           increment(System.currentTimeMillis() - initializeMillis);
       }
-      bspServiceMaster.addGiraphTimersAndSendCounters();
+      bspServiceMaster.addGiraphTimersAndSendCounters(
+              bspServiceMaster.getSuperstep());
       bspServiceMaster.postApplication();
       // CHECKSTYLE: stop IllegalCatchCheck
     } catch (Exception e) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5cbc4e7..b6756c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1251,7 +1251,7 @@ else[HADOOP_NON_SECURE]*/
    */
   public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
     Set<CustomCounter> additionalCounters =
-            CustomCounters.getCustomCounters();
+            CustomCounters.getAndClearCustomCounters();
 
     JSONArray jsonCounters = new JSONArray();
     Mapper.Context context = getContext();

Reply via email to