This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a3dee5  Add a cap to metrics communicator in TMasterSink and 
MetricsCacheSink (#3355)
5a3dee5 is described below

commit 5a3dee54adf60b706a8af1a800c7ec75d1042ea0
Author: Ning Wang <[email protected]>
AuthorDate: Fri Oct 4 15:44:20 2019 -0700

    Add a cap to metrics communicator in TMasterSink and MetricsCacheSink 
(#3355)
    
    * Add a cap to metrics communicator in TMasterSink and MetricsCacheSink
    
    * refactor while loop
---
 .../sink/metricscache/MetricsCacheClient.java      | 13 +++++++++--
 .../sink/metricscache/MetricsCacheSink.java        | 19 +++++++++++++++
 .../metricsmgr/sink/tmaster/TMasterClient.java     | 12 ++++++++--
 .../heron/metricsmgr/sink/tmaster/TMasterSink.java | 17 ++++++++++++++
 .../sink/metricscache/MetricsCacheSinkTest.java    | 27 ++++++++++++++++++++++
 .../metricsmgr/sink/tmaster/TMasterSinkTest.java   | 27 ++++++++++++++++++++++
 6 files changed, 111 insertions(+), 4 deletions(-)

diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
index b95ef2e..5e0a3e9 100644
--- 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
+++ 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
@@ -93,12 +93,21 @@ public class MetricsCacheClient extends HeronClient 
implements Runnable {
     Runnable task = new Runnable() {
       @Override
       public void run() {
-        while (!publishMetricsCommunicator.isEmpty()) {
-          TopologyMaster.PublishMetrics publishMetrics = 
publishMetricsCommunicator.poll();
+        TopologyMaster.PublishMetrics publishMetrics;
+        while (true) {
+          synchronized (publishMetricsCommunicator) {
+            publishMetrics = publishMetricsCommunicator.poll();
+          }
+          if (publishMetrics == null) {
+            break;  // No metrics left
+          }
+
           LOG.info(String.format("%d Metrics, %d Exceptions to send to 
MetricsCache",
               publishMetrics.getMetricsCount(), 
publishMetrics.getExceptionsCount()));
           LOG.fine("Publish Metrics sending to MetricsCache: " + 
publishMetrics.toString());
+
           sendMessage(publishMetrics);
+
         }
       }
     };
diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
index 4d2357f..0f049d1 100644
--- 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
+++ 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
@@ -83,6 +83,8 @@ import org.apache.heron.spi.metricsmgr.sink.SinkContext;
 public class MetricsCacheSink implements IMetricsSink {
   private static final Logger LOG = 
Logger.getLogger(MetricsCacheSink.class.getName());
 
+  private static final int MAX_COMMUNICATOR_SIZE = 128;
+
   // These configs would be read from metrics-sink-configs.yaml
   private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
       "metricscache-location-check-interval-sec";
@@ -232,10 +234,27 @@ public class MetricsCacheSink implements IMetricsSink {
 
     metricsCommunicator.offer(publishMetrics.build());
 
+
+
     // Update metrics
     sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
     sinkContext.exportCountMetric(METRICS_COUNT, 
publishMetrics.getMetricsCount());
     sinkContext.exportCountMetric(EXCEPTIONS_COUNT, 
publishMetrics.getExceptionsCount());
+
+    checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+  }
+
+  // Check if the communicator is full/overflow. Poll and drop extra elements 
that
+  // are over the queue limit from the head.
+  public static void 
checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+                                        int maxSize) {
+    synchronized (communicator) {
+      int size = communicator.size();
+
+      for (int i = 0; i < size - maxSize; ++i) {
+        communicator.poll();
+      }
+    }
   }
 
   @Override
diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
index 7c7354f..48e0a32 100644
--- 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
+++ 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
@@ -88,11 +88,19 @@ public class TMasterClient extends HeronClient implements 
Runnable {
     Runnable task = new Runnable() {
       @Override
       public void run() {
-        while (!publishMetricsCommunicator.isEmpty()) {
-          TopologyMaster.PublishMetrics publishMetrics = 
publishMetricsCommunicator.poll();
+        TopologyMaster.PublishMetrics publishMetrics;
+        while (true) {
+          synchronized (publishMetricsCommunicator) {
+            publishMetrics = publishMetricsCommunicator.poll();
+          }
+          if (publishMetrics == null) {
+            break;  // No metrics left
+          }
+
           LOG.info(String.format("%d Metrics, %d Exceptions to send to 
TMaster",
               publishMetrics.getMetricsCount(), 
publishMetrics.getExceptionsCount()));
           LOG.fine("Publish Metrics sending to TMaster: " + 
publishMetrics.toString());
+
           sendMessage(publishMetrics);
         }
       }
diff --git 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
index f044292..129d546 100644
--- 
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
+++ 
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
@@ -80,6 +80,8 @@ import org.apache.heron.spi.metricsmgr.sink.SinkContext;
 public class TMasterSink implements IMetricsSink {
   private static final Logger LOG = 
Logger.getLogger(TMasterSink.class.getName());
 
+  private static final int MAX_COMMUNICATOR_SIZE = 128;
+
   // These configs would be read from metrics-sink-configs.yaml
   private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
       "tmaster-location-check-interval-sec";
@@ -234,6 +236,21 @@ public class TMasterSink implements IMetricsSink {
     sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
     sinkContext.exportCountMetric(METRICS_COUNT, 
publishMetrics.getMetricsCount());
     sinkContext.exportCountMetric(EXCEPTIONS_COUNT, 
publishMetrics.getExceptionsCount());
+
+    checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+  }
+
+  // Check if the communicator is full/overflow. Poll and drop extra elements 
that
+  // are over the queue limit from the head.
+  public static void 
checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+                                        int maxSize) {
+    synchronized (communicator) {
+      int size = communicator.size();
+
+      for (int i = 0; i < size - maxSize; ++i) {
+        communicator.poll();
+      }
+    }
   }
 
   @Override
diff --git 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
index d425eb3..b02669e 100644
--- 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
+++ 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.common.config.SystemConfig;
@@ -188,4 +189,30 @@ public class MetricsCacheSinkTest {
 
     metricsCacheSink.close();
   }
+
+  @Test
+  public void testCheckCommunicator() {
+    Communicator<TopologyMaster.PublishMetrics> communicator = new 
Communicator<>();
+    int initSize = 16;
+    int capSize = 10;
+
+    TopologyMaster.PublishMetrics.Builder publishMetrics =
+        TopologyMaster.PublishMetrics.newBuilder();
+    for (int i = 0; i < initSize; ++i) {
+      communicator.offer(publishMetrics.build());
+    }
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize + 1);
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize);
+    assertEquals(communicator.size(), initSize);
+
+    MetricsCacheSink.checkCommunicator(communicator, initSize - 1);
+    assertEquals(communicator.size(), initSize - 1);
+
+    MetricsCacheSink.checkCommunicator(communicator, capSize);
+    assertEquals(communicator.size(), capSize);
+  }
 }
diff --git 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
index 5100270..3d57adf 100644
--- 
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
+++ 
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.basics.SysUtils;
 import org.apache.heron.common.config.SystemConfig;
@@ -186,5 +187,31 @@ public class TMasterSinkTest {
 
     tMasterSink.close();
   }
+
+  @Test
+  public void testCheckCommunicator() {
+    Communicator<TopologyMaster.PublishMetrics> communicator = new 
Communicator<>();
+    int initSize = 16;
+    int capSize = 10;
+
+    TopologyMaster.PublishMetrics.Builder publishMetrics =
+        TopologyMaster.PublishMetrics.newBuilder();
+    for (int i = 0; i < initSize; ++i) {
+      communicator.offer(publishMetrics.build());
+    }
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize + 1);
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize);
+    assertEquals(communicator.size(), initSize);
+
+    TMasterSink.checkCommunicator(communicator, initSize - 1);
+    assertEquals(communicator.size(), initSize - 1);
+
+    TMasterSink.checkCommunicator(communicator, capSize);
+    assertEquals(communicator.size(), capSize);
+  }
 }
 

Reply via email to