This is an automated email from the ASF dual-hosted git repository. agresch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 3cc4797 STORM-3682 Upgrade netty client metrics to use V2 API (#3371) 3cc4797 is described below commit 3cc4797e56cf30dc04c8c8dcf133534522cc87eb Author: agresch <agre...@gmail.com> AuthorDate: Tue Jan 19 11:06:56 2021 -0600 STORM-3682 Upgrade netty client metrics to use V2 API (#3371) * STORM-3682 Upgrade netty client metrics to use V2 API --- docs/Metrics.md | 20 +----- .../storm/daemon/metrics/BuiltinMetricsUtil.java | 23 ------- .../apache/storm/daemon/worker/WorkerState.java | 3 +- .../apache/storm/executor/bolt/BoltExecutor.java | 2 - .../jvm/org/apache/storm/messaging/IContext.java | 12 ++++ .../apache/storm/messaging/TransportFactory.java | 6 +- .../org/apache/storm/messaging/netty/Client.java | 78 ++++++++++++++++------ .../org/apache/storm/messaging/netty/Context.java | 10 ++- .../jvm/org/apache/storm/metrics2/RateCounter.java | 4 ++ .../apache/storm/metrics2/StormMetricRegistry.java | 45 +++++++++++++ .../org/apache/storm/metrics2/TaskMetricRepo.java | 77 +++++++++++---------- .../apache/storm/messaging/netty/NettyTest.java | 14 ++-- 12 files changed, 188 insertions(+), 106 deletions(-) diff --git a/docs/Metrics.md b/docs/Metrics.md index 561f2f0..9d21f50 100644 --- a/docs/Metrics.md +++ b/docs/Metrics.md @@ -268,25 +268,11 @@ Be aware that the `__system` bolt is an actual bolt so regular bolt metrics desc ##### Send (Netty Client) -The `__send-iconnection` metric holds information about all of the clients for this worker. It is of the form +The `__send-iconnection` metrics report information about all of the clients for this worker. They are named __send-iconnection-METRIC_TYPE-HOST:PORT for a given Client that is +connected to a worker with the given host/port. -``` -{ - NodeInfo(node:7decee4b-c314-41f4-b362-fd1358c985b3-127.0.01, port:[6701]): { - "reconnects": 0, - "src": "/127.0.0.1:49951", - "pending": 0, - "dest": "localhost/127.0.0.1:6701", - "sent": 420779, - "lostOnSend": 0 - } -} -``` - -The value is a map where the key is a NodeInfo class for the downstream worker it is sending messages to. This is the SupervisorId + port. The value is another map with the fields +The metric types reported for each client are: - * `src` What host/port this client has used to connect to the receiving worker. - * `dest` What host/port this client has connected to. * `reconnects` the number of reconnections that have happened. * `pending` the number of messages that have not been sent. (This corresponds to messages, not tuples) * `sent` the number of messages that have been sent. (This is messages not tuples) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java index 72d660f..828d7ea 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java @@ -12,16 +12,12 @@ package org.apache.storm.daemon.metrics; -import java.util.HashMap; import java.util.Map; import org.apache.storm.Config; -import org.apache.storm.generated.NodeInfo; -import org.apache.storm.messaging.IConnection; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.api.StateMetric; import org.apache.storm.task.TopologyContext; -import org.apache.storm.utils.JCQueue; public class BuiltinMetricsUtil { public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) { @@ -30,25 +26,6 @@ public class BuiltinMetricsUtil { } } - public static void registerIconnectionClientMetrics(final Map<NodeInfo, IConnection> nodePortToSocket, Map<String, Object> topoConf, - TopologyContext context) { - IMetric metric = new IMetric() { - @Override - public Object getValueAndReset() { - Map<Object, Object> ret = new HashMap<>(); - for (Map.Entry<NodeInfo, IConnection> entry : nodePortToSocket.entrySet()) { - NodeInfo nodePort = entry.getKey(); - IConnection connection = entry.getValue(); - if (connection instanceof IStatefulObject) { - ret.put(nodePort, ((IStatefulObject) connection).getState()); - } - } - return ret; - } - }; - registerMetric("__send-iconnection", metric, topoConf, context); - } - public static void registerMetric(String name, IMetric metric, Map<String, Object> topoConf, TopologyContext context) { int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue(); context.registerMetric(name, metric, bucketSize); diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index ab4a8b6..f7aa451 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -174,7 +174,8 @@ public class WorkerState { this.credentialsAtom = new AtomicReference(initialCredentials); this.conf = conf; this.supervisorIfaceSupplier = supervisorIfaceSupplier; - this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf); + this.mqContext = (null != mqContext) ? mqContext : + TransportFactory.makeContext(topologyConf, metricRegistry); this.topologyId = topologyId; this.assignmentId = assignmentId; this.port = port; diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java index ddd830d..293d756 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java @@ -114,8 +114,6 @@ public class BoltExecutor extends Executor { ((ICredentialsListener) boltObject).setCredentials(credentials); } if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) { - Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get(); - BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext); BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext); // add any autocredential expiry metrics from the worker diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java index ac56a8a..8b1183b 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java @@ -15,6 +15,7 @@ package org.apache.storm.messaging; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import org.apache.storm.metrics2.StormMetricRegistry; /** * This interface needs to be implemented for messaging plugin. @@ -30,9 +31,20 @@ public interface IContext { * * @param topoConf storm configuration */ + @Deprecated void prepare(Map<String, Object> topoConf); /** + * This method is invoked at the startup of messaging plugin. + * + * @param topoConf storm configuration + * @param metricRegistry storm metric registry + */ + default void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) { + prepare(topoConf); + } + + /** * This method is invoked when a worker is unloading a messaging plugin. */ void term(); diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java index cc48eca..fde5344 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java @@ -15,13 +15,14 @@ package org.apache.storm.messaging; import java.lang.reflect.Method; import java.util.Map; import org.apache.storm.Config; +import org.apache.storm.metrics2.StormMetricRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TransportFactory { public static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class); - public static IContext makeContext(Map<String, Object> topoConf) { + public static IContext makeContext(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) { //get factory class name String transportPluginClassName = (String) topoConf.get(Config.STORM_MESSAGING_TRANSPORT); @@ -37,9 +38,10 @@ public class TransportFactory { //case 1: plugin is a IContext class transport = (IContext) obj; //initialize with storm configuration - transport.prepare(topoConf); + transport.prepare(topoConf, metricRegistry); } else { //case 2: Non-IContext plugin must have a makeContext(topoConf) method that returns IContext object + // StormMetricRegistry is ignored if IContext is created this way Method method = klass.getMethod("makeContext", Map.class); LOG.debug("object:" + obj + " method:" + method); transport = (IContext) method.invoke(obj, topoConf); diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java index 5819924..9f54fbd 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java @@ -14,13 +14,17 @@ package org.apache.storm.messaging.netty; import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,10 +32,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.storm.Config; +import org.apache.storm.Constants; import org.apache.storm.grouping.Load; import org.apache.storm.messaging.ConnectionWithStatus; import org.apache.storm.messaging.TaskMessage; -import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.policy.IWaitStrategy.WaitSituation; import org.apache.storm.policy.WaitStrategyProgressive; @@ -63,7 +68,7 @@ import org.slf4j.LoggerFactory; * asynchronously. Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote * destination is currently unavailable. */ -public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient { +public class Client extends ConnectionWithStatus implements ISaslClient { private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L; /** @@ -119,10 +124,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa * This flag is set to true if and only if a client instance is being closed. */ private volatile boolean closing = false; + StormMetricRegistry metricRegistry; + private Set<Metric> metrics = new HashSet<>(); Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host, - int port) { + int port, StormMetricRegistry metricRegistry) { this.topoConf = topoConf; closing = false; this.scheduler = scheduler; @@ -163,6 +170,50 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa waitStrategy = ReflectionUtils.newInstance(clazz); } waitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT); + this.metricRegistry = metricRegistry; + + // it's possible to be passed a null metric registry if users are using their own IContext implementation. + if (this.metricRegistry != null) { + Gauge<Integer> reconnects = new Gauge<Integer>() { + @Override + public Integer getValue() { + return totalConnectionAttempts.get(); + } + }; + metricRegistry.gauge("__send-iconnection-reconnects-" + host + ":" + port, reconnects, + Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID); + metrics.add(reconnects); + + Gauge<Integer> sent = new Gauge<Integer>() { + @Override + public Integer getValue() { + return messagesSent.get(); + } + }; + metricRegistry.gauge("__send-iconnection-sent-" + host + ":" + port, sent, + Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID); + metrics.add(sent); + + Gauge<Long> pending = new Gauge<Long>() { + @Override + public Long getValue() { + return pendingMessages.get(); + } + }; + metricRegistry.gauge("__send-iconnection-pending-" + host + ":" + port, pending, + Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID); + metrics.add(pending); + + Gauge<Integer> lostOnSend = new Gauge<Integer>() { + @Override + public Integer getValue() { + return messagesLost.get(); + } + }; + metricRegistry.gauge("__send-iconnection-lostOnSend-" + host + ":" + port, lostOnSend, + Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID); + metrics.add(lostOnSend); + } } /** @@ -415,6 +466,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa closing = true; waitForPendingMessagesToBeSent(); closeChannel(); + + // stop tracking metrics for this client + if (this.metricRegistry != null) { + this.metricRegistry.deregister(this.metrics); + } } } @@ -467,22 +523,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa return ret; } - @Override - public Object getState() { - LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName); - HashMap<String, Object> ret = new HashMap<String, Object>(); - ret.put("reconnects", totalConnectionAttempts.getAndSet(0)); - ret.put("sent", messagesSent.getAndSet(0)); - ret.put("pending", pendingMessages.get()); - ret.put("lostOnSend", messagesLost.getAndSet(0)); - ret.put("dest", dstAddress.toString()); - String src = srcAddressName(); - if (src != null) { - ret.put("src", src); - } - return ret; - } - public Map<String, Object> getConfig() { return topoConf; } diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java index 03feaf8..a1384cb 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java @@ -22,6 +22,7 @@ import org.apache.storm.Config; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IConnectionCallback; import org.apache.storm.messaging.IContext; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.shade.io.netty.channel.EventLoopGroup; import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup; import org.apache.storm.shade.io.netty.util.HashedWheelTimer; @@ -32,12 +33,18 @@ public class Context implements IContext { private List<Server> serverConnections; private EventLoopGroup workerEventLoopGroup; private HashedWheelTimer clientScheduleService; + private StormMetricRegistry metricRegistry = null; /** * initialization per Storm configuration. */ @Override public void prepare(Map<String, Object> topoConf) { + prepare(topoConf, null); + } + + @Override + public void prepare(Map<String, Object> topoConf, StormMetricRegistry metricRegistry) { this.topoConf = topoConf; serverConnections = new ArrayList<>(); @@ -49,6 +56,7 @@ public class Context implements IContext { this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory); clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service")); + this.metricRegistry = metricRegistry; } /** @@ -67,7 +75,7 @@ public class Context implements IContext { @Override public IConnection connect(String stormId, String host, int port, AtomicBoolean[] remoteBpStatus) { return new Client(topoConf, remoteBpStatus, workerEventLoopGroup, - clientScheduleService, host, port); + clientScheduleService, host, port, metricRegistry); } /** diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java index 77b6720..6f5c32c 100644 --- a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java +++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java @@ -59,4 +59,8 @@ public class RateCounter implements Gauge<Double> { values[time] = counter.getCount(); currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds); } + + Counter getCounter() { + return counter; + } } diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index 6863503..3ea7a69 100644 --- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -18,6 +18,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Timer; @@ -25,6 +26,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -84,6 +86,7 @@ public class StormMetricRegistry implements MetricRegistryProvider { return gauge; } + @Deprecated public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) { MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port); gauge = registerGauge(metricNames, gauge, taskId, componentId, null); @@ -91,6 +94,13 @@ public class StormMetricRegistry implements MetricRegistryProvider { return gauge; } + public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String componentId, Integer taskId) { + MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port); + gauge = registerGauge(metricNames, gauge, taskId, componentId, null); + saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges); + return gauge; + } + public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, String streamId, Integer taskId, Integer port) { MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port); @@ -235,6 +245,14 @@ public class StormMetricRegistry implements MetricRegistryProvider { return histogram; } + public void deregister(Set<Metric> toRemove) { + MetricFilter metricFilter = new RemoveMetricFilter(toRemove); + for (TaskMetricRepo taskMetricRepo : taskMetrics.values()) { + taskMetricRepo.degister(metricFilter); + } + registry.removeMatching(metricFilter); + } + private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) { Map<String, T> ret = new HashMap<>(); Map<String, T> taskMetrics = taskIdMetrics.getOrDefault(taskId, Collections.emptyMap()); @@ -432,4 +450,31 @@ public class StormMetricRegistry implements MetricRegistryProvider { } } } + + private static class RemoveMetricFilter implements MetricFilter { + private Set<Metric> metrics = new HashSet<>(); + + RemoveMetricFilter(Set<Metric> toRemove) { + this.metrics.addAll(toRemove); + for (Metric metric : toRemove) { + // RateCounters are gauges, but also have internal Counters that should also be removed + if (metric instanceof RateCounter) { + RateCounter rateCounter = (RateCounter) metric; + this.metrics.add(rateCounter.getCounter()); + } + } + } + + /** + * Returns {@code true} if the metric matches the filter; {@code false} otherwise. + * + * @param name the metric's name + * @param metric the metric + * @return {@code true} if the metric matches the filter + */ + @Override + public boolean matches(String name, Metric metric) { + return this.metrics.contains(metric); + } + } } diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java index e88b64f..785271c 100644 --- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java +++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java @@ -22,16 +22,17 @@ import com.codahale.metrics.Timer; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; /** * Metric repository to allow reporting of task-specific metrics. */ public class TaskMetricRepo { - private SortedMap<String, Gauge> gauges = new TreeMap<>(); - private SortedMap<String, Counter> counters = new TreeMap<>(); - private SortedMap<String, Histogram> histograms = new TreeMap<>(); - private SortedMap<String, Meter> meters = new TreeMap<>(); - private SortedMap<String, Timer> timers = new TreeMap<>(); + private ConcurrentHashMap<String, Gauge> gauges = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Histogram> histograms = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Meter> meters = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>(); public void addCounter(String name, Counter counter) { counters.put(name, counter); @@ -54,41 +55,49 @@ public class TaskMetricRepo { } public void report(ScheduledReporter reporter, MetricFilter filter) { - if (filter != null) { - SortedMap<String, Gauge> filteredGauges = new TreeMap<>(); - SortedMap<String, Counter> filteredCounters = new TreeMap<>(); - SortedMap<String, Histogram> filteredHistograms = new TreeMap<>(); - SortedMap<String, Meter> filteredMeters = new TreeMap<>(); - SortedMap<String, Timer> filteredTimers = new TreeMap<>(); + if (filter == null) { + filter = MetricFilter.ALL; + } + + SortedMap<String, Gauge> filteredGauges = new TreeMap<>(); + SortedMap<String, Counter> filteredCounters = new TreeMap<>(); + SortedMap<String, Histogram> filteredHistograms = new TreeMap<>(); + SortedMap<String, Meter> filteredMeters = new TreeMap<>(); + SortedMap<String, Timer> filteredTimers = new TreeMap<>(); - for (Map.Entry<String, Gauge> entry : gauges.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - filteredGauges.put(entry.getKey(), entry.getValue()); - } + for (Map.Entry<String, Gauge> entry : gauges.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + filteredGauges.put(entry.getKey(), entry.getValue()); } - for (Map.Entry<String, Counter> entry : counters.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - filteredCounters.put(entry.getKey(), entry.getValue()); - } + } + for (Map.Entry<String, Counter> entry : counters.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + filteredCounters.put(entry.getKey(), entry.getValue()); } - for (Map.Entry<String, Histogram> entry : histograms.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - filteredHistograms.put(entry.getKey(), entry.getValue()); - } + } + for (Map.Entry<String, Histogram> entry : histograms.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + filteredHistograms.put(entry.getKey(), entry.getValue()); } - for (Map.Entry<String, Meter> entry : meters.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - filteredMeters.put(entry.getKey(), entry.getValue()); - } + } + for (Map.Entry<String, Meter> entry : meters.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + filteredMeters.put(entry.getKey(), entry.getValue()); } - for (Map.Entry<String, Timer> entry : timers.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - filteredTimers.put(entry.getKey(), entry.getValue()); - } + } + for (Map.Entry<String, Timer> entry : timers.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + filteredTimers.put(entry.getKey(), entry.getValue()); } - reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers); - } else { - reporter.report(gauges, counters, histograms, meters, timers); } + reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers); + } + + void degister(MetricFilter metricFilter) { + gauges.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue())); + counters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue())); + histograms.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue())); + meters.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue())); + timers.entrySet().removeIf(entry -> metricFilter.matches(entry.getKey(), entry.getValue())); } } \ No newline at end of file diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java index 19f016f..3336737 100644 --- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java +++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java @@ -110,7 +110,7 @@ public class NettyTest { private void doTestBasic(Map<String, Object> stormConf) throws Exception { LOG.info("1. Should send and receive a basic message"); String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz"; - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null); @@ -171,7 +171,7 @@ public class NettyTest { private void doTestLoad(Map<String, Object> stormConf) throws Exception { LOG.info("2 test load"); String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz"; - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null); @@ -225,7 +225,7 @@ public class NettyTest { private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception { LOG.info("3 Should send and receive a large message"); String reqMessage = StringUtils.repeat("c", 2_048_000); - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null); @@ -264,7 +264,7 @@ public class NettyTest { private void doTestServerDelayed(Map<String, Object> stormConf) throws Exception { LOG.info("4. test server delayed"); String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz"; - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); int port = Utils.getAvailablePort(6700); @@ -315,7 +315,7 @@ public class NettyTest { LOG.info("Should send and receive many messages (testing with " + numMessages + " messages)"); ArrayList<TaskMessage> responses = new ArrayList<>(); AtomicInteger received = new AtomicInteger(); - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> { responses.add(message); @@ -362,7 +362,7 @@ public class NettyTest { private void doTestServerAlwaysReconnects(Map<String, Object> stormConf) throws Exception { LOG.info("6. test server always reconnects"); String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz"; - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); int port = Utils.getAvailablePort(6700); @@ -396,7 +396,7 @@ public class NettyTest { private void connectToFixedPort(Map<String, Object> stormConf, int port) throws Exception { LOG.info("7. Should be able to rebind to a port quickly"); String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz"; - IContext context = TransportFactory.makeContext(stormConf); + IContext context = TransportFactory.makeContext(stormConf, null); try { AtomicReference<TaskMessage> response = new AtomicReference<>(); try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);