This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e35ae  Add metrics for communicator size in metrics manager (#3351)
98e35ae is described below

commit 98e35ae59a71910c6baa9f28a99db8e3d4386796
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Oct 1 11:40:20 2019 -0700

    Add metrics for communicator size in metrics manager (#3351)
---
 .../apache/heron/metricsmgr/MetricsManager.java    | 11 +++---
 .../heron/metricsmgr/MetricsManagerServer.java     | 42 ++++++++++++++++------
 .../heron/metricsmgr/MetricsManagerServerTest.java | 13 ++++---
 3 files changed, 44 insertions(+), 22 deletions(-)

diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
index 8d4ab5e..076a504 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java
@@ -199,7 +199,7 @@ public class MetricsManager {
               : TypeUtils.getInteger(restartAttempts));
 
       // Update the list of Communicator in Metrics Manager Server
-      metricsManagerServer.addSinkCommunicator(sinkExecutor.getCommunicator());
+      metricsManagerServer.addSinkCommunicator(sinkId, 
sinkExecutor.getCommunicator());
     }
   }
 
@@ -537,12 +537,11 @@ public class MetricsManager {
       // If the thread name is a key of SinkExecutors, then it is a thread 
running IMetricsSink
       if (sinkExecutors.containsKey(thread.getName())) {
         sinkId = thread.getName();
-        // Remove the old sink executor
-        SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
         // Remove the unneeded Communicator bind with Metrics Manager Server
-        
metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator());
+        metricsManagerServer.removeSinkCommunicator(sinkId);
 
-        // Close the sink
+        // Remove the old sink executor and close the sink
+        SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
         SysUtils.closeIgnoringExceptions(oldSinkExecutor);
 
         thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId);
@@ -565,7 +564,7 @@ public class MetricsManager {
         sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts);
 
         // Update the list of Communicator in Metrics Manager Server
-        
metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator());
+        metricsManagerServer.addSinkCommunicator(sinkId, 
newSinkExecutor.getCommunicator());
 
         // Restart it
         executors.execute(newSinkExecutor);
diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
index a9ff54f..8b9e6ab 100644
--- 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
+++ 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java
@@ -22,10 +22,11 @@ package org.apache.heron.metricsmgr;
 import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -61,8 +62,10 @@ public class MetricsManagerServer extends HeronServer {
   private static final String SERVER_EXCEPTIONS_RECEIVED = 
"exceptions-received";
   private static final String SERVER_NEW_TMASTER_LOCATION = 
"new-tmaster-location";
   private static final String SERVER_TMASTER_LOCATION_RECEIVED = 
"tmaster-location-received";
+  private static final String SERVER_COMMUNICATOR_OFFER = "communicator-offer";
+  private static final String SERVER_COMMUNICATOR_SIZE = "communicator-size";
 
-  private final List<Communicator<MetricsRecord>> metricsSinkCommunicators;
+  private final Map<String, Communicator<MetricsRecord>> 
metricsSinkCommunicators;
 
   // A map from MetricPublisher's immutable SocketAddress to the 
MetricPublisher
   // We would fetch SocketAddress by using 
SocketChannel.socket().getRemoteSocketAddress,
@@ -96,7 +99,7 @@ public class MetricsManagerServer extends HeronServer {
     // Since we might mutate the list while iterating it
     // Consider that the iteration vastly outnumbers mutation,
     // it would barely hurt any performance
-    this.metricsSinkCommunicators = new 
CopyOnWriteArrayList<Communicator<MetricsRecord>>();
+    this.metricsSinkCommunicators = Collections.synchronizedMap(new 
HashMap<>());
 
     this.publisherMap = new HashMap<SocketAddress, Metrics.MetricPublisher>();
 
@@ -119,14 +122,24 @@ public class MetricsManagerServer extends HeronServer {
     registerOnMessage(Metrics.MetricsCacheLocationRefreshMessage.newBuilder());
   }
 
-  public void addSinkCommunicator(Communicator<MetricsRecord> communicator) {
-    LOG.info("Communicator is added: " + communicator);
-    this.metricsSinkCommunicators.add(communicator);
+  public void addSinkCommunicator(String id, Communicator<MetricsRecord> 
communicator) {
+    LOG.info("Communicator " + id + " is added: " + communicator);
+    synchronized (metricsSinkCommunicators) {
+      this.metricsSinkCommunicators.put(id, communicator);
+    }
   }
 
-  public boolean removeSinkCommunicator(Communicator<MetricsRecord> 
communicator) {
-    LOG.info("Communicator is removed: " + communicator);
-    return this.metricsSinkCommunicators.remove(communicator);
+  public boolean removeSinkCommunicator(String id) {
+    LOG.info("Remove communicator: " + id);
+    boolean found = false;
+    synchronized (metricsSinkCommunicators) {
+      if (metricsSinkCommunicators.containsKey(id)) {
+        this.metricsSinkCommunicators.remove(id);
+        found = true;
+        LOG.info("Communicator " + id + " is removed");
+      }
+    }
+    return found;
   }
 
   @Override
@@ -279,8 +292,15 @@ public class MetricsManagerServer extends HeronServer {
     MetricsRecord record = new MetricsRecord(source, metricsInfos, 
exceptionInfos);
 
     // Push MetricsRecord to Communicator, which would wake up SlaveLooper 
bind with IMetricsSink
-    for (Communicator<MetricsRecord> c : metricsSinkCommunicators) {
-      c.offer(record);
+    synchronized (metricsSinkCommunicators) {
+      Iterator<String> itr = metricsSinkCommunicators.keySet().iterator();
+      while (itr.hasNext()) {
+        String key = itr.next();
+        Communicator<MetricsRecord> c = metricsSinkCommunicators.get(key);
+        c.offer(record);
+        serverMetricsCounters.scope(SERVER_COMMUNICATOR_OFFER).incr();
+        serverMetricsCounters.scope(SERVER_COMMUNICATOR_SIZE + "-" + 
key).incrBy(c.size());
+      }
     }
   }
 
diff --git 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
index 994f1ea..49b5d90 100644
--- 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
+++ 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java
@@ -83,9 +83,10 @@ public class MetricsManagerServerTest {
    */
   @Test
   public void testAddSinkCommunicator() {
+    String name = "test_communicator";
     Communicator<MetricsRecord> sinkCommunicator = new Communicator<>();
-    metricsManagerServer.addSinkCommunicator(sinkCommunicator);
-    
Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator));
+    metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
+    Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name));
   }
 
   /**
@@ -93,9 +94,10 @@ public class MetricsManagerServerTest {
    */
   @Test
   public void testRemoveSinkCommunicator() {
+    String name = "test_communicator";
     Communicator<MetricsRecord> sinkCommunicator = new Communicator<>();
-    metricsManagerServer.addSinkCommunicator(sinkCommunicator);
-    
Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator));
+    metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
+    Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name));
   }
 
   /**
@@ -103,10 +105,11 @@ public class MetricsManagerServerTest {
    */
   @Test
   public void testMetricsManagerServer() throws InterruptedException {
+    String name = "test_communicator";
     CountDownLatch offersLatch = new CountDownLatch(MESSAGE_SIZE);
     Communicator<MetricsRecord> sinkCommunicator =
         CommunicatorTestHelper.spyCommunicator(new 
Communicator<MetricsRecord>(), offersLatch);
-    metricsManagerServer.addSinkCommunicator(sinkCommunicator);
+    metricsManagerServer.addSinkCommunicator(name, sinkCommunicator);
 
     serverTester.start();
 

Reply via email to