This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 91cd97b Add more unit tests (#176)
91cd97b is described below
commit 91cd97b36613fa58a34ced308ab573503b3eb23d
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 23 21:21:47 2022 +0800
Add more unit tests (#176)
---
.../rocketmq/client/java/impl/ClientImpl.java | 10 +-
.../impl/consumer/MessageCacheObserverImpl.java | 57 -----
.../impl/consumer/ProcessQueueGaugeObserver.java | 90 ++++++++
.../java/impl/consumer/PushConsumerImpl.java | 10 +-
.../client/java/logging/CustomConsoleAppender.java | 5 +-
.../rocketmq/client/java/metrics/ClientMeter.java | 11 -
...tMeterProvider.java => ClientMeterManager.java} | 74 +++---
...eCacheObserver.java => EmptyGaugeObserver.java} | 31 ++-
...essageCacheObserver.java => GaugeObserver.java} | 24 +-
.../client/java/metrics/HistogramEnum.java | 2 +-
.../client/java/metrics/MessageMeterHandler.java | 42 ++--
.../java/logging/CustomConsoleAppenderTest.java} | 29 ++-
.../java/metrics/ClientMeterManagerTest.java | 52 +++++
.../client/java/metrics/ClientMeterTest.java | 91 ++++++++
.../java/metrics/MessageMeterHandlerTest.java | 250 +++++++++++++++++++++
15 files changed, 584 insertions(+), 194 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b9b6a76..e2807fb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -76,7 +76,7 @@ import org.apache.rocketmq.client.java.hook.MessageHandler;
import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.message.GeneralMessage;
-import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
+import org.apache.rocketmq.client.java.metrics.ClientMeterManager;
import org.apache.rocketmq.client.java.metrics.MessageMeterHandler;
import org.apache.rocketmq.client.java.metrics.Metric;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -105,7 +105,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
// Thread-safe set.
protected final Set<Endpoints> isolated;
protected final ExecutorService clientCallbackExecutor;
- protected final ClientMeterProvider clientMeterProvider;
+ protected final ClientMeterManager clientMeterManager;
/**
* Telemetry command executor, which aims to execute commands from the
remote.
*/
@@ -153,10 +153,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
new LinkedBlockingQueue<>(),
new ThreadFactoryImpl("ClientCallbackWorker"));
- this.clientMeterProvider = new ClientMeterProvider(this);
+ this.clientMeterManager = new ClientMeterManager(clientId,
clientConfiguration);
this.compositedMessageHandler =
- new CompositedMessageHandler(Collections.singletonList(new
MessageMeterHandler(this, clientMeterProvider)));
+ new CompositedMessageHandler(Collections.singletonList(new
MessageMeterHandler(this, clientMeterManager)));
this.telemetryCommandExecutor = new ThreadPoolExecutor(
1,
@@ -317,7 +317,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
@Override
public final void onSettingsCommand(Endpoints endpoints,
apache.rocketmq.v2.Settings settings) {
final Metric metric = new Metric(settings.getMetric());
- clientMeterProvider.reset(metric);
+ clientMeterManager.reset(metric);
this.getSettings().sync(settings);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
deleted file mode 100644
index b5802da..0000000
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.impl.consumer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.java.metrics.MessageCacheObserver;
-import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-
-public class MessageCacheObserverImpl implements MessageCacheObserver {
- private final ConcurrentMap<MessageQueueImpl, ProcessQueue>
processQueueTable;
-
- public MessageCacheObserverImpl(ConcurrentMap<MessageQueueImpl,
ProcessQueue> processQueueTable) {
- this.processQueueTable = processQueueTable;
- }
-
- @Override
- public Map<String, Long> getCachedMessageCount() {
- Map<String, Long> cachedMessageCountMap = new HashMap<>();
- for (ProcessQueue pq : processQueueTable.values()) {
- final String topic = pq.getMessageQueue().getTopic();
- long count = cachedMessageCountMap.containsKey(topic) ?
cachedMessageCountMap.get(topic) : 0;
- count += pq.getInflightMessageCount();
- count += pq.getPendingMessageCount();
- cachedMessageCountMap.put(topic, count);
- }
- return cachedMessageCountMap;
- }
-
- @Override
- public Map<String, Long> getCachedMessageBytes() {
- Map<String, Long> cachedMessageBytesMap = new HashMap<>();
- for (ProcessQueue pq : processQueueTable.values()) {
- final String topic = pq.getMessageQueue().getTopic();
- long bytes = cachedMessageBytesMap.containsKey(topic) ?
cachedMessageBytesMap.get(topic) : 0;
- bytes += pq.getCachedMessageBytes();
- cachedMessageBytesMap.put(topic, bytes);
- }
- return cachedMessageBytesMap;
- }
-}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
new file mode 100644
index 0000000..5dda367
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.java.metrics.GaugeEnum;
+import org.apache.rocketmq.client.java.metrics.GaugeObserver;
+import org.apache.rocketmq.client.java.metrics.MetricLabels;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+
+public class ProcessQueueGaugeObserver implements GaugeObserver {
+ private final ConcurrentMap<MessageQueueImpl, ProcessQueue>
processQueueTable;
+ private final String clientId;
+ private final String consumerGroup;
+ private final List<GaugeEnum> gauges;
+
+ public ProcessQueueGaugeObserver(ConcurrentMap<MessageQueueImpl,
ProcessQueue> processQueueTable, String clientId,
+ String consumerGroup) {
+ this.processQueueTable = processQueueTable;
+ this.clientId = clientId;
+ this.consumerGroup = consumerGroup;
+ this.gauges = new ArrayList<>();
+ gauges.add(GaugeEnum.CONSUMER_CACHED_MESSAGES);
+ gauges.add(GaugeEnum.CONSUMER_CACHED_BYTES);
+ }
+
+ @Override
+ public List<GaugeEnum> getGauges() {
+ return gauges;
+ }
+
+ @Override
+ public Map<Attributes, Double> getValues(GaugeEnum gauge) {
+ switch (gauge) {
+ case CONSUMER_CACHED_MESSAGES:
+ Map<Attributes, Double> cachedMessageCountMap = new
HashMap<>();
+ for (ProcessQueue pq : processQueueTable.values()) {
+ final String topic = pq.getMessageQueue().getTopic();
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, clientId)
+ .build();
+ double count =
cachedMessageCountMap.containsKey(attributes) ?
+ cachedMessageCountMap.get(attributes) : 0;
+ count += pq.getInflightMessageCount();
+ count += pq.getPendingMessageCount();
+ cachedMessageCountMap.put(attributes, count);
+ }
+ return cachedMessageCountMap;
+ case CONSUMER_CACHED_BYTES:
+ Map<Attributes, Double> cachedMessageBytesMap = new
HashMap<>();
+ for (ProcessQueue pq : processQueueTable.values()) {
+ final String topic = pq.getMessageQueue().getTopic();
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, clientId)
+ .build();
+ double bytes =
cachedMessageBytesMap.containsKey(attributes) ?
+ cachedMessageBytesMap.get(attributes) : 0;
+ bytes += pq.getCachedMessageBytes();
+ cachedMessageBytesMap.put(attributes, bytes);
+ }
+ return cachedMessageBytesMap;
+ default:
+ return new HashMap<>();
+ }
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index d4c398d..7682071 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -67,7 +67,7 @@ import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
-import org.apache.rocketmq.client.java.metrics.MessageCacheObserver;
+import org.apache.rocketmq.client.java.metrics.GaugeObserver;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -142,8 +142,6 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
this.consumptionErrorQuantity = new AtomicLong(0);
this.processQueueTable = new ConcurrentHashMap<>();
- MessageCacheObserver messageCacheObserver = new
MessageCacheObserverImpl(processQueueTable);
- this.clientMeterProvider.setMessageCacheObserver(messageCacheObserver);
this.consumptionExecutor = new ThreadPoolExecutor(
consumptionThreadCount,
@@ -158,6 +156,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
protected void startUp() throws Exception {
try {
LOGGER.info("Begin to start the rocketmq push consumer,
clientId={}", clientId);
+ GaugeObserver gaugeObserver = new
ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
+ this.clientMeterManager.setGaugeObserver(gaugeObserver);
super.startUp();
final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
this.consumeService = createConsumeService();
@@ -185,8 +185,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
scanAssignmentsFuture.cancel(false);
}
super.shutDown();
- consumeService.stopAsync().awaitTerminated();
- consumptionExecutor.shutdown();
+ this.consumeService.stopAsync().awaitTerminated();
+ this.consumptionExecutor.shutdown();
ExecutorServices.awaitTerminated(consumptionExecutor);
LOGGER.info("Shutdown the rocketmq push consumer successfully,
clientId={}", clientId);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
index e131471..857e51c 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
@@ -21,10 +21,11 @@ import ch.qos.logback.core.ConsoleAppender;
public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
public static final String ENABLE_CONSOLE_APPENDER_KEY =
"mq.consoleAppender.enabled";
- private final boolean enabled =
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
- Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+ private final boolean enabled;
public CustomConsoleAppender() {
+ this.enabled =
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
+
Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
}
protected void append(E eventObject) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index 1afbd0a..53cdb09 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -24,7 +24,6 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -68,16 +67,6 @@ public class ClientMeter {
return enabled;
}
- public Endpoints getEndpoints() {
- return endpoints;
- }
-
- Optional<DoubleHistogram> getHistogramByEnum(HistogramEnum histogramEnum) {
- final DoubleHistogram histogram =
histogramMap.computeIfAbsent(histogramEnum.getName(), name -> enabled ?
- meter.histogramBuilder(histogramEnum.getName()).build() : null);
- return null == histogram ? Optional.empty() : Optional.of(histogram);
- }
-
public void record(HistogramEnum histogramEnum, Attributes attributes,
double value) {
final DoubleHistogram histogram =
histogramMap.computeIfAbsent(histogramEnum.getName(), name -> enabled ?
meter.histogramBuilder(histogramEnum.getName()).build() : null);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
similarity index 72%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
index 5af1aa1..683ebc1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.client.java.metrics;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -36,33 +38,33 @@ import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ClientMeterProvider {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeterProvider.class);
+public class ClientMeterManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClientMeterManager.class);
private static final Duration METRIC_EXPORTER_RPC_TIMEOUT =
Duration.ofSeconds(3);
private static final Duration METRIC_READER_INTERVAL =
Duration.ofMinutes(1);
private static final String METRIC_INSTRUMENTATION_NAME =
"org.apache.rocketmq.message";
- private final ClientImpl client;
+ private final String clientId;
+ private final ClientConfiguration clientConfiguration;
private volatile ClientMeter clientMeter;
- private volatile MessageCacheObserver messageCacheObserver;
+ private volatile GaugeObserver gaugeObserver = GaugeObserver.EMPTY;
- public ClientMeterProvider(ClientImpl client) {
- this.client = client;
- this.clientMeter = ClientMeter.disabledInstance(client.clientId());
- this.messageCacheObserver = null;
+ public ClientMeterManager(String clientId, ClientConfiguration
clientConfiguration) {
+ this.clientId = clientId;
+ this.clientConfiguration = clientConfiguration;
+ this.clientMeter = ClientMeter.disabledInstance(clientId);
}
- public void setMessageCacheObserver(MessageCacheObserver
messageCacheObserver) {
- this.messageCacheObserver = messageCacheObserver;
+ public void setGaugeObserver(GaugeObserver gaugeObserver) {
+ this.gaugeObserver = checkNotNull(gaugeObserver, "gaugeObserver should
not be null");
}
public void record(HistogramEnum histogramEnum, Attributes attributes,
double value) {
@@ -71,7 +73,6 @@ public class ClientMeterProvider {
@SuppressWarnings("deprecation")
public synchronized void reset(Metric metric) {
- final String clientId = client.clientId();
try {
if (clientMeter.satisfy(metric)) {
LOGGER.info("Metric settings is satisfied by the current
message meter, clientId={}", clientId);
@@ -80,14 +81,14 @@ public class ClientMeterProvider {
if (!metric.isOn()) {
LOGGER.info("Metric is off, clientId={}", clientId);
clientMeter.shutdown();
- clientMeter = ClientMeter.disabledInstance(client.clientId());
+ clientMeter = ClientMeter.disabledInstance(clientId);
return;
}
final Endpoints endpoints = metric.getEndpoints();
final SslContext sslContext =
GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
- .sslContext(sslContext).intercept(new
AuthInterceptor(client.getClientConfiguration(), clientId));
+ .sslContext(sslContext).intercept(new
AuthInterceptor(clientConfiguration, clientId));
final List<InetSocketAddress> socketAddresses =
endpoints.toSocketAddresses();
if (null != socketAddresses) {
IpNameResolverFactory metricResolverFactory = new
IpNameResolverFactory(socketAddresses);
@@ -99,9 +100,9 @@ public class ClientMeterProvider {
.build();
InstrumentSelector sendSuccessCostTimeInstrumentSelector =
InstrumentSelector.builder()
-
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.SEND_SUCCESS_COST_TIME.getName()).build();
+
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.SEND_COST_TIME.getName()).build();
final View sendSuccessCostTimeView = View.builder()
-
.setAggregation(HistogramEnum.SEND_SUCCESS_COST_TIME.getBucket()).build();
+
.setAggregation(HistogramEnum.SEND_COST_TIME.getBucket()).build();
InstrumentSelector deliveryLatencyInstrumentSelector =
InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.DELIVERY_LATENCY.getName()).build();
@@ -137,33 +138,20 @@ public class ClientMeterProvider {
existedClientMeter.shutdown();
LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints,
clientId);
- if (!(client instanceof PushConsumer)) {
- // No need for producer and simple consumer.
- return;
+ final List<GaugeEnum> gauges = gaugeObserver.getGauges();
+ for (GaugeEnum gauge : gauges) {
+
meter.gaugeBuilder(gauge.getName()).buildWithCallback(measurement -> {
+ final Map<Attributes, Double> map =
gaugeObserver.getValues(gauge);
+ if (map.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<Attributes, Double> entry : map.entrySet())
{
+ final Attributes attributes = entry.getKey();
+ final Double value = entry.getValue();
+ measurement.record(value, attributes);
+ }
+ });
}
- final String consumerGroup = ((PushConsumer)
client).getConsumerGroup();
-
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement
-> {
- final Map<String, Long> cachedMessageCountMap =
messageCacheObserver.getCachedMessageCount();
- for (Map.Entry<String, Long> entry :
cachedMessageCountMap.entrySet()) {
- final String topic = entry.getKey();
- Attributes attributes = Attributes.builder()
- .put(MetricLabels.TOPIC, topic)
- .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, clientId).build();
- measurement.record(entry.getValue(), attributes);
- }
- });
-
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement
-> {
- final Map<String, Long> cachedMessageBytesMap =
messageCacheObserver.getCachedMessageBytes();
- for (Map.Entry<String, Long> entry :
cachedMessageBytesMap.entrySet()) {
- final String topic = entry.getKey();
- Attributes attributes = Attributes.builder()
- .put(MetricLabels.TOPIC, topic)
- .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, clientId).build();
- measurement.record(entry.getValue(), attributes);
- }
- });
} catch (Throwable t) {
LOGGER.error("Exception raised when resetting message meter,
clientId={}", clientId, t);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
similarity index 59%
copy from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
copy to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
index 00c65c5..ca6de8a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
@@ -17,24 +17,21 @@
package org.apache.rocketmq.client.java.metrics;
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-/**
- * The observer which could records the count and memory footprint of cached
message in {@link PushConsumer}.
- */
-public interface MessageCacheObserver {
- /**
- * Get the cached message count for each topic.
- *
- * @return the cached message count map.
- */
- Map<String /* topic */, Long> getCachedMessageCount();
+public class EmptyGaugeObserver implements GaugeObserver {
+ @Override
+ public List<GaugeEnum> getGauges() {
+ return new ArrayList<>();
+ }
- /**
- * Get the cached message memory footprint for each topic.
- *
- * @return the cached message footprint map.
- */
- Map<String /* topic */, Long> getCachedMessageBytes();
+ @Override
+ public Map<Attributes, Double> getValues(GaugeEnum gauge) {
+ return new HashMap<>();
+ }
}
+
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
similarity index 59%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
index 00c65c5..a6e653a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
@@ -17,24 +17,14 @@
package org.apache.rocketmq.client.java.metrics;
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-/**
- * The observer which could records the count and memory footprint of cached
message in {@link PushConsumer}.
- */
-public interface MessageCacheObserver {
- /**
- * Get the cached message count for each topic.
- *
- * @return the cached message count map.
- */
- Map<String /* topic */, Long> getCachedMessageCount();
+public interface GaugeObserver {
+ GaugeObserver EMPTY = new EmptyGaugeObserver();
+
+ List<GaugeEnum> getGauges();
- /**
- * Get the cached message memory footprint for each topic.
- *
- * @return the cached message footprint map.
- */
- Map<String /* topic */, Long> getCachedMessageBytes();
+ Map<Attributes, Double> getValues(GaugeEnum gauge);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
index 0fd1d07..61a72fd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
@@ -28,7 +28,7 @@ public enum HistogramEnum {
*
* <p>The time unit of bucket is milliseconds.
*/
- SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ SEND_COST_TIME("rocketmq_send_cost_time",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
10.0, 20.0, 50.0, 200.0, 500.0))),
/**
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
index 4c2132d..ee0dce3 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
@@ -30,34 +30,32 @@ import org.apache.rocketmq.client.java.hook.MessageHandler;
import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.impl.Client;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageMeterHandler implements MessageHandler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterHandler.class);
+ static final AttributeKey<Stopwatch> SEND_STOPWATCH_KEY =
AttributeKey.create("send_stopwatch");
+ static final AttributeKey<Stopwatch> CONSUME_STOPWATCH_KEY =
AttributeKey.create("consume_stopwatch");
- private static final AttributeKey<Stopwatch> MESSAGE_SEND_STOPWATCH_KEY =
AttributeKey.create(
- "message_send_stopwatch");
- private static final AttributeKey<Stopwatch> MESSAGE_CONSUME_STOPWATCH_KEY
= AttributeKey.create(
- "message_consume_stopwatch");
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MessageMeterHandler.class);
- private final ClientImpl client;
- private final ClientMeterProvider meterProvider;
+ private final Client client;
+ private final ClientMeterManager meterManager;
- public MessageMeterHandler(ClientImpl client, ClientMeterProvider
meterProvider) {
+ public MessageMeterHandler(Client client, ClientMeterManager meterManager)
{
this.client = client;
- this.meterProvider = meterProvider;
+ this.meterManager = meterManager;
}
private void doBeforeSendMessage(MessageHandlerContext context) {
// Record the time before sending message.
- context.putAttribute(MESSAGE_SEND_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
+ context.putAttribute(SEND_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
}
private void doAfterSendMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
- final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(MESSAGE_SEND_STOPWATCH_KEY);
+ final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(SEND_STOPWATCH_KEY);
if (null == stopwatchAttr) {
// Should never reach here.
return;
@@ -69,7 +67,7 @@ public class MessageMeterHandler implements MessageHandler {
Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, message.getTopic())
.put(MetricLabels.CLIENT_ID, client.clientId())
.put(MetricLabels.INVOCATION_STATUS, status.getName()).build();
- meterProvider.record(HistogramEnum.SEND_SUCCESS_COST_TIME,
attributes, duration.toMillis());
+ meterManager.record(HistogramEnum.SEND_COST_TIME, attributes,
duration.toMillis());
}
}
@@ -103,10 +101,14 @@ public class MessageMeterHandler implements
MessageHandler {
}
final Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, message.getTopic())
.put(MetricLabels.CONSUMER_GROUP,
consumerGroup).put(MetricLabels.CLIENT_ID, client.clientId()).build();
- meterProvider.record(HistogramEnum.DELIVERY_LATENCY, attributes,
latency);
+ meterManager.record(HistogramEnum.DELIVERY_LATENCY, attributes,
latency);
}
private void doBeforeConsumeMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
+ if (messages.isEmpty()) {
+ // Should never reach here.
+ return;
+ }
String consumerGroup = null;
if (client instanceof PushConsumer) {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -125,9 +127,9 @@ public class MessageMeterHandler implements MessageHandler {
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, client.clientId()).build();
final long latency = System.currentTimeMillis() - decodeTimestamp;
- meterProvider.record(HistogramEnum.AWAIT_TIME, attributes, latency);
+ meterManager.record(HistogramEnum.AWAIT_TIME, attributes, latency);
// Record the time before consuming message.
- context.putAttribute(MESSAGE_CONSUME_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
+ context.putAttribute(CONSUME_STOPWATCH_KEY,
Attribute.create(Stopwatch.createStarted()));
}
private void doAfterConsumeMessage(MessageHandlerContext context,
List<GeneralMessage> messages) {
@@ -136,7 +138,7 @@ public class MessageMeterHandler implements MessageHandler {
LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.clientId());
return;
}
- final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(MESSAGE_CONSUME_STOPWATCH_KEY);
+ final Attribute<Stopwatch> stopwatchAttr =
context.getAttribute(CONSUME_STOPWATCH_KEY);
if (null == stopwatchAttr) {
// Should never reach here.
return;
@@ -152,13 +154,13 @@ public class MessageMeterHandler implements
MessageHandler {
.put(MetricLabels.INVOCATION_STATUS,
invocationStatus.getName())
.build();
final Duration duration = stopwatchAttr.get().elapsed();
- meterProvider.record(HistogramEnum.PROCESS_TIME, attributes,
duration.toMillis());
+ meterManager.record(HistogramEnum.PROCESS_TIME, attributes,
duration.toMillis());
}
}
@Override
public void doBefore(MessageHandlerContext context, List<GeneralMessage>
messages) {
- if (!meterProvider.isEnabled()) {
+ if (!meterManager.isEnabled()) {
return;
}
final MessageHookPoints hookPoints = context.getMessageHookPoints();
@@ -178,7 +180,7 @@ public class MessageMeterHandler implements MessageHandler {
@Override
public void doAfter(MessageHandlerContext context, List<GeneralMessage>
messages) {
- if (!meterProvider.isEnabled()) {
+ if (!meterManager.isEnabled()) {
return;
}
final MessageHookPoints hookPoints = context.getMessageHookPoints();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
similarity index 58%
copy from
java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
copy to
java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
index e131471..418ea30 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
@@ -15,26 +15,23 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.java.logging;
-import ch.qos.logback.core.ConsoleAppender;
+package org.apache.rocketmq.client.java.logging;
-public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
- public static final String ENABLE_CONSOLE_APPENDER_KEY =
"mq.consoleAppender.enabled";
- private final boolean enabled =
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
- Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
- public CustomConsoleAppender() {
- }
+import org.junit.Test;
- protected void append(E eventObject) {
- if (this.enabled) {
- super.append(eventObject);
- }
+/**
+ * Tests for {@link CustomConsoleAppender}.
+ */
+public class CustomConsoleAppenderTest {
- }
+    /**
+     * The appender evaluates its switch in a final field initializer, so every expectation needs a freshly
+     * constructed instance. The switch is read from the system property (or the environment variable) named by
+     * {@link CustomConsoleAppender#ENABLE_CONSOLE_APPENDER_KEY}.
+     *
+     * <p>System properties are JVM-wide, so the previous value is saved up front and restored in {@code finally};
+     * otherwise the "true" written here would leak into every test that runs afterwards in the same JVM.
+     *
+     * <p>NOTE(review): the first assertion still assumes the environment variable of the same name is not set.
+     */
+    @Test
+    public void testIsEnabled() {
+        final String key = CustomConsoleAppender.ENABLE_CONSOLE_APPENDER_KEY;
+        final String previous = System.getProperty(key);
+        try {
+            // Start from a known state so a value inherited from an earlier test cannot break the first check.
+            System.clearProperty(key);
+            CustomConsoleAppender<Object> appender = new CustomConsoleAppender<>();
+            assertFalse(appender.isEnabled());
- public boolean isEnabled() {
- return this.enabled;
+            System.setProperty(key, "true");
+            appender = new CustomConsoleAppender<>();
+            assertTrue(appender.isEnabled());
+        } finally {
+            if (null == previous) {
+                System.clearProperty(key);
+            } else {
+                System.setProperty(key, previous);
+            }
+        }
}
-}
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
new file mode 100644
index 0000000..c3a6f48
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+/**
+ * Checks that {@link ClientMeterManager#isEnabled()} reflects the switch of the metric passed to
+ * {@link ClientMeterManager#reset(Metric)}.
+ */
+public class ClientMeterManagerTest extends TestBase {
+
+    /**
+     * Builds a manager against the fake endpoints and resets it with a metric whose switch is {@code on}.
+     *
+     * @param on value of the metric's on/off switch.
+     * @return the manager after {@code reset} has been applied.
+     */
+    private ClientMeterManager resetWithSwitch(boolean on) {
+        final ClientConfiguration configuration =
+            ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        final ClientMeterManager manager = new ClientMeterManager("clientId", configuration);
+        final apache.rocketmq.v2.Metric pbMetric = apache.rocketmq.v2.Metric.newBuilder()
+            .setOn(on)
+            .setEndpoints(fakePbEndpoints0())
+            .build();
+        manager.reset(new Metric(pbMetric));
+        return manager;
+    }
+
+    @Test
+    public void testResetWithMetricOn() {
+        assertTrue(resetWithSwitch(true).isEnabled());
+    }
+
+    @Test
+    public void testResetWithMetricOff() {
+        assertFalse(resetWithSwitch(false).isEnabled());
+    }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
new file mode 100644
index 0000000..177d615
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import apache.rocketmq.v2.Endpoints;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.resources.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ClientMeter}: shutting down enabled and disabled meters, and the outcome of
+ * {@code satisfy(Metric)} for each combination of metric switch and endpoints exercised below.
+ */
+public class ClientMeterTest extends TestBase {
+
+    /** Creates an SDK meter provider backed by an empty resource. */
+    private static SdkMeterProvider newProvider() {
+        return SdkMeterProvider.builder().setResource(Resource.empty()).build();
+    }
+
+    /** Obtains the "test" meter from an OpenTelemetry SDK wired to {@code provider}. */
+    private static Meter meterOf(SdkMeterProvider provider) {
+        final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
+        return openTelemetry.getMeter("test");
+    }
+
+    /** Wraps a protobuf metric that carries only the on/off switch. */
+    private static Metric metricOf(boolean on) {
+        return new Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(on).build());
+    }
+
+    /** Wraps a protobuf metric that carries both the on/off switch and the given endpoints. */
+    private static Metric metricOf(boolean on, Endpoints pbEndpoints) {
+        return new Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(on).setEndpoints(pbEndpoints).build());
+    }
+
+    @Test
+    public void testShutdownWithEnabledMeter() {
+        final SdkMeterProvider provider = newProvider();
+        final ClientMeter enabledMeter = new ClientMeter(meterOf(provider), fakeEndpoints(), provider, "clientId");
+        assertTrue(enabledMeter.isEnabled());
+        enabledMeter.shutdown();
+    }
+
+    @Test
+    public void testShutdownWithDisabledMeter() {
+        final ClientMeter disabledMeter = ClientMeter.disabledInstance("clientId");
+        assertFalse(disabledMeter.isEnabled());
+        disabledMeter.shutdown();
+    }
+
+    @Test
+    public void testSatisfy() {
+        final Endpoints pbEndpoints0 = fakePbEndpoints0();
+
+        // Disabled meter: only a metric that is switched on AND carries endpoints is reported as unsatisfied.
+        final ClientMeter disabledMeter = ClientMeter.disabledInstance("clientId");
+        assertTrue(disabledMeter.satisfy(metricOf(false)));
+        assertTrue(disabledMeter.satisfy(metricOf(true)));
+        assertTrue(disabledMeter.satisfy(metricOf(false, pbEndpoints0)));
+        assertFalse(disabledMeter.satisfy(metricOf(true, pbEndpoints0)));
+
+        // Enabled meter bound to endpoints #0: only a switched-on metric with the same endpoints is satisfied.
+        final org.apache.rocketmq.client.java.route.Endpoints endpoints =
+            new org.apache.rocketmq.client.java.route.Endpoints(pbEndpoints0);
+        final SdkMeterProvider provider = newProvider();
+        final ClientMeter enabledMeter = new ClientMeter(meterOf(provider), endpoints, provider, "clientId");
+        assertFalse(enabledMeter.satisfy(metricOf(false)));
+        assertFalse(enabledMeter.satisfy(metricOf(true)));
+        assertFalse(enabledMeter.satisfy(metricOf(false, pbEndpoints0)));
+        assertTrue(enabledMeter.satisfy(metricOf(true, pbEndpoints0)));
+
+        // A switched-on metric that points at different endpoints is not satisfied either.
+        final Endpoints pbEndpoints1 = fakePbEndpoints1();
+        assertFalse(enabledMeter.satisfy(metricOf(true, pbEndpoints1)));
+    }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
new file mode 100644
index 0000000..9137355
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
+import org.apache.rocketmq.client.java.hook.MessageHookPoints;
+import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.impl.Client;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.message.GeneralMessage;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link MessageMeterHandler}. Each case drives the handler through one hook point with a mocked
+ * {@link ClientMeterManager} and checks which histogram was recorded and which attributes were attached to it.
+ */
+public class MessageMeterHandlerTest extends TestBase {
+    private static final String TEST_CLIENT_ID = "clientId";
+
+    interface MyPushConsumer extends Client, PushConsumer {
+    }
+
+    interface MySimpleConsumer extends Client, SimpleConsumer {
+    }
+
+    /** Mocks a meter manager that reports itself as enabled, so the handler does not return early. */
+    private static ClientMeterManager enabledMeterManager() {
+        final ClientMeterManager manager = Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(manager).isEnabled();
+        return manager;
+    }
+
+    /** Mocks a producer-side client that answers {@link #TEST_CLIENT_ID} as its client id. */
+    private static ClientImpl producer() {
+        final ClientImpl client = Mockito.mock(ClientImpl.class);
+        Mockito.doReturn(TEST_CLIENT_ID).when(client).clientId();
+        return client;
+    }
+
+    /** Mocks a message that only knows its topic. */
+    private static GeneralMessage messageOf(String topic) {
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        Mockito.doReturn(topic).when(message).getTopic();
+        return message;
+    }
+
+    /** Mocks a message of {@code topic} whose transport delivery timestamp is the current time. */
+    private static GeneralMessage deliveredMessageOf(String topic) {
+        final GeneralMessage message = messageOf(topic);
+        final Optional<Long> deliveryTimestamp = Optional.of(System.currentTimeMillis());
+        Mockito.doReturn(deliveryTimestamp).when(message).getTransportDeliveryTimestamp();
+        return message;
+    }
+
+    /** Wraps a single message into a mutable list. */
+    private static List<GeneralMessage> listOf(GeneralMessage message) {
+        final List<GeneralMessage> messages = new ArrayList<>();
+        messages.add(message);
+        return messages;
+    }
+
+    /**
+     * Verifies that {@code histogram} was recorded exactly once on {@code manager} and returns the attributes that
+     * were passed along with it.
+     */
+    private static Attributes recordedAttributes(ClientMeterManager manager, HistogramEnum histogram) {
+        final ArgumentCaptor<Attributes> captor = ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(manager, Mockito.times(1))
+            .record(Mockito.eq(histogram), captor.capture(), Mockito.anyDouble());
+        return captor.getValue();
+    }
+
+    /** Runs the RECEIVE after-hook of a fresh handler for {@code consumer} and returns the meter manager used. */
+    private static ClientMeterManager receive(Client consumer, List<GeneralMessage> messages) {
+        final ClientMeterManager manager = enabledMeterManager();
+        final MessageMeterHandler handler = new MessageMeterHandler(consumer, manager);
+        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        handler.doAfter(context, messages);
+        return manager;
+    }
+
+    /** Asserts the topic, client id and invocation status attached to a send-cost-time record. */
+    private static void assertSendAttributes(Attributes attributes, String topic, InvocationStatus expected) {
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(TEST_CLIENT_ID, attributes.get(MetricLabels.CLIENT_ID));
+        assertEquals(expected.getName(), attributes.get(MetricLabels.INVOCATION_STATUS));
+    }
+
+    /** Asserts the topic and consumer group of a delivery-latency record, and that a client id is present. */
+    private static void assertReceiveAttributes(Attributes attributes, String topic, String consumerGroup) {
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, attributes.get(MetricLabels.CONSUMER_GROUP));
+        assertNotNull(attributes.get(MetricLabels.CLIENT_ID));
+    }
+
+    /** Asserts the topic, consumer group and client id attached to a consume-side record. */
+    private static void assertConsumeAttributes(Attributes attributes, String topic, String consumerGroup) {
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, attributes.get(MetricLabels.CONSUMER_GROUP));
+        assertEquals(TEST_CLIENT_ID, attributes.get(MetricLabels.CLIENT_ID));
+    }
+
+    /** Runs a full SEND round trip that ends with {@code status} and expects it to be recorded as a failure. */
+    private void assertSendRecordedAsFailure(MessageHookPointsStatus status) {
+        final ClientMeterManager meterManager = enabledMeterManager();
+        final MessageMeterHandler meterHandler = new MessageMeterHandler(producer(), meterManager);
+        final String topic = FAKE_TOPIC_0;
+        final List<GeneralMessage> messages = listOf(messageOf(topic));
+        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.SEND);
+
+        meterHandler.doBefore(context, messages);
+        context.setStatus(status);
+        meterHandler.doAfter(context, messages);
+
+        final Attributes attributes = recordedAttributes(meterManager, HistogramEnum.SEND_COST_TIME);
+        assertSendAttributes(attributes, topic, InvocationStatus.FAILURE);
+    }
+
+    @Test
+    public void testSendMessageWithSuccess() {
+        final ClientMeterManager meterManager = enabledMeterManager();
+        final MessageMeterHandler meterHandler = new MessageMeterHandler(producer(), meterManager);
+        final String topic = FAKE_TOPIC_0;
+        final List<GeneralMessage> messages = listOf(messageOf(topic));
+        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.SEND);
+
+        // The before-hook must leave a stopwatch in the context for the after-hook to read.
+        meterHandler.doBefore(context, messages);
+        assertNotNull(context.getAttributes().get(MessageMeterHandler.SEND_STOPWATCH_KEY));
+
+        context.setStatus(MessageHookPointsStatus.OK);
+        meterHandler.doAfter(context, messages);
+
+        final Attributes attributes = recordedAttributes(meterManager, HistogramEnum.SEND_COST_TIME);
+        assertSendAttributes(attributes, topic, InvocationStatus.SUCCESS);
+    }
+
+    @Test
+    public void testSendMessageWithUnset() {
+        assertSendRecordedAsFailure(MessageHookPointsStatus.UNSET);
+    }
+
+    @Test
+    public void testSendMessageWithFailure() {
+        assertSendRecordedAsFailure(MessageHookPointsStatus.ERROR);
+    }
+
+    @Test
+    public void testReceiveMessageForPushConsumer() {
+        final String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        final String topic = FAKE_TOPIC_0;
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        Mockito.doReturn(consumerGroup).when(pushConsumer).getConsumerGroup();
+        Mockito.doReturn(TEST_CLIENT_ID).when(pushConsumer).clientId();
+
+        final ClientMeterManager meterManager = receive(pushConsumer, listOf(deliveredMessageOf(topic)));
+
+        final Attributes attributes = recordedAttributes(meterManager, HistogramEnum.DELIVERY_LATENCY);
+        assertReceiveAttributes(attributes, topic, consumerGroup);
+    }
+
+    @Test
+    public void testReceiveMessageForPushConsumerWithEmptyMessage() {
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        Mockito.doReturn(FAKE_CONSUMER_GROUP_0).when(pushConsumer).getConsumerGroup();
+        Mockito.doReturn(TEST_CLIENT_ID).when(pushConsumer).clientId();
+        // A message is stubbed but never added to the list, so the handler is handed an empty batch.
+        deliveredMessageOf(FAKE_TOPIC_0);
+        final List<GeneralMessage> noMessages = new ArrayList<>();
+
+        final ClientMeterManager meterManager = receive(pushConsumer, noMessages);
+
+        Mockito.verify(meterManager, Mockito.never())
+            .record(any(HistogramEnum.class), any(Attributes.class), Mockito.anyDouble());
+    }
+
+    @Test
+    public void testReceiveMessageForSimpleConsumer() {
+        final String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        final String topic = FAKE_TOPIC_0;
+        final MySimpleConsumer simpleConsumer = Mockito.mock(MySimpleConsumer.class);
+        Mockito.doReturn(consumerGroup).when(simpleConsumer).getConsumerGroup();
+        Mockito.doReturn(TEST_CLIENT_ID).when(simpleConsumer).clientId();
+
+        final ClientMeterManager meterManager = receive(simpleConsumer, listOf(deliveredMessageOf(topic)));
+
+        final Attributes attributes = recordedAttributes(meterManager, HistogramEnum.DELIVERY_LATENCY);
+        assertReceiveAttributes(attributes, topic, consumerGroup);
+    }
+
+    @Test
+    public void testConsumeMessage() {
+        final String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        final String topic = FAKE_TOPIC_0;
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        Mockito.doReturn(consumerGroup).when(pushConsumer).getConsumerGroup();
+        Mockito.doReturn(TEST_CLIENT_ID).when(pushConsumer).clientId();
+        final ClientMeterManager meterManager = enabledMeterManager();
+        final MessageMeterHandler meterHandler = new MessageMeterHandler(pushConsumer, meterManager);
+
+        // Pretend the message was decoded three seconds ago; that gap is what the await-time record should show.
+        final long awaitTimeMills = 3000;
+        final GeneralMessage message = messageOf(topic);
+        final long decodeTimestamp = System.currentTimeMillis() - awaitTimeMills;
+        Mockito.doReturn(Optional.of(decodeTimestamp)).when(message).getDecodeTimestamp();
+        final List<GeneralMessage> generalMessages = listOf(message);
+        final MessageHandlerContextImpl context = new MessageHandlerContextImpl(MessageHookPoints.CONSUME);
+
+        meterHandler.doBefore(context, generalMessages);
+        final ArgumentCaptor<Attributes> awaitAttributesCaptor = ArgumentCaptor.forClass(Attributes.class);
+        final ArgumentCaptor<Double> awaitTimeCaptor = ArgumentCaptor.forClass(Double.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.AWAIT_TIME), awaitAttributesCaptor.capture(),
+                awaitTimeCaptor.capture());
+        assertEquals(awaitTimeMills, awaitTimeCaptor.getValue(), 100);
+        assertConsumeAttributes(awaitAttributesCaptor.getValue(), topic, consumerGroup);
+        assertNotNull(context.getAttributes().get(MessageMeterHandler.CONSUME_STOPWATCH_KEY));
+
+        context.setStatus(MessageHookPointsStatus.OK);
+        meterHandler.doAfter(context, generalMessages);
+        final Attributes processAttributes = recordedAttributes(meterManager, HistogramEnum.PROCESS_TIME);
+        assertConsumeAttributes(processAttributes, topic, consumerGroup);
+        assertEquals(InvocationStatus.SUCCESS.getName(), processAttributes.get(MetricLabels.INVOCATION_STATUS));
+    }
+}
\ No newline at end of file