This is an automated email from the ASF dual-hosted git repository. nwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 98e35ae Add metrics for communicator size in metrics manager (#3351) 98e35ae is described below commit 98e35ae59a71910c6baa9f28a99db8e3d4386796 Author: Ning Wang <nw...@twitter.com> AuthorDate: Tue Oct 1 11:40:20 2019 -0700 Add metrics for communicator size in metrics manager (#3351) --- .../apache/heron/metricsmgr/MetricsManager.java | 11 +++--- .../heron/metricsmgr/MetricsManagerServer.java | 42 ++++++++++++++++------ .../heron/metricsmgr/MetricsManagerServerTest.java | 13 ++++--- 3 files changed, 44 insertions(+), 22 deletions(-) diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java index 8d4ab5e..076a504 100644 --- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java +++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManager.java @@ -199,7 +199,7 @@ public class MetricsManager { : TypeUtils.getInteger(restartAttempts)); // Update the list of Communicator in Metrics Manager Server - metricsManagerServer.addSinkCommunicator(sinkExecutor.getCommunicator()); + metricsManagerServer.addSinkCommunicator(sinkId, sinkExecutor.getCommunicator()); } } @@ -537,12 +537,11 @@ public class MetricsManager { // If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink if (sinkExecutors.containsKey(thread.getName())) { sinkId = thread.getName(); - // Remove the old sink executor - SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); // Remove the unneeded Communicator bind with Metrics Manager Server - metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator()); + metricsManagerServer.removeSinkCommunicator(sinkId); - // Close the sink + // Remove the old sink executor and close the sink + SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); SysUtils.closeIgnoringExceptions(oldSinkExecutor); thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); @@ -565,7 +564,7 @@ public class MetricsManager { sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts); // Update the list of Communicator in Metrics Manager Server - metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator()); + metricsManagerServer.addSinkCommunicator(sinkId, newSinkExecutor.getCommunicator()); // Restart it executors.execute(newSinkExecutor); diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java index a9ff54f..8b9e6ab 100644 --- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java +++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java @@ -22,10 +22,11 @@ package org.apache.heron.metricsmgr; import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,8 +62,10 @@ public class MetricsManagerServer extends HeronServer { private static final String SERVER_EXCEPTIONS_RECEIVED = "exceptions-received"; private static final String SERVER_NEW_TMASTER_LOCATION = "new-tmaster-location"; private static final String SERVER_TMASTER_LOCATION_RECEIVED = "tmaster-location-received"; + private static final String SERVER_COMMUNICATOR_OFFER = "communicator-offer"; + private static final String SERVER_COMMUNICATOR_SIZE = "communicator-size"; - private final List<Communicator<MetricsRecord>> metricsSinkCommunicators; + private final Map<String, Communicator<MetricsRecord>> metricsSinkCommunicators; // A map from MetricPublisher's immutable SocketAddress to the MetricPublisher // We would fetch SocketAddress by using SocketChannel.socket().getRemoteSocketAddress, @@ -96,7 +99,7 @@ public class MetricsManagerServer extends HeronServer { // Since we might mutate the list while iterating it // Consider that the iteration vastly outnumbers mutation, // it would barely hurt any performance - this.metricsSinkCommunicators = new CopyOnWriteArrayList<Communicator<MetricsRecord>>(); + this.metricsSinkCommunicators = Collections.synchronizedMap(new HashMap<>()); this.publisherMap = new HashMap<SocketAddress, Metrics.MetricPublisher>(); @@ -119,14 +122,24 @@ public class MetricsManagerServer extends HeronServer { registerOnMessage(Metrics.MetricsCacheLocationRefreshMessage.newBuilder()); } - public void addSinkCommunicator(Communicator<MetricsRecord> communicator) { - LOG.info("Communicator is added: " + communicator); - this.metricsSinkCommunicators.add(communicator); + public void addSinkCommunicator(String id, Communicator<MetricsRecord> communicator) { + LOG.info("Communicator " + id + " is added: " + communicator); + synchronized (metricsSinkCommunicators) { + this.metricsSinkCommunicators.put(id, communicator); + } } - public boolean removeSinkCommunicator(Communicator<MetricsRecord> communicator) { - LOG.info("Communicator is removed: " + communicator); - return this.metricsSinkCommunicators.remove(communicator); + public boolean removeSinkCommunicator(String id) { + LOG.info("Remove communicator: " + id); + boolean found = false; + synchronized (metricsSinkCommunicators) { + if (metricsSinkCommunicators.containsKey(id)) { + this.metricsSinkCommunicators.remove(id); + found = true; + LOG.info("Communicator " + id + " is removed"); + } + } + return found; } @Override @@ -279,8 +292,15 @@ public class MetricsManagerServer extends HeronServer { MetricsRecord record = new MetricsRecord(source, metricsInfos, exceptionInfos); // Push MetricsRecord to Communicator, which would wake up SlaveLooper bind with IMetricsSink - for (Communicator<MetricsRecord> c : metricsSinkCommunicators) { - c.offer(record); + synchronized (metricsSinkCommunicators) { + Iterator<String> itr = metricsSinkCommunicators.keySet().iterator(); + while (itr.hasNext()) { + String key = itr.next(); + Communicator<MetricsRecord> c = metricsSinkCommunicators.get(key); + c.offer(record); + serverMetricsCounters.scope(SERVER_COMMUNICATOR_OFFER).incr(); + serverMetricsCounters.scope(SERVER_COMMUNICATOR_SIZE + "-" + key).incrBy(c.size()); + } } } diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java index 994f1ea..49b5d90 100644 --- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java +++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/MetricsManagerServerTest.java @@ -83,9 +83,10 @@ public class MetricsManagerServerTest { */ @Test public void testAddSinkCommunicator() { + String name = "test_communicator"; Communicator<MetricsRecord> sinkCommunicator = new Communicator<>(); - metricsManagerServer.addSinkCommunicator(sinkCommunicator); - Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator)); + metricsManagerServer.addSinkCommunicator(name, sinkCommunicator); + Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name)); } /** @@ -93,9 +94,10 @@ public class MetricsManagerServerTest { */ @Test public void testRemoveSinkCommunicator() { + String name = "test_communicator"; Communicator<MetricsRecord> sinkCommunicator = new Communicator<>(); - metricsManagerServer.addSinkCommunicator(sinkCommunicator); - Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(sinkCommunicator)); + metricsManagerServer.addSinkCommunicator(name, sinkCommunicator); + Assert.assertTrue(metricsManagerServer.removeSinkCommunicator(name)); } /** @@ -103,10 +105,11 @@ public class MetricsManagerServerTest { */ @Test public void testMetricsManagerServer() throws InterruptedException { + String name = "test_communicator"; CountDownLatch offersLatch = new CountDownLatch(MESSAGE_SIZE); Communicator<MetricsRecord> sinkCommunicator = CommunicatorTestHelper.spyCommunicator(new Communicator<MetricsRecord>(), offersLatch); - metricsManagerServer.addSinkCommunicator(sinkCommunicator); + metricsManagerServer.addSinkCommunicator(name, sinkCommunicator); serverTester.start();