[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717181#comment-16717181
 ] 

ASF GitHub Bot commented on FLINK-10252:
----------------------------------------

zentol closed pull request #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a597c86..244a1ede5ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +69,18 @@
        // contains for every configured reporter its name and the 
configuration object
        private final List<Tuple2<String, Configuration>> 
reporterConfigurations;
 
+       private final long queryServiceMessageSizeLimit;
+
        public MetricRegistryConfiguration(
                ScopeFormats scopeFormats,
                char delimiter,
-               List<Tuple2<String, Configuration>> reporterConfigurations) {
+               List<Tuple2<String, Configuration>> reporterConfigurations,
+               long queryServiceMessageSizeLimit) {
 
                this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
                this.delimiter = delimiter;
                this.reporterConfigurations = 
Preconditions.checkNotNull(reporterConfigurations);
+               this.queryServiceMessageSizeLimit = 
queryServiceMessageSizeLimit;
        }
 
        // 
------------------------------------------------------------------------
@@ -92,6 +99,10 @@ public char getDelimiter() {
                return reporterConfigurations;
        }
 
+       public long getQueryServiceMessageSizeLimit() {
+               return queryServiceMessageSizeLimit;
+       }
+
        // 
------------------------------------------------------------------------
        //  Static factory methods
        // 
------------------------------------------------------------------------
@@ -160,7 +171,15 @@ public static MetricRegistryConfiguration 
fromConfiguration(Configuration config
                        }
                }
 
-               return new MetricRegistryConfiguration(scopeFormats, delim, 
reporterConfigurations);
+               final String maxFrameSizeStr = 
configuration.getString(AkkaOptions.FRAMESIZE);
+               final String akkaConfigStr = String.format("akka {remote 
{netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+               final Config akkaConfig = 
ConfigFactory.parseString(akkaConfigStr);
+               final long maximumFrameSize = 
akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+               // padding to account for serialization overhead
+               final long messageSizeLimitPadding = 256;
+
+               return new MetricRegistryConfiguration(scopeFormats, delim, 
reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
        }
 
        public static MetricRegistryConfiguration 
defaultMetricRegistryConfiguration() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b3770907a9..31775e24276 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@
 
        private final CompletableFuture<Void> terminationFuture;
 
+       private final long maximumFramesize;
+
        @Nullable
        private ActorRef queryService;
 
@@ -91,6 +93,7 @@
         * Creates a new MetricRegistry and starts the configured reporter.
         */
        public MetricRegistryImpl(MetricRegistryConfiguration config) {
+               this.maximumFramesize = 
config.getQueryServiceMessageSizeLimit();
                this.scopeFormats = config.getScopeFormats();
                this.globalDelimiter = config.getDelimiter();
                this.delimiters = new ArrayList<>(10);
@@ -184,7 +187,7 @@ public void startQueryService(ActorSystem actorSystem, 
ResourceID resourceID) {
                        Preconditions.checkState(!isShutdown(), "The metric 
registry has already been shut down.");
 
                        try {
-                               queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+                               queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID, 
maximumFramesize);
                                metricQueryServicePath = 
AkkaUtils.getAkkaURL(actorSystem, queryService);
                        } catch (Exception e) {
                                LOG.warn("Could not start MetricDumpActor. No 
metrics will be submitted to the WebInterface.", e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 16a885dd345..5456b56cdbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -73,19 +73,38 @@ private MetricDumpSerialization() {
 
                private static final long serialVersionUID = 
6928770855951536906L;
 
-               public final byte[] serializedMetrics;
+               public final byte[] serializedCounters;
+               public final byte[] serializedGauges;
+               public final byte[] serializedMeters;
+               public final byte[] serializedHistograms;
+
                public final int numCounters;
                public final int numGauges;
                public final int numMeters;
                public final int numHistograms;
 
-               public MetricSerializationResult(byte[] serializedMetrics, int 
numCounters, int numGauges, int numMeters, int numHistograms) {
-                       Preconditions.checkNotNull(serializedMetrics);
+               public MetricSerializationResult(
+                       byte[] serializedCounters,
+                       byte[] serializedGauges,
+                       byte[] serializedMeters,
+                       byte[] serializedHistograms,
+                       int numCounters,
+                       int numGauges,
+                       int numMeters,
+                       int numHistograms) {
+
+                       Preconditions.checkNotNull(serializedCounters);
+                       Preconditions.checkNotNull(serializedGauges);
+                       Preconditions.checkNotNull(serializedMeters);
+                       Preconditions.checkNotNull(serializedHistograms);
                        Preconditions.checkArgument(numCounters >= 0);
                        Preconditions.checkArgument(numGauges >= 0);
                        Preconditions.checkArgument(numMeters >= 0);
                        Preconditions.checkArgument(numHistograms >= 0);
-                       this.serializedMetrics = serializedMetrics;
+                       this.serializedCounters = serializedCounters;
+                       this.serializedGauges = serializedGauges;
+                       this.serializedMeters = serializedMeters;
+                       this.serializedHistograms = serializedHistograms;
                        this.numCounters = numCounters;
                        this.numGauges = numGauges;
                        this.numMeters = numMeters;
@@ -102,7 +121,10 @@ public MetricSerializationResult(byte[] serializedMetrics, 
int numCounters, int
         */
        public static class MetricDumpSerializer {
 
-               private DataOutputSerializer buffer = new 
DataOutputSerializer(1024 * 32);
+               private DataOutputSerializer countersBuffer = new 
DataOutputSerializer(1024 * 8);
+               private DataOutputSerializer gaugesBuffer = new 
DataOutputSerializer(1024 * 8);
+               private DataOutputSerializer metersBuffer = new 
DataOutputSerializer(1024 * 8);
+               private DataOutputSerializer histogramsBuffer = new 
DataOutputSerializer(1024 * 8);
 
                /**
                 * Serializes the given metrics and returns the resulting byte 
array.
@@ -126,53 +148,66 @@ public MetricSerializationResult serialize(
                        Map<Histogram, Tuple2<QueryScopeInfo, String>> 
histograms,
                        Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
 
-                       buffer.clear();
-
+                       countersBuffer.clear();
                        int numCounters = 0;
                        for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> 
entry : counters.entrySet()) {
                                try {
-                                       serializeCounter(buffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                                       serializeCounter(countersBuffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
                                        numCounters++;
                                } catch (Exception e) {
                                        LOG.debug("Failed to serialize 
counter.", e);
                                }
                        }
 
+                       gaugesBuffer.clear();
                        int numGauges = 0;
                        for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, 
String>> entry : gauges.entrySet()) {
                                try {
-                                       serializeGauge(buffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                                       serializeGauge(gaugesBuffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
                                        numGauges++;
                                } catch (Exception e) {
                                        LOG.debug("Failed to serialize gauge.", 
e);
                                }
                        }
 
+                       histogramsBuffer.clear();
                        int numHistograms = 0;
                        for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, 
String>> entry : histograms.entrySet()) {
                                try {
-                                       serializeHistogram(buffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                                       serializeHistogram(histogramsBuffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
                                        numHistograms++;
                                } catch (Exception e) {
                                        LOG.debug("Failed to serialize 
histogram.", e);
                                }
                        }
 
+                       metersBuffer.clear();
                        int numMeters = 0;
                        for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> 
entry : meters.entrySet()) {
                                try {
-                                       serializeMeter(buffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
+                                       serializeMeter(metersBuffer, 
entry.getValue().f0, entry.getValue().f1, entry.getKey());
                                        numMeters++;
                                } catch (Exception e) {
                                        LOG.debug("Failed to serialize meter.", 
e);
                                }
                        }
 
-                       return new 
MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, 
numMeters, numHistograms);
+                       return new MetricSerializationResult(
+                               countersBuffer.getCopyOfBuffer(),
+                               gaugesBuffer.getCopyOfBuffer(),
+                               metersBuffer.getCopyOfBuffer(),
+                               histogramsBuffer.getCopyOfBuffer(),
+                               numCounters,
+                               numGauges,
+                               numMeters,
+                               numHistograms);
                }
 
                public void close() {
-                       buffer = null;
+                       countersBuffer = null;
+                       gaugesBuffer = null;
+                       metersBuffer = null;
+                       histogramsBuffer = null;
                }
        }
 
@@ -280,13 +315,16 @@ private static void serializeMeter(DataOutput out, 
QueryScopeInfo info, String n
                 * @return A list containing the deserialized metrics.
                 */
                public List<MetricDump> 
deserialize(MetricDumpSerialization.MetricSerializationResult data) {
-                       DataInputView in = new 
DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
+                       DataInputView countersInputView = new 
DataInputDeserializer(data.serializedCounters, 0, 
data.serializedCounters.length);
+                       DataInputView gaugesInputView = new 
DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
+                       DataInputView metersInputView = new 
DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
+                       DataInputView histogramsInputView = new 
DataInputDeserializer(data.serializedHistograms, 0, 
data.serializedHistograms.length);
 
-                       List<MetricDump> metrics = new 
ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + 
data.numMeters);
+                       List<MetricDump> metrics = new 
ArrayList<>(data.numCounters + data.numGauges + data.numMeters + 
data.numHistograms);
 
                        for (int x = 0; x < data.numCounters; x++) {
                                try {
-                                       metrics.add(deserializeCounter(in));
+                                       
metrics.add(deserializeCounter(countersInputView));
                                } catch (Exception e) {
                                        LOG.debug("Failed to deserialize 
counter.", e);
                                }
@@ -294,25 +332,25 @@ private static void serializeMeter(DataOutput out, 
QueryScopeInfo info, String n
 
                        for (int x = 0; x < data.numGauges; x++) {
                                try {
-                                       metrics.add(deserializeGauge(in));
+                                       
metrics.add(deserializeGauge(gaugesInputView));
                                } catch (Exception e) {
                                        LOG.debug("Failed to deserialize 
gauge.", e);
                                }
                        }
 
-                       for (int x = 0; x < data.numHistograms; x++) {
+                       for (int x = 0; x < data.numMeters; x++) {
                                try {
-                                       metrics.add(deserializeHistogram(in));
+                                       
metrics.add(deserializeMeter(metersInputView));
                                } catch (Exception e) {
-                                       LOG.debug("Failed to deserialize 
histogram.", e);
+                                       LOG.debug("Failed to deserialize 
meter.", e);
                                }
                        }
 
-                       for (int x = 0; x < data.numMeters; x++) {
+                       for (int x = 0; x < data.numHistograms; x++) {
                                try {
-                                       metrics.add(deserializeMeter(in));
+                                       
metrics.add(deserializeHistogram(histogramsInputView));
                                } catch (Exception e) {
-                                       LOG.debug("Failed to deserialize 
meter.", e);
+                                       LOG.debug("Failed to deserialize 
histogram.", e);
                                }
                        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d9f4a..fc69d17503d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -55,6 +55,7 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricQueryService.class);
 
        public static final String METRIC_QUERY_SERVICE_NAME = 
"MetricQueryService";
+       private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not 
be reported as the metric dump would exceed the maximum size of {} bytes.";
 
        private static final CharacterFilter FILTER = new CharacterFilter() {
                @Override
@@ -70,6 +71,12 @@ public String filterCharacters(String input) {
        private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms 
= new HashMap<>();
        private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new 
HashMap<>();
 
+       private final long messageSizeLimit;
+
+       public MetricQueryService(long messageSizeLimit) {
+               this.messageSizeLimit = messageSizeLimit;
+       }
+
        @Override
        public void postStop() {
                serializer.close();
@@ -109,6 +116,9 @@ public void onReceive(Object message) {
                                }
                        } else if (message instanceof CreateDump) {
                                
MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
+
+                               dump = enforceSizeLimit(dump);
+
                                getSender().tell(dump, getSelf());
                        } else {
                                LOG.warn("MetricQueryServiceActor received an 
invalid message. " + message.toString());
@@ -119,6 +129,83 @@ public void onReceive(Object message) {
                }
        }
 
+       private MetricDumpSerialization.MetricSerializationResult 
enforceSizeLimit(
+               MetricDumpSerialization.MetricSerializationResult 
serializationResult) {
+
+               int currentLength = 0;
+               boolean hasExceededBefore = false;
+
+               byte[] serializedCounters = 
serializationResult.serializedCounters;
+               int numCounters = serializationResult.numCounters;
+               if (exceedsMessageSizeLimit(currentLength + 
serializationResult.serializedCounters.length)) {
+                       logDumpSizeWouldExceedLimit("Counters", 
hasExceededBefore);
+                       hasExceededBefore = true;
+
+                       serializedCounters = new byte[0];
+                       numCounters = 0;
+               } else {
+                       currentLength += serializedCounters.length;
+               }
+
+               byte[] serializedMeters = serializationResult.serializedMeters;
+               int numMeters = serializationResult.numMeters;
+               if (exceedsMessageSizeLimit(currentLength + 
serializationResult.serializedMeters.length)) {
+                       logDumpSizeWouldExceedLimit("Meters", 
hasExceededBefore);
+                       hasExceededBefore = true;
+
+                       serializedMeters = new byte[0];
+                       numMeters = 0;
+               } else {
+                       currentLength += serializedMeters.length;
+               }
+
+               byte[] serializedGauges = serializationResult.serializedGauges;
+               int numGauges = serializationResult.numGauges;
+               if (exceedsMessageSizeLimit(currentLength + 
serializationResult.serializedGauges.length)) {
+                       logDumpSizeWouldExceedLimit("Gauges", 
hasExceededBefore);
+                       hasExceededBefore = true;
+
+                       serializedGauges = new byte[0];
+                       numGauges = 0;
+               } else {
+                       currentLength += serializedGauges.length;
+               }
+
+               byte[] serializedHistograms = 
serializationResult.serializedHistograms;
+               int numHistograms = serializationResult.numHistograms;
+               if (exceedsMessageSizeLimit(currentLength + 
serializationResult.serializedHistograms.length)) {
+                       logDumpSizeWouldExceedLimit("Histograms", 
hasExceededBefore);
+                       hasExceededBefore = true;
+
+                       serializedHistograms = new byte[0];
+                       numHistograms = 0;
+               }
+
+               return new MetricDumpSerialization.MetricSerializationResult(
+                       serializedCounters,
+                       serializedGauges,
+                       serializedMeters,
+                       serializedHistograms,
+                       numCounters,
+                       numGauges,
+                       numMeters,
+                       numHistograms);
+       }
+
+       private boolean exceedsMessageSizeLimit(final int currentSize) {
+               return currentSize > messageSizeLimit;
+       }
+
+       private void logDumpSizeWouldExceedLimit(final String metricType, 
boolean hasExceededBefore) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, 
messageSizeLimit);
+               } else {
+                       if (!hasExceededBefore) {
+                               LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some 
metrics", messageSizeLimit);
+                       }
+               }
+       }
+
        /**
         * Lightweight method to replace unsupported characters.
         * If the string does not contain any unsupported characters, this 
method creates no
@@ -165,11 +252,16 @@ static String replaceInvalidChars(String str) {
         * @param resourceID resource ID to disambiguate the actor name
         * @return actor reference to the MetricQueryService
         */
-       public static ActorRef startMetricQueryService(ActorSystem actorSystem, 
ResourceID resourceID) {
+       public static ActorRef startMetricQueryService(
+               ActorSystem actorSystem,
+               ResourceID resourceID,
+               long maximumFramesize) {
+
                String actorName = resourceID == null
                        ? METRIC_QUERY_SERVICE_NAME
                        : METRIC_QUERY_SERVICE_NAME + "_" + 
resourceID.getResourceIdString();
-               return 
actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
+
+               return 
actorSystem.actorOf(Props.create(MetricQueryService.class, maximumFramesize), 
actorName);
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 5f83e794ff9..1aab6f7de43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -70,7 +70,10 @@ public Object getValue() {
                        Collections.<Meter, Tuple2<QueryScopeInfo, 
String>>emptyMap());
 
                // no metrics should be serialized
-               Assert.assertEquals(0, output.serializedMetrics.length);
+               Assert.assertEquals(0, output.serializedCounters.length);
+               Assert.assertEquals(0, output.serializedGauges.length);
+               Assert.assertEquals(0, output.serializedHistograms.length);
+               Assert.assertEquals(0, output.serializedMeters.length);
 
                List<MetricDump> deserialized = 
deserializer.deserialize(output);
                Assert.assertEquals(0, deserialized.size());
@@ -141,7 +144,8 @@ public long getCount() {
                gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
                histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
 
-               MetricDumpSerialization.MetricSerializationResult serialized = 
serializer.serialize(counters, gauges, histograms, meters);
+               MetricDumpSerialization.MetricSerializationResult serialized = 
serializer.serialize(
+                       counters, gauges, histograms, meters);
                List<MetricDump> deserialized = 
deserializer.deserialize(serialized);
 
                // ===== Counters 
==============================================================================================
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 3767421b7d6..673409cca5f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.metrics.dump;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -25,10 +26,10 @@
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -38,6 +39,10 @@
 import akka.testkit.TestActorRef;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -47,81 +52,119 @@
 public class MetricQueryServiceTest extends TestLogger {
        @Test
        public void testCreateDump() throws Exception {
-
                ActorSystem s = AkkaUtils.createLocalActorSystem(new 
Configuration());
-               ActorRef serviceActor = 
MetricQueryService.startMetricQueryService(s, null);
-               TestActorRef testActorRef = TestActorRef.create(s, 
Props.create(TestActor.class));
-               TestActor testActor = (TestActor) 
testActorRef.underlyingActor();
-
-               final Counter c = new SimpleCounter();
-               final Gauge<String> g = new Gauge<String>() {
-                       @Override
-                       public String getValue() {
-                               return "Hello";
-                       }
-               };
-               final Histogram h = new TestHistogram();
-               final Meter m = new Meter() {
+               try {
+                       ActorRef serviceActor = 
MetricQueryService.startMetricQueryService(s, null, Long.MAX_VALUE);
+                       TestActorRef testActorRef = TestActorRef.create(s, 
Props.create(TestActor.class));
+                       TestActor testActor = (TestActor) 
testActorRef.underlyingActor();
 
-                       @Override
-                       public void markEvent() {
-                       }
+                       final Counter c = new SimpleCounter();
+                       final Gauge<String> g = () -> "Hello";
+                       final Histogram h = new TestHistogram();
+                       final Meter m = new TestMeter();
 
-                       @Override
-                       public void markEvent(long n) {
-                       }
+                       final TaskManagerMetricGroup tm = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-                       @Override
-                       public double getRate() {
-                               return 5;
-                       }
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, c, 
"counter", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, g, 
"gauge", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, h, 
"histogram", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, m, 
"meter", tm);
+                       serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
 
-                       @Override
-                       public long getCount() {
-                               return 10;
-                       }
-               };
-
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               final TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
-
-               MetricQueryService.notifyOfAddedMetric(serviceActor, c, 
"counter", tm);
-               MetricQueryService.notifyOfAddedMetric(serviceActor, g, 
"gauge", tm);
-               MetricQueryService.notifyOfAddedMetric(serviceActor, h, 
"histogram", tm);
-               MetricQueryService.notifyOfAddedMetric(serviceActor, m, 
"meter", tm);
-               serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
-               synchronized (testActor.lock) {
-                       if (testActor.message == null) {
-                               testActor.lock.wait();
-                       }
-               }
+                       testActor.waitForResult();
 
-               MetricDumpSerialization.MetricSerializationResult dump = 
(MetricDumpSerialization.MetricSerializationResult) testActor.message;
-               testActor.message = null;
-               assertTrue(dump.serializedMetrics.length > 0);
+                       MetricDumpSerialization.MetricSerializationResult dump 
= testActor.getSerializationResult();
 
-               MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
-               MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
-               MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
-               MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+                       assertTrue(dump.serializedCounters.length > 0);
+                       assertTrue(dump.serializedGauges.length > 0);
+                       assertTrue(dump.serializedHistograms.length > 0);
+                       assertTrue(dump.serializedMeters.length > 0);
 
-               serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
-               synchronized (testActor.lock) {
-                       if (testActor.message == null) {
-                               testActor.lock.wait();
-                       }
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, 
c);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, 
g);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, 
h);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, 
m);
+
+                       serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
+
+                       testActor.waitForResult();
+
+                       MetricDumpSerialization.MetricSerializationResult 
emptyDump = testActor.getSerializationResult();
+
+                       assertEquals(0, emptyDump.serializedCounters.length);
+                       assertEquals(0, emptyDump.serializedGauges.length);
+                       assertEquals(0, emptyDump.serializedHistograms.length);
+                       assertEquals(0, emptyDump.serializedMeters.length);
+               } finally {
+                       s.terminate();
                }
+       }
+
+       @Test
+       public void testHandleOversizedMetricMessage() throws Exception {
+               ActorSystem s = AkkaUtils.createLocalActorSystem(new 
Configuration());
+               try {
+                       final long sizeLimit = 200L;
+                       ActorRef serviceActor = 
MetricQueryService.startMetricQueryService(s, null, sizeLimit);
+                       TestActorRef testActorRef = TestActorRef.create(s, 
Props.create(TestActor.class));
+                       TestActor testActor = (TestActor) 
testActorRef.underlyingActor();
+
+                       final TaskManagerMetricGroup tm = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
+
+                       final String gaugeValue = "Hello";
+                       final long requiredGaugesToExceedLimit = sizeLimit / 
gaugeValue.length() + 1;
+                       List<Tuple2<String, Gauge<String>>> gauges = 
LongStream.range(0, requiredGaugesToExceedLimit)
+                               .mapToObj(x -> Tuple2.of("gauge" + x, 
(Gauge<String>) () -> "Hello" + x))
+                               .collect(Collectors.toList());
+                       gauges.forEach(gauge -> 
MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
+
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, 
new SimpleCounter(), "counter", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, 
new TestHistogram(), "histogram", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, 
new TestMeter(), "meter", tm);
+
+                       serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
+                       testActor.waitForResult();
 
-               MetricDumpSerialization.MetricSerializationResult emptyDump = 
(MetricDumpSerialization.MetricSerializationResult) testActor.message;
-               testActor.message = null;
-               assertEquals(0, emptyDump.serializedMetrics.length);
+                       MetricDumpSerialization.MetricSerializationResult dump 
= testActor.getSerializationResult();
 
-               s.terminate();
+                       assertTrue(dump.serializedCounters.length > 0);
+                       assertEquals(1, dump.numCounters);
+                       assertTrue(dump.serializedMeters.length > 0);
+                       assertEquals(1, dump.numMeters);
+
+                       // gauges exceeded the size limit and will be excluded
+                       assertEquals(0, dump.serializedGauges.length);
+                       assertEquals(0, dump.numGauges);
+
+                       assertTrue(dump.serializedHistograms.length > 0);
+                       assertEquals(1, dump.numHistograms);
+
+                       // unregister all but one gauge to ensure gauges are 
reported again if the remaining fit
+                       for (int x = 1; x < gauges.size(); x++) {
+                               
MetricQueryService.notifyOfRemovedMetric(serviceActor, gauges.get(x).f1);
+                       }
+
+                       serviceActor.tell(MetricQueryService.getCreateDump(), 
testActorRef);
+                       testActor.waitForResult();
+
+                       MetricDumpSerialization.MetricSerializationResult 
recoveredDump = testActor.getSerializationResult();
+
+                       assertTrue(recoveredDump.serializedCounters.length > 0);
+                       assertEquals(1, recoveredDump.numCounters);
+                       assertTrue(recoveredDump.serializedMeters.length > 0);
+                       assertEquals(1, recoveredDump.numMeters);
+                       assertTrue(recoveredDump.serializedGauges.length > 0);
+                       assertEquals(1, recoveredDump.numGauges);
+                       assertTrue(recoveredDump.serializedHistograms.length > 
0);
+                       assertEquals(1, recoveredDump.numHistograms);
+               } finally {
+                       s.terminate();
+               }
        }
 
        private static class TestActor extends UntypedActor {
-               public Object message;
-               public Object lock = new Object();
+               private Object message;
+               private final Object lock = new Object();
 
                @Override
                public void onReceive(Object message) throws Exception {
@@ -130,5 +173,19 @@ public void onReceive(Object message) throws Exception {
                                lock.notifyAll();
                        }
                }
+
+               void waitForResult() throws InterruptedException {
+                       synchronized (lock) {
+                               if (message == null) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               MetricDumpSerialization.MetricSerializationResult 
getSerializationResult() {
+                       final MetricDumpSerialization.MetricSerializationResult 
result = (MetricDumpSerialization.MetricSerializationResult) message;
+                       message = null;
+                       return result;
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index da8182a8ede..61c028ffab5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -99,7 +99,7 @@ public void testUpdate() throws Exception {
                MetricDumpSerialization.MetricSerializationResult 
requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
 
                when(jmQueryService.queryMetrics(any(Time.class)))
-                       .thenReturn(CompletableFuture.completedFuture(new 
MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+                       .thenReturn(CompletableFuture.completedFuture(new 
MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new 
byte[0], new byte[0], 0, 0, 0, 0)));
                when(tmQueryService.queryMetrics(any(Time.class)))
                        
.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized metric messages
> --------------------------------
>
>                 Key: FLINK-10252
>                 URL: https://issues.apache.org/jira/browse/FLINK-10252
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages that are smaller than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to