This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5a3dee5 Add a cap to metrics communicator in TMasterSink and MetricsCacheSink (#3355)
5a3dee5 is described below
commit 5a3dee54adf60b706a8af1a800c7ec75d1042ea0
Author: Ning Wang <[email protected]>
AuthorDate: Fri Oct 4 15:44:20 2019 -0700
Add a cap to metrics communicator in TMasterSink and MetricsCacheSink (#3355)
* Add a cap to metrics communicator in TMasterSink and MetricsCacheSink
* refactor while loop
---
.../sink/metricscache/MetricsCacheClient.java | 13 +++++++++--
.../sink/metricscache/MetricsCacheSink.java | 19 +++++++++++++++
.../metricsmgr/sink/tmaster/TMasterClient.java | 12 ++++++++--
.../heron/metricsmgr/sink/tmaster/TMasterSink.java | 17 ++++++++++++++
.../sink/metricscache/MetricsCacheSinkTest.java | 27 ++++++++++++++++++++++
.../metricsmgr/sink/tmaster/TMasterSinkTest.java | 27 ++++++++++++++++++++++
6 files changed, 111 insertions(+), 4 deletions(-)
diff --git
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
index b95ef2e..5e0a3e9 100644
---
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
+++
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheClient.java
@@ -93,12 +93,21 @@ public class MetricsCacheClient extends HeronClient
implements Runnable {
Runnable task = new Runnable() {
@Override
public void run() {
- while (!publishMetricsCommunicator.isEmpty()) {
- TopologyMaster.PublishMetrics publishMetrics = publishMetricsCommunicator.poll();
+ TopologyMaster.PublishMetrics publishMetrics;
+ while (true) {
+ synchronized (publishMetricsCommunicator) {
+ publishMetrics = publishMetricsCommunicator.poll();
+ }
+ if (publishMetrics == null) {
+ break; // No metrics left
+ }
+
LOG.info(String.format("%d Metrics, %d Exceptions to send to MetricsCache",
publishMetrics.getMetricsCount(),
publishMetrics.getExceptionsCount()));
LOG.fine("Publish Metrics sending to MetricsCache: " +
publishMetrics.toString());
+
sendMessage(publishMetrics);
+
}
}
};
diff --git
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
index 4d2357f..0f049d1 100644
---
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
+++
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSink.java
@@ -83,6 +83,8 @@ import org.apache.heron.spi.metricsmgr.sink.SinkContext;
public class MetricsCacheSink implements IMetricsSink {
private static final Logger LOG =
Logger.getLogger(MetricsCacheSink.class.getName());
+ private static final int MAX_COMMUNICATOR_SIZE = 128;
+
// These configs would be read from metrics-sink-configs.yaml
private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
"metricscache-location-check-interval-sec";
@@ -232,10 +234,27 @@ public class MetricsCacheSink implements IMetricsSink {
metricsCommunicator.offer(publishMetrics.build());
+
+
// Update metrics
sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
sinkContext.exportCountMetric(METRICS_COUNT,
publishMetrics.getMetricsCount());
sinkContext.exportCountMetric(EXCEPTIONS_COUNT,
publishMetrics.getExceptionsCount());
+
+ checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+ }
+
+ // Check if the communicator is full/overflow. Poll and drop extra elements that
+ // are over the queue limit from the head.
+ public static void checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+ int maxSize) {
+ synchronized (communicator) {
+ int size = communicator.size();
+
+ for (int i = 0; i < size - maxSize; ++i) {
+ communicator.poll();
+ }
+ }
}
@Override
diff --git
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
index 7c7354f..48e0a32 100644
---
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
+++
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterClient.java
@@ -88,11 +88,19 @@ public class TMasterClient extends HeronClient implements
Runnable {
Runnable task = new Runnable() {
@Override
public void run() {
- while (!publishMetricsCommunicator.isEmpty()) {
- TopologyMaster.PublishMetrics publishMetrics = publishMetricsCommunicator.poll();
+ TopologyMaster.PublishMetrics publishMetrics;
+ while (true) {
+ synchronized (publishMetricsCommunicator) {
+ publishMetrics = publishMetricsCommunicator.poll();
+ }
+ if (publishMetrics == null) {
+ break; // No metrics left
+ }
+
LOG.info(String.format("%d Metrics, %d Exceptions to send to TMaster",
publishMetrics.getMetricsCount(),
publishMetrics.getExceptionsCount()));
LOG.fine("Publish Metrics sending to TMaster: " +
publishMetrics.toString());
+
sendMessage(publishMetrics);
}
}
diff --git
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
index f044292..129d546 100644
---
a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
+++
b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSink.java
@@ -80,6 +80,8 @@ import org.apache.heron.spi.metricsmgr.sink.SinkContext;
public class TMasterSink implements IMetricsSink {
private static final Logger LOG =
Logger.getLogger(TMasterSink.class.getName());
+ private static final int MAX_COMMUNICATOR_SIZE = 128;
+
// These configs would be read from metrics-sink-configs.yaml
private static final String KEY_TMASTER_LOCATION_CHECK_INTERVAL_SEC =
"tmaster-location-check-interval-sec";
@@ -234,6 +236,21 @@ public class TMasterSink implements IMetricsSink {
sinkContext.exportCountMetric(RECORD_PROCESS_COUNT, 1);
sinkContext.exportCountMetric(METRICS_COUNT,
publishMetrics.getMetricsCount());
sinkContext.exportCountMetric(EXCEPTIONS_COUNT,
publishMetrics.getExceptionsCount());
+
+ checkCommunicator(metricsCommunicator, MAX_COMMUNICATOR_SIZE);
+ }
+
+ // Check if the communicator is full/overflow. Poll and drop extra elements that
+ // are over the queue limit from the head.
+ public static void checkCommunicator(Communicator<TopologyMaster.PublishMetrics> communicator,
+ int maxSize) {
+ synchronized (communicator) {
+ int size = communicator.size();
+
+ for (int i = 0; i < size - maxSize; ++i) {
+ communicator.poll();
+ }
+ }
}
@Override
diff --git
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
index d425eb3..b02669e 100644
---
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
+++
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/metricscache/MetricsCacheSinkTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.config.SystemConfig;
@@ -188,4 +189,30 @@ public class MetricsCacheSinkTest {
metricsCacheSink.close();
}
+
+ @Test
+ public void testCheckCommunicator() {
+ Communicator<TopologyMaster.PublishMetrics> communicator = new Communicator<>();
+ int initSize = 16;
+ int capSize = 10;
+
+ TopologyMaster.PublishMetrics.Builder publishMetrics =
+ TopologyMaster.PublishMetrics.newBuilder();
+ for (int i = 0; i < initSize; ++i) {
+ communicator.offer(publishMetrics.build());
+ }
+ assertEquals(communicator.size(), initSize);
+
+ MetricsCacheSink.checkCommunicator(communicator, initSize + 1);
+ assertEquals(communicator.size(), initSize);
+
+ MetricsCacheSink.checkCommunicator(communicator, initSize);
+ assertEquals(communicator.size(), initSize);
+
+ MetricsCacheSink.checkCommunicator(communicator, initSize - 1);
+ assertEquals(communicator.size(), initSize - 1);
+
+ MetricsCacheSink.checkCommunicator(communicator, capSize);
+ assertEquals(communicator.size(), capSize);
+ }
}
diff --git
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
index 5100270..3d57adf 100644
---
a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
+++
b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/tmaster/TMasterSinkTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.heron.api.metric.MultiCountMetric;
+import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.config.SystemConfig;
@@ -186,5 +187,31 @@ public class TMasterSinkTest {
tMasterSink.close();
}
+
+ @Test
+ public void testCheckCommunicator() {
+ Communicator<TopologyMaster.PublishMetrics> communicator = new Communicator<>();
+ int initSize = 16;
+ int capSize = 10;
+
+ TopologyMaster.PublishMetrics.Builder publishMetrics =
+ TopologyMaster.PublishMetrics.newBuilder();
+ for (int i = 0; i < initSize; ++i) {
+ communicator.offer(publishMetrics.build());
+ }
+ assertEquals(communicator.size(), initSize);
+
+ TMasterSink.checkCommunicator(communicator, initSize + 1);
+ assertEquals(communicator.size(), initSize);
+
+ TMasterSink.checkCommunicator(communicator, initSize);
+ assertEquals(communicator.size(), initSize);
+
+ TMasterSink.checkCommunicator(communicator, initSize - 1);
+ assertEquals(communicator.size(), initSize - 1);
+
+ TMasterSink.checkCommunicator(communicator, capSize);
+ assertEquals(communicator.size(), capSize);
+ }
}