zentol closed pull request #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a597c86..244a1ede5ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.util.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,14 +69,18 @@
    // contains for every configured reporter its name and the configuration object
    private final List<Tuple2<String, Configuration>> reporterConfigurations;
+   private final long queryServiceMessageSizeLimit;
+
    public MetricRegistryConfiguration(
        ScopeFormats scopeFormats,
        char delimiter,
-       List<Tuple2<String, Configuration>> reporterConfigurations) {
+       List<Tuple2<String, Configuration>> reporterConfigurations,
+       long queryServiceMessageSizeLimit) {
        this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
        this.delimiter = delimiter;
        this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+       this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }
    // ------------------------------------------------------------------------
@@ -92,6 +99,10 @@ public char getDelimiter() {
return reporterConfigurations;
}
+ public long getQueryServiceMessageSizeLimit() {
+ return queryServiceMessageSizeLimit;
+ }
+
    // ------------------------------------------------------------------------
    //  Static factory methods
    // ------------------------------------------------------------------------
@@ -160,7 +171,15 @@ public static MetricRegistryConfiguration fromConfiguration(Configuration config
        }
    }
-   return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+   final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+   final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+   final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+   final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+   // padding to account for serialization overhead
+   final long messageSizeLimitPadding = 256;
+
+   return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
    }
    public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
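For context on the hunk above: the query service limit is derived from Akka's framesize setting (AkkaOptions.FRAMESIZE), whose value is a size string that Typesafe Config can convert to a byte count, minus a fixed 256-byte padding for the message envelope. A minimal, standalone sketch of that derivation (illustration only, not PR code; the example value and class name are assumptions):

// Standalone sketch: turning an akka.framesize-style size string into the
// query service message size limit, mirroring fromConfiguration() above.
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class FrameSizeLimitSketch {
    public static void main(String[] args) {
        // Assumed example value; in Flink the real value comes from AkkaOptions.FRAMESIZE.
        final String maxFrameSizeStr = "10485760b";

        // Typesafe Config parses size strings ("b", "kB", "MB", ...) via getBytes().
        final Config akkaConfig = ConfigFactory.parseString(
            String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr));
        final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");

        // Leave room for serialization overhead so the dump still fits into one Akka frame.
        final long messageSizeLimitPadding = 256;
        final long queryServiceMessageSizeLimit = maximumFrameSize - messageSizeLimitPadding;

        System.out.println(queryServiceMessageSizeLimit); // 10485504 for the example value
    }
}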
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b3770907a9..31775e24276 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@
private final CompletableFuture<Void> terminationFuture;
+ private final long maximumFramesize;
+
@Nullable
private ActorRef queryService;
@@ -91,6 +93,7 @@
* Creates a new MetricRegistry and starts the configured reporter.
*/
public MetricRegistryImpl(MetricRegistryConfiguration config) {
+       this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
this.scopeFormats = config.getScopeFormats();
this.globalDelimiter = config.getDelimiter();
this.delimiters = new ArrayList<>(10);
@@ -184,7 +187,7 @@ public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
        Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
        try {
-           queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+           queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
            metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
        } catch (Exception e) {
            LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 16a885dd345..5456b56cdbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -73,19 +73,38 @@ private MetricDumpSerialization() {
        private static final long serialVersionUID = 6928770855951536906L;
- public final byte[] serializedMetrics;
+ public final byte[] serializedCounters;
+ public final byte[] serializedGauges;
+ public final byte[] serializedMeters;
+ public final byte[] serializedHistograms;
+
public final int numCounters;
public final int numGauges;
public final int numMeters;
public final int numHistograms;
-       public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
-           Preconditions.checkNotNull(serializedMetrics);
+ public MetricSerializationResult(
+ byte[] serializedCounters,
+ byte[] serializedGauges,
+ byte[] serializedMeters,
+ byte[] serializedHistograms,
+ int numCounters,
+ int numGauges,
+ int numMeters,
+ int numHistograms) {
+
+ Preconditions.checkNotNull(serializedCounters);
+ Preconditions.checkNotNull(serializedGauges);
+ Preconditions.checkNotNull(serializedMeters);
+ Preconditions.checkNotNull(serializedHistograms);
Preconditions.checkArgument(numCounters >= 0);
Preconditions.checkArgument(numGauges >= 0);
Preconditions.checkArgument(numMeters >= 0);
Preconditions.checkArgument(numHistograms >= 0);
- this.serializedMetrics = serializedMetrics;
+ this.serializedCounters = serializedCounters;
+ this.serializedGauges = serializedGauges;
+ this.serializedMeters = serializedMeters;
+ this.serializedHistograms = serializedHistograms;
this.numCounters = numCounters;
this.numGauges = numGauges;
this.numMeters = numMeters;
@@ -102,7 +121,10 @@ public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int
     */
    public static class MetricDumpSerializer {
-       private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
+       private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
+       private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
+       private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
+       private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);
        /**
         * Serializes the given metrics and returns the resulting byte array.
@@ -126,53 +148,66 @@ public MetricSerializationResult serialize(
            Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
            Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
-           buffer.clear();
-
+           countersBuffer.clear();
            int numCounters = 0;
            for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
                try {
-                   serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                   serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numCounters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize counter.", e);
                }
            }
+           gaugesBuffer.clear();
            int numGauges = 0;
            for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
                try {
-                   serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                   serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numGauges++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize gauge.", e);
                }
            }
+           histogramsBuffer.clear();
            int numHistograms = 0;
            for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
                try {
-                   serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                   serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numHistograms++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize histogram.", e);
                }
            }
+           metersBuffer.clear();
            int numMeters = 0;
            for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
                try {
-                   serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                   serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numMeters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize meter.", e);
                }
            }
-           return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
+ return new MetricSerializationResult(
+ countersBuffer.getCopyOfBuffer(),
+ gaugesBuffer.getCopyOfBuffer(),
+ metersBuffer.getCopyOfBuffer(),
+ histogramsBuffer.getCopyOfBuffer(),
+ numCounters,
+ numGauges,
+ numMeters,
+ numHistograms);
}
public void close() {
- buffer = null;
+ countersBuffer = null;
+ gaugesBuffer = null;
+ metersBuffer = null;
+ histogramsBuffer = null;
}
}
@@ -280,13 +315,16 @@ private static void serializeMeter(DataOutput out, QueryScopeInfo info, String n
     * @return A list containing the deserialized metrics.
     */
    public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
-       DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
+       DataInputView countersInputView = new DataInputDeserializer(data.serializedCounters, 0, data.serializedCounters.length);
+       DataInputView gaugesInputView = new DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
+       DataInputView metersInputView = new DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
+       DataInputView histogramsInputView = new DataInputDeserializer(data.serializedHistograms, 0, data.serializedHistograms.length);
-       List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
+       List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numMeters + data.numHistograms);
        for (int x = 0; x < data.numCounters; x++) {
            try {
-               metrics.add(deserializeCounter(in));
+               metrics.add(deserializeCounter(countersInputView));
            } catch (Exception e) {
                LOG.debug("Failed to deserialize counter.", e);
            }
@@ -294,25 +332,25 @@ private static void serializeMeter(DataOutput out, QueryScopeInfo info, String n
        for (int x = 0; x < data.numGauges; x++) {
            try {
-               metrics.add(deserializeGauge(in));
+               metrics.add(deserializeGauge(gaugesInputView));
            } catch (Exception e) {
                LOG.debug("Failed to deserialize gauge.", e);
            }
        }
-       for (int x = 0; x < data.numHistograms; x++) {
+       for (int x = 0; x < data.numMeters; x++) {
            try {
-               metrics.add(deserializeHistogram(in));
+               metrics.add(deserializeMeter(metersInputView));
            } catch (Exception e) {
-               LOG.debug("Failed to deserialize histogram.", e);
+               LOG.debug("Failed to deserialize meter.", e);
            }
        }
-       for (int x = 0; x < data.numMeters; x++) {
+       for (int x = 0; x < data.numHistograms; x++) {
            try {
-               metrics.add(deserializeMeter(in));
+               metrics.add(deserializeHistogram(histogramsInputView));
            } catch (Exception e) {
-               LOG.debug("Failed to deserialize meter.", e);
+               LOG.debug("Failed to deserialize histogram.", e);
            }
        }
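The split into four byte arrays is what lets the query service drop a single oversized category later while keeping the rest. Under the new layout an empty result is simply four empty arrays with zero counts; a minimal sketch (assumes flink-runtime on the classpath, mirroring the stub used in the MetricFetcherTest change at the end of this diff):

// Sketch only: building an empty per-category result with the new constructor.
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;

public class EmptyDumpSketch {
    public static void main(String[] args) {
        MetricDumpSerialization.MetricSerializationResult empty =
            new MetricDumpSerialization.MetricSerializationResult(
                new byte[0], new byte[0], new byte[0], new byte[0], // counters, gauges, meters, histograms
                0, 0, 0, 0);                                        // matching per-category counts
        // Deserializing such a result yields no metrics at all.
        System.out.println(empty.numCounters + empty.numGauges + empty.numMeters + empty.numHistograms); // 0
    }
}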
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d9f4a..fc69d17503d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -55,6 +55,7 @@
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+   private static final String SIZE_EXCEEDED_LOG_TEMPLATE = "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";
private static final CharacterFilter FILTER = new CharacterFilter() {
@Override
@@ -70,6 +71,12 @@ public String filterCharacters(String input) {
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+ private final long messageSizeLimit;
+
+ public MetricQueryService(long messageSizeLimit) {
+ this.messageSizeLimit = messageSizeLimit;
+ }
+
@Override
public void postStop() {
serializer.close();
@@ -109,6 +116,9 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {
            MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
+
+           dump = enforceSizeLimit(dump);
+
            getSender().tell(dump, getSelf());
        } else {
            LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
@@ -119,6 +129,83 @@ public void onReceive(Object message) {
}
}
+   private MetricDumpSerialization.MetricSerializationResult enforceSizeLimit(
+           MetricDumpSerialization.MetricSerializationResult serializationResult) {
+
+ int currentLength = 0;
+ boolean hasExceededBefore = false;
+
+       byte[] serializedCounters = serializationResult.serializedCounters;
+       int numCounters = serializationResult.numCounters;
+       if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedCounters.length)) {
+           logDumpSizeWouldExceedLimit("Counters", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedCounters = new byte[0];
+ numCounters = 0;
+ } else {
+ currentLength += serializedCounters.length;
+ }
+
+ byte[] serializedMeters = serializationResult.serializedMeters;
+ int numMeters = serializationResult.numMeters;
+       if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedMeters.length)) {
+           logDumpSizeWouldExceedLimit("Meters", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedMeters = new byte[0];
+ numMeters = 0;
+ } else {
+ currentLength += serializedMeters.length;
+ }
+
+ byte[] serializedGauges = serializationResult.serializedGauges;
+ int numGauges = serializationResult.numGauges;
+       if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedGauges.length)) {
+           logDumpSizeWouldExceedLimit("Gauges", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedGauges = new byte[0];
+ numGauges = 0;
+ } else {
+ currentLength += serializedGauges.length;
+ }
+
+       byte[] serializedHistograms = serializationResult.serializedHistograms;
+       int numHistograms = serializationResult.numHistograms;
+       if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedHistograms.length)) {
+           logDumpSizeWouldExceedLimit("Histograms", hasExceededBefore);
+ hasExceededBefore = true;
+
+ serializedHistograms = new byte[0];
+ numHistograms = 0;
+ }
+
+ return new MetricDumpSerialization.MetricSerializationResult(
+ serializedCounters,
+ serializedGauges,
+ serializedMeters,
+ serializedHistograms,
+ numCounters,
+ numGauges,
+ numMeters,
+ numHistograms);
+ }
+
+ private boolean exceedsMessageSizeLimit(final int currentSize) {
+ return currentSize > messageSizeLimit;
+ }
+
+   private void logDumpSizeWouldExceedLimit(final String metricType, boolean hasExceededBefore) {
+       if (LOG.isDebugEnabled()) {
+           LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, messageSizeLimit);
+ } else {
+ if (!hasExceededBefore) {
+               LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some metrics", messageSizeLimit);
+ }
+ }
+ }
+
/**
* Lightweight method to replace unsupported characters.
     * If the string does not contain any unsupported characters, this method creates no
@@ -165,11 +252,16 @@ static String replaceInvalidChars(String str) {
* @param resourceID resource ID to disambiguate the actor name
* @return actor reference to the MetricQueryService
*/
-   public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+   public static ActorRef startMetricQueryService(
+           ActorSystem actorSystem,
+           ResourceID resourceID,
+           long maximumFramesize) {
+
        String actorName = resourceID == null
            ? METRIC_QUERY_SERVICE_NAME
            : METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
-       return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
+
+       return actorSystem.actorOf(Props.create(MetricQueryService.class, maximumFramesize), actorName);
}
/**
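The enforcement above walks the categories in a fixed priority order (counters, meters, gauges, histograms) and zeroes out any category whose payload would push the running total past the limit; categories after a dropped one can still be included if they fit. A standalone sketch of that greedy budgeting in plain Java (illustrative names only, no Flink types):

import java.util.LinkedHashMap;
import java.util.Map;

public class SizeBudgetSketch {
    // Keep categories in priority order; drop any category whose payload would
    // push the running total past the limit, keep the ones that still fit.
    static Map<String, byte[]> enforceLimit(Map<String, byte[]> categories, long limit) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        long used = 0;
        for (Map.Entry<String, byte[]> e : categories.entrySet()) {
            byte[] payload = e.getValue();
            if (used + payload.length > limit) {
                result.put(e.getKey(), new byte[0]); // dropped, like the zeroed-out arrays above
            } else {
                used += payload.length;
                result.put(e.getKey(), payload);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> dump = new LinkedHashMap<>();
        dump.put("counters", new byte[50]);
        dump.put("meters", new byte[50]);
        dump.put("gauges", new byte[500]);   // too large for the remaining budget: dropped
        dump.put("histograms", new byte[50]);
        enforceLimit(dump, 200).forEach((k, v) -> System.out.println(k + " -> " + v.length + " bytes"));
    }
}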
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 5f83e794ff9..1aab6f7de43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -70,7 +70,10 @@ public Object getValue() {
            Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
        // no metrics should be serialized
-       Assert.assertEquals(0, output.serializedMetrics.length);
+       Assert.assertEquals(0, output.serializedCounters.length);
+       Assert.assertEquals(0, output.serializedGauges.length);
+       Assert.assertEquals(0, output.serializedHistograms.length);
+       Assert.assertEquals(0, output.serializedMeters.length);
        List<MetricDump> deserialized = deserializer.deserialize(output);
        Assert.assertEquals(0, deserialized.size());
@@ -141,7 +144,8 @@ public long getCount() {
        gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
        histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
-       MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
+       MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(
+           counters, gauges, histograms, meters);
        List<MetricDump> deserialized = deserializer.deserialize(serialized);
        // ===== Counters ==============================================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 3767421b7d6..673409cca5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.metrics.dump;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -25,10 +26,10 @@
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
+import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
@@ -38,6 +39,10 @@
import akka.testkit.TestActorRef;
import org.junit.Test;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -47,81 +52,119 @@
public class MetricQueryServiceTest extends TestLogger {
@Test
public void testCreateDump() throws Exception {
-
        ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
-       ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null);
-       TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
-       TestActor testActor = (TestActor) testActorRef.underlyingActor();
-
-       final Counter c = new SimpleCounter();
-       final Gauge<String> g = new Gauge<String>() {
-           @Override
-           public String getValue() {
-               return "Hello";
-           }
-       };
-       final Histogram h = new TestHistogram();
-       final Meter m = new Meter() {
+       try {
+           ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, Long.MAX_VALUE);
+           TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+           TestActor testActor = (TestActor) testActorRef.underlyingActor();
-           @Override
-           public void markEvent() {
-           }
+           final Counter c = new SimpleCounter();
+           final Gauge<String> g = () -> "Hello";
+           final Histogram h = new TestHistogram();
+           final Meter m = new TestMeter();
-           @Override
-           public void markEvent(long n) {
-           }
+           final TaskManagerMetricGroup tm = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
-           @Override
-           public double getRate() {
-               return 5;
-           }
+           MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+           MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
+           MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+           MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+           serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-           @Override
-           public long getCount() {
-               return 10;
-           }
-       };
-
-       MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-       final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
-
-       MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
-       MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
-       MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
-       MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
-       serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-       synchronized (testActor.lock) {
-           if (testActor.message == null) {
-               testActor.lock.wait();
-           }
-       }
+           testActor.waitForResult();
-       MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-       testActor.message = null;
-       assertTrue(dump.serializedMetrics.length > 0);
+           MetricDumpSerialization.MetricSerializationResult dump = testActor.getSerializationResult();
-       MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
-       MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
-       MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
-       MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+           assertTrue(dump.serializedCounters.length > 0);
+           assertTrue(dump.serializedGauges.length > 0);
+           assertTrue(dump.serializedHistograms.length > 0);
+           assertTrue(dump.serializedMeters.length > 0);
-       serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-       synchronized (testActor.lock) {
-           if (testActor.message == null) {
-               testActor.lock.wait();
-           }
+           MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+           MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+           MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+           MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+
+           serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+
+           testActor.waitForResult();
+
+           MetricDumpSerialization.MetricSerializationResult emptyDump = testActor.getSerializationResult();
+
+           assertEquals(0, emptyDump.serializedCounters.length);
+           assertEquals(0, emptyDump.serializedGauges.length);
+           assertEquals(0, emptyDump.serializedHistograms.length);
+           assertEquals(0, emptyDump.serializedMeters.length);
+       } finally {
+           s.terminate();
        }
+   }
+
+ @Test
+ public void testHandleOversizedMetricMessage() throws Exception {
+       ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+       try {
+           final long sizeLimit = 200L;
+           ActorRef serviceActor = MetricQueryService.startMetricQueryService(s, null, sizeLimit);
+           TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+           TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+           final TaskManagerMetricGroup tm = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
+
+           final String gaugeValue = "Hello";
+           final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1;
+           List<Tuple2<String, Gauge<String>>> gauges = LongStream.range(0, requiredGaugesToExceedLimit)
+               .mapToObj(x -> Tuple2.of("gauge" + x, (Gauge<String>) () -> "Hello" + x))
+               .collect(Collectors.toList());
+           gauges.forEach(gauge -> MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
+
+           MetricQueryService.notifyOfAddedMetric(serviceActor, new SimpleCounter(), "counter", tm);
+           MetricQueryService.notifyOfAddedMetric(serviceActor, new TestHistogram(), "histogram", tm);
+           MetricQueryService.notifyOfAddedMetric(serviceActor, new TestMeter(), "meter", tm);
+
+           serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+           testActor.waitForResult();
-       MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-       testActor.message = null;
-       assertEquals(0, emptyDump.serializedMetrics.length);
+           MetricDumpSerialization.MetricSerializationResult dump = testActor.getSerializationResult();
-       s.terminate();
+           assertTrue(dump.serializedCounters.length > 0);
+           assertEquals(1, dump.numCounters);
+           assertTrue(dump.serializedMeters.length > 0);
+           assertEquals(1, dump.numMeters);
+
+           // gauges exceeded the size limit and will be excluded
+           assertEquals(0, dump.serializedGauges.length);
+           assertEquals(0, dump.numGauges);
+
+           assertTrue(dump.serializedHistograms.length > 0);
+           assertEquals(1, dump.numHistograms);
+
+           // unregister all but one gauge to ensure gauges are reported again if the remaining fit
+           for (int x = 1; x < gauges.size(); x++) {
+               MetricQueryService.notifyOfRemovedMetric(serviceActor, gauges.get(x).f1);
+           }
+
+           serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+           testActor.waitForResult();
+
+           MetricDumpSerialization.MetricSerializationResult recoveredDump = testActor.getSerializationResult();
+
+           assertTrue(recoveredDump.serializedCounters.length > 0);
+           assertEquals(1, recoveredDump.numCounters);
+           assertTrue(recoveredDump.serializedMeters.length > 0);
+           assertEquals(1, recoveredDump.numMeters);
+           assertTrue(recoveredDump.serializedGauges.length > 0);
+           assertEquals(1, recoveredDump.numGauges);
+           assertTrue(recoveredDump.serializedHistograms.length > 0);
+           assertEquals(1, recoveredDump.numHistograms);
+       } finally {
+           s.terminate();
+       }
}
private static class TestActor extends UntypedActor {
- public Object message;
- public Object lock = new Object();
+ private Object message;
+ private final Object lock = new Object();
@Override
public void onReceive(Object message) throws Exception {
@@ -130,5 +173,19 @@ public void onReceive(Object message) throws Exception {
lock.notifyAll();
}
}
+
+ void waitForResult() throws InterruptedException {
+ synchronized (lock) {
+ if (message == null) {
+ lock.wait();
+ }
+ }
+ }
+
+       MetricDumpSerialization.MetricSerializationResult getSerializationResult() {
+           final MetricDumpSerialization.MetricSerializationResult result = (MetricDumpSerialization.MetricSerializationResult) message;
+ message = null;
+ return result;
+ }
}
}
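A quick check of the sizing used in testHandleOversizedMetricMessage above: with a 200-byte limit and the five-character value "Hello", the test registers 200 / 5 + 1 = 41 gauges, so the gauge values alone already exceed the limit before any serialization overhead, while the single counter, meter, and histogram still fit. A minimal worked check (values copied from the test; class name is illustrative):

public class GaugeCountCheck {
    public static void main(String[] args) {
        // Mirrors the arithmetic in testHandleOversizedMetricMessage.
        final long sizeLimit = 200L;
        final String gaugeValue = "Hello";
        final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1; // 200 / 5 + 1 = 41
        System.out.println(requiredGaugesToExceedLimit);                              // 41 gauges registered
        System.out.println(requiredGaugesToExceedLimit * gaugeValue.length());        // 205 bytes of values alone > 200
    }
}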
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index da8182a8ede..61c028ffab5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -99,7 +99,7 @@ public void testUpdate() throws Exception {
        MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
        when(jmQueryService.queryMetrics(any(Time.class)))
-           .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+           .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
        when(tmQueryService.queryMetrics(any(Time.class)))
            .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services