This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 91cd97b  Add more unit tests (#176)
91cd97b is described below

commit 91cd97b36613fa58a34ced308ab573503b3eb23d
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 23 21:21:47 2022 +0800

    Add more unit tests (#176)
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  10 +-
 .../impl/consumer/MessageCacheObserverImpl.java    |  57 -----
 .../impl/consumer/ProcessQueueGaugeObserver.java   |  90 ++++++++
 .../java/impl/consumer/PushConsumerImpl.java       |  10 +-
 .../client/java/logging/CustomConsoleAppender.java |   5 +-
 .../rocketmq/client/java/metrics/ClientMeter.java  |  11 -
 ...tMeterProvider.java => ClientMeterManager.java} |  74 +++---
 ...eCacheObserver.java => EmptyGaugeObserver.java} |  31 ++-
 ...essageCacheObserver.java => GaugeObserver.java} |  24 +-
 .../client/java/metrics/HistogramEnum.java         |   2 +-
 .../client/java/metrics/MessageMeterHandler.java   |  42 ++--
 .../java/logging/CustomConsoleAppenderTest.java}   |  29 ++-
 .../java/metrics/ClientMeterManagerTest.java       |  52 +++++
 .../client/java/metrics/ClientMeterTest.java       |  91 ++++++++
 .../java/metrics/MessageMeterHandlerTest.java      | 250 +++++++++++++++++++++
 15 files changed, 584 insertions(+), 194 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b9b6a76..e2807fb 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -76,7 +76,7 @@ import org.apache.rocketmq.client.java.hook.MessageHandler;
 import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
-import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
+import org.apache.rocketmq.client.java.metrics.ClientMeterManager;
 import org.apache.rocketmq.client.java.metrics.MessageMeterHandler;
 import org.apache.rocketmq.client.java.metrics.Metric;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
@@ -105,7 +105,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     // Thread-safe set.
     protected final Set<Endpoints> isolated;
     protected final ExecutorService clientCallbackExecutor;
-    protected final ClientMeterProvider clientMeterProvider;
+    protected final ClientMeterManager clientMeterManager;
     /**
      * Telemetry command executor, which aims to execute commands from the 
remote.
      */
@@ -153,10 +153,10 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
             new LinkedBlockingQueue<>(),
             new ThreadFactoryImpl("ClientCallbackWorker"));
 
-        this.clientMeterProvider = new ClientMeterProvider(this);
+        this.clientMeterManager = new ClientMeterManager(clientId, 
clientConfiguration);
 
         this.compositedMessageHandler =
-            new CompositedMessageHandler(Collections.singletonList(new 
MessageMeterHandler(this, clientMeterProvider)));
+            new CompositedMessageHandler(Collections.singletonList(new 
MessageMeterHandler(this, clientMeterManager)));
 
         this.telemetryCommandExecutor = new ThreadPoolExecutor(
             1,
@@ -317,7 +317,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     @Override
     public final void onSettingsCommand(Endpoints endpoints, 
apache.rocketmq.v2.Settings settings) {
         final Metric metric = new Metric(settings.getMetric());
-        clientMeterProvider.reset(metric);
+        clientMeterManager.reset(metric);
         this.getSettings().sync(settings);
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
deleted file mode 100644
index b5802da..0000000
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/MessageCacheObserverImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.impl.consumer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.java.metrics.MessageCacheObserver;
-import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-
-public class MessageCacheObserverImpl implements MessageCacheObserver {
-    private final ConcurrentMap<MessageQueueImpl, ProcessQueue> 
processQueueTable;
-
-    public MessageCacheObserverImpl(ConcurrentMap<MessageQueueImpl, 
ProcessQueue> processQueueTable) {
-        this.processQueueTable = processQueueTable;
-    }
-
-    @Override
-    public Map<String, Long> getCachedMessageCount() {
-        Map<String, Long> cachedMessageCountMap = new HashMap<>();
-        for (ProcessQueue pq : processQueueTable.values()) {
-            final String topic = pq.getMessageQueue().getTopic();
-            long count = cachedMessageCountMap.containsKey(topic) ? 
cachedMessageCountMap.get(topic) : 0;
-            count += pq.getInflightMessageCount();
-            count += pq.getPendingMessageCount();
-            cachedMessageCountMap.put(topic, count);
-        }
-        return cachedMessageCountMap;
-    }
-
-    @Override
-    public Map<String, Long> getCachedMessageBytes() {
-        Map<String, Long> cachedMessageBytesMap = new HashMap<>();
-        for (ProcessQueue pq : processQueueTable.values()) {
-            final String topic = pq.getMessageQueue().getTopic();
-            long bytes = cachedMessageBytesMap.containsKey(topic) ? 
cachedMessageBytesMap.get(topic) : 0;
-            bytes += pq.getCachedMessageBytes();
-            cachedMessageBytesMap.put(topic, bytes);
-        }
-        return cachedMessageBytesMap;
-    }
-}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
new file mode 100644
index 0000000..5dda367
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueGaugeObserver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.java.metrics.GaugeEnum;
+import org.apache.rocketmq.client.java.metrics.GaugeObserver;
+import org.apache.rocketmq.client.java.metrics.MetricLabels;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+
+public class ProcessQueueGaugeObserver implements GaugeObserver {
+    private final ConcurrentMap<MessageQueueImpl, ProcessQueue> 
processQueueTable;
+    private final String clientId;
+    private final String consumerGroup;
+    private final List<GaugeEnum> gauges;
+
+    public ProcessQueueGaugeObserver(ConcurrentMap<MessageQueueImpl, 
ProcessQueue> processQueueTable, String clientId,
+        String consumerGroup) {
+        this.processQueueTable = processQueueTable;
+        this.clientId = clientId;
+        this.consumerGroup = consumerGroup;
+        this.gauges = new ArrayList<>();
+        gauges.add(GaugeEnum.CONSUMER_CACHED_MESSAGES);
+        gauges.add(GaugeEnum.CONSUMER_CACHED_BYTES);
+    }
+
+    @Override
+    public List<GaugeEnum> getGauges() {
+        return gauges;
+    }
+
+    @Override
+    public Map<Attributes, Double> getValues(GaugeEnum gauge) {
+        switch (gauge) {
+            case CONSUMER_CACHED_MESSAGES:
+                Map<Attributes, Double> cachedMessageCountMap = new 
HashMap<>();
+                for (ProcessQueue pq : processQueueTable.values()) {
+                    final String topic = pq.getMessageQueue().getTopic();
+                    Attributes attributes = Attributes.builder()
+                        .put(MetricLabels.TOPIC, topic)
+                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                        .put(MetricLabels.CLIENT_ID, clientId)
+                        .build();
+                    double count = 
cachedMessageCountMap.containsKey(attributes) ?
+                        cachedMessageCountMap.get(attributes) : 0;
+                    count += pq.getInflightMessageCount();
+                    count += pq.getPendingMessageCount();
+                    cachedMessageCountMap.put(attributes, count);
+                }
+                return cachedMessageCountMap;
+            case CONSUMER_CACHED_BYTES:
+                Map<Attributes, Double> cachedMessageBytesMap = new 
HashMap<>();
+                for (ProcessQueue pq : processQueueTable.values()) {
+                    final String topic = pq.getMessageQueue().getTopic();
+                    Attributes attributes = Attributes.builder()
+                        .put(MetricLabels.TOPIC, topic)
+                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                        .put(MetricLabels.CLIENT_ID, clientId)
+                        .build();
+                    double bytes = 
cachedMessageBytesMap.containsKey(attributes) ?
+                        cachedMessageBytesMap.get(attributes) : 0;
+                    bytes += pq.getCachedMessageBytes();
+                    cachedMessageBytesMap.put(attributes, bytes);
+                }
+                return cachedMessageBytesMap;
+            default:
+                return new HashMap<>();
+        }
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index d4c398d..7682071 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -67,7 +67,7 @@ import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
-import org.apache.rocketmq.client.java.metrics.MessageCacheObserver;
+import org.apache.rocketmq.client.java.metrics.GaugeObserver;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -142,8 +142,6 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         this.consumptionErrorQuantity = new AtomicLong(0);
 
         this.processQueueTable = new ConcurrentHashMap<>();
-        MessageCacheObserver messageCacheObserver = new 
MessageCacheObserverImpl(processQueueTable);
-        this.clientMeterProvider.setMessageCacheObserver(messageCacheObserver);
 
         this.consumptionExecutor = new ThreadPoolExecutor(
             consumptionThreadCount,
@@ -158,6 +156,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     protected void startUp() throws Exception {
         try {
             LOGGER.info("Begin to start the rocketmq push consumer, 
clientId={}", clientId);
+            GaugeObserver gaugeObserver = new 
ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
+            this.clientMeterManager.setGaugeObserver(gaugeObserver);
             super.startUp();
             final ScheduledExecutorService scheduler = 
this.getClientManager().getScheduler();
             this.consumeService = createConsumeService();
@@ -185,8 +185,8 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
             scanAssignmentsFuture.cancel(false);
         }
         super.shutDown();
-        consumeService.stopAsync().awaitTerminated();
-        consumptionExecutor.shutdown();
+        this.consumeService.stopAsync().awaitTerminated();
+        this.consumptionExecutor.shutdown();
         ExecutorServices.awaitTerminated(consumptionExecutor);
         LOGGER.info("Shutdown the rocketmq push consumer successfully, 
clientId={}", clientId);
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
index e131471..857e51c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
@@ -21,10 +21,11 @@ import ch.qos.logback.core.ConsoleAppender;
 
 public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
     public static final String ENABLE_CONSOLE_APPENDER_KEY = 
"mq.consoleAppender.enabled";
-    private final boolean enabled = 
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
-        Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+    private final boolean enabled;
 
     public CustomConsoleAppender() {
+        this.enabled = 
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
+            
Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
     }
 
     protected void append(E eventObject) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index 1afbd0a..53cdb09 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -24,7 +24,6 @@ import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -68,16 +67,6 @@ public class ClientMeter {
         return enabled;
     }
 
-    public Endpoints getEndpoints() {
-        return endpoints;
-    }
-
-    Optional<DoubleHistogram> getHistogramByEnum(HistogramEnum histogramEnum) {
-        final DoubleHistogram histogram = 
histogramMap.computeIfAbsent(histogramEnum.getName(), name -> enabled ?
-            meter.histogramBuilder(histogramEnum.getName()).build() : null);
-        return null == histogram ? Optional.empty() : Optional.of(histogram);
-    }
-
     public void record(HistogramEnum histogramEnum, Attributes attributes, 
double value) {
         final DoubleHistogram histogram = 
histogramMap.computeIfAbsent(histogramEnum.getName(), name -> enabled ?
             meter.histogramBuilder(histogramEnum.getName()).build() : null);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
similarity index 72%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
index 5af1aa1..683ebc1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import io.grpc.ManagedChannel;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -36,33 +38,33 @@ import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
 import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ClientMeterProvider {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientMeterProvider.class);
+public class ClientMeterManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientMeterManager.class);
 
     private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = 
Duration.ofSeconds(3);
     private static final Duration METRIC_READER_INTERVAL = 
Duration.ofMinutes(1);
     private static final String METRIC_INSTRUMENTATION_NAME = 
"org.apache.rocketmq.message";
 
-    private final ClientImpl client;
+    private final String clientId;
+    private final ClientConfiguration clientConfiguration;
     private volatile ClientMeter clientMeter;
-    private volatile MessageCacheObserver messageCacheObserver;
+    private volatile GaugeObserver gaugeObserver = GaugeObserver.EMPTY;
 
-    public ClientMeterProvider(ClientImpl client) {
-        this.client = client;
-        this.clientMeter = ClientMeter.disabledInstance(client.clientId());
-        this.messageCacheObserver = null;
+    public ClientMeterManager(String clientId, ClientConfiguration 
clientConfiguration) {
+        this.clientId = clientId;
+        this.clientConfiguration = clientConfiguration;
+        this.clientMeter = ClientMeter.disabledInstance(clientId);
     }
 
-    public void setMessageCacheObserver(MessageCacheObserver 
messageCacheObserver) {
-        this.messageCacheObserver = messageCacheObserver;
+    public void setGaugeObserver(GaugeObserver gaugeObserver) {
+        this.gaugeObserver = checkNotNull(gaugeObserver, "gaugeObserver should 
not be null");
     }
 
     public void record(HistogramEnum histogramEnum, Attributes attributes, 
double value) {
@@ -71,7 +73,6 @@ public class ClientMeterProvider {
 
     @SuppressWarnings("deprecation")
     public synchronized void reset(Metric metric) {
-        final String clientId = client.clientId();
         try {
             if (clientMeter.satisfy(metric)) {
                 LOGGER.info("Metric settings is satisfied by the current 
message meter, clientId={}", clientId);
@@ -80,14 +81,14 @@ public class ClientMeterProvider {
             if (!metric.isOn()) {
                 LOGGER.info("Metric is off, clientId={}", clientId);
                 clientMeter.shutdown();
-                clientMeter = ClientMeter.disabledInstance(client.clientId());
+                clientMeter = ClientMeter.disabledInstance(clientId);
                 return;
             }
             final Endpoints endpoints = metric.getEndpoints();
             final SslContext sslContext = 
GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                 .build();
             final NettyChannelBuilder channelBuilder = 
NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
-                .sslContext(sslContext).intercept(new 
AuthInterceptor(client.getClientConfiguration(), clientId));
+                .sslContext(sslContext).intercept(new 
AuthInterceptor(clientConfiguration, clientId));
             final List<InetSocketAddress> socketAddresses = 
endpoints.toSocketAddresses();
             if (null != socketAddresses) {
                 IpNameResolverFactory metricResolverFactory = new 
IpNameResolverFactory(socketAddresses);
@@ -99,9 +100,9 @@ public class ClientMeterProvider {
                 .build();
 
             InstrumentSelector sendSuccessCostTimeInstrumentSelector = 
InstrumentSelector.builder()
-                
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.SEND_SUCCESS_COST_TIME.getName()).build();
+                
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.SEND_COST_TIME.getName()).build();
             final View sendSuccessCostTimeView = View.builder()
-                
.setAggregation(HistogramEnum.SEND_SUCCESS_COST_TIME.getBucket()).build();
+                
.setAggregation(HistogramEnum.SEND_COST_TIME.getBucket()).build();
 
             InstrumentSelector deliveryLatencyInstrumentSelector = 
InstrumentSelector.builder()
                 
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.DELIVERY_LATENCY.getName()).build();
@@ -137,33 +138,20 @@ public class ClientMeterProvider {
             existedClientMeter.shutdown();
             LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints, 
clientId);
 
-            if (!(client instanceof PushConsumer)) {
-                // No need for producer and simple consumer.
-                return;
+            final List<GaugeEnum> gauges = gaugeObserver.getGauges();
+            for (GaugeEnum gauge : gauges) {
+                
meter.gaugeBuilder(gauge.getName()).buildWithCallback(measurement -> {
+                    final Map<Attributes, Double> map = 
gaugeObserver.getValues(gauge);
+                    if (map.isEmpty()) {
+                        return;
+                    }
+                    for (Map.Entry<Attributes, Double> entry : map.entrySet()) 
{
+                        final Attributes attributes = entry.getKey();
+                        final Double value = entry.getValue();
+                        measurement.record(value, attributes);
+                    }
+                });
             }
-            final String consumerGroup = ((PushConsumer) 
client).getConsumerGroup();
-            
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement
 -> {
-                final Map<String, Long> cachedMessageCountMap = 
messageCacheObserver.getCachedMessageCount();
-                for (Map.Entry<String, Long> entry : 
cachedMessageCountMap.entrySet()) {
-                    final String topic = entry.getKey();
-                    Attributes attributes = Attributes.builder()
-                        .put(MetricLabels.TOPIC, topic)
-                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                        .put(MetricLabels.CLIENT_ID, clientId).build();
-                    measurement.record(entry.getValue(), attributes);
-                }
-            });
-            
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement
 -> {
-                final Map<String, Long> cachedMessageBytesMap = 
messageCacheObserver.getCachedMessageBytes();
-                for (Map.Entry<String, Long> entry : 
cachedMessageBytesMap.entrySet()) {
-                    final String topic = entry.getKey();
-                    Attributes attributes = Attributes.builder()
-                        .put(MetricLabels.TOPIC, topic)
-                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                        .put(MetricLabels.CLIENT_ID, clientId).build();
-                    measurement.record(entry.getValue(), attributes);
-                }
-            });
         } catch (Throwable t) {
             LOGGER.error("Exception raised when resetting message meter, 
clientId={}", clientId, t);
         }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
similarity index 59%
copy from 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
copy to 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
index 00c65c5..ca6de8a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/EmptyGaugeObserver.java
@@ -17,24 +17,21 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 
-/**
- * The observer which could records the count and memory footprint of cached 
message in {@link PushConsumer}.
- */
-public interface MessageCacheObserver {
-    /**
-     * Get the cached message count for each topic.
-     *
-     * @return the cached message count map.
-     */
-    Map<String /* topic */, Long> getCachedMessageCount();
+public class EmptyGaugeObserver implements GaugeObserver {
+    @Override
+    public List<GaugeEnum> getGauges() {
+        return new ArrayList<>();
+    }
 
-    /**
-     * Get the cached message memory footprint for each topic.
-     *
-     * @return the cached message footprint map.
-     */
-    Map<String /* topic */, Long> getCachedMessageBytes();
+    @Override
+    public Map<Attributes, Double> getValues(GaugeEnum gauge) {
+        return new HashMap<>();
+    }
 }
+
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
similarity index 59%
rename from 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
rename to 
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
index 00c65c5..a6e653a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeObserver.java
@@ -17,24 +17,14 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 
-/**
- * The observer which could records the count and memory footprint of cached 
message in {@link PushConsumer}.
- */
-public interface MessageCacheObserver {
-    /**
-     * Get the cached message count for each topic.
-     *
-     * @return the cached message count map.
-     */
-    Map<String /* topic */, Long> getCachedMessageCount();
+public interface GaugeObserver {
+    GaugeObserver EMPTY = new EmptyGaugeObserver();
+
+    List<GaugeEnum> getGauges();
 
-    /**
-     * Get the cached message memory footprint for each topic.
-     *
-     * @return the cached message footprint map.
-     */
-    Map<String /* topic */, Long> getCachedMessageBytes();
+    Map<Attributes, Double> getValues(GaugeEnum gauge);
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
index 0fd1d07..61a72fd 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
@@ -28,7 +28,7 @@ public enum HistogramEnum {
      *
      * <p>The time unit of bucket is milliseconds.
      */
-    SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time", 
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+    SEND_COST_TIME("rocketmq_send_cost_time", 
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
         10.0, 20.0, 50.0, 200.0, 500.0))),
 
     /**
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
index 4c2132d..ee0dce3 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandler.java
@@ -30,34 +30,32 @@ import org.apache.rocketmq.client.java.hook.MessageHandler;
 import org.apache.rocketmq.client.java.hook.MessageHandlerContext;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.impl.Client;
 import org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MessageMeterHandler implements MessageHandler {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageMeterHandler.class);
+    static final AttributeKey<Stopwatch> SEND_STOPWATCH_KEY = 
AttributeKey.create("send_stopwatch");
+    static final AttributeKey<Stopwatch> CONSUME_STOPWATCH_KEY = 
AttributeKey.create("consume_stopwatch");
 
-    private static final AttributeKey<Stopwatch> MESSAGE_SEND_STOPWATCH_KEY = 
AttributeKey.create(
-        "message_send_stopwatch");
-    private static final AttributeKey<Stopwatch> MESSAGE_CONSUME_STOPWATCH_KEY 
= AttributeKey.create(
-        "message_consume_stopwatch");
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageMeterHandler.class);
 
-    private final ClientImpl client;
-    private final ClientMeterProvider meterProvider;
+    private final Client client;
+    private final ClientMeterManager meterManager;
 
-    public MessageMeterHandler(ClientImpl client, ClientMeterProvider 
meterProvider) {
+    public MessageMeterHandler(Client client, ClientMeterManager meterManager) 
{
         this.client = client;
-        this.meterProvider = meterProvider;
+        this.meterManager = meterManager;
     }
 
     private void doBeforeSendMessage(MessageHandlerContext context) {
         // Record the time before sending message.
-        context.putAttribute(MESSAGE_SEND_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
+        context.putAttribute(SEND_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
     }
 
     private void doAfterSendMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
-        final Attribute<Stopwatch> stopwatchAttr = 
context.getAttribute(MESSAGE_SEND_STOPWATCH_KEY);
+        final Attribute<Stopwatch> stopwatchAttr = 
context.getAttribute(SEND_STOPWATCH_KEY);
         if (null == stopwatchAttr) {
             // Should never reach here.
             return;
@@ -69,7 +67,7 @@ public class MessageMeterHandler implements MessageHandler {
             Attributes attributes = 
Attributes.builder().put(MetricLabels.TOPIC, message.getTopic())
                 .put(MetricLabels.CLIENT_ID, client.clientId())
                 .put(MetricLabels.INVOCATION_STATUS, status.getName()).build();
-            meterProvider.record(HistogramEnum.SEND_SUCCESS_COST_TIME, 
attributes, duration.toMillis());
+            meterManager.record(HistogramEnum.SEND_COST_TIME, attributes, 
duration.toMillis());
         }
     }
 
@@ -103,10 +101,14 @@ public class MessageMeterHandler implements 
MessageHandler {
         }
         final Attributes attributes = 
Attributes.builder().put(MetricLabels.TOPIC, message.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, 
consumerGroup).put(MetricLabels.CLIENT_ID, client.clientId()).build();
-        meterProvider.record(HistogramEnum.DELIVERY_LATENCY, attributes, 
latency);
+        meterManager.record(HistogramEnum.DELIVERY_LATENCY, attributes, 
latency);
     }
 
     private void doBeforeConsumeMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
+        if (messages.isEmpty()) {
+            // Should never reach here.
+            return;
+        }
         String consumerGroup = null;
         if (client instanceof PushConsumer) {
             consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -125,9 +127,9 @@ public class MessageMeterHandler implements MessageHandler {
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
             .put(MetricLabels.CLIENT_ID, client.clientId()).build();
         final long latency = System.currentTimeMillis() - decodeTimestamp;
-        meterProvider.record(HistogramEnum.AWAIT_TIME, attributes, latency);
+        meterManager.record(HistogramEnum.AWAIT_TIME, attributes, latency);
         // Record the time before consuming message.
-        context.putAttribute(MESSAGE_CONSUME_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
+        context.putAttribute(CONSUME_STOPWATCH_KEY, 
Attribute.create(Stopwatch.createStarted()));
     }
 
     private void doAfterConsumeMessage(MessageHandlerContext context, 
List<GeneralMessage> messages) {
@@ -136,7 +138,7 @@ public class MessageMeterHandler implements MessageHandler {
             LOGGER.error("[Bug] current client is not push consumer, 
clientId={}", client.clientId());
             return;
         }
-        final Attribute<Stopwatch> stopwatchAttr = 
context.getAttribute(MESSAGE_CONSUME_STOPWATCH_KEY);
+        final Attribute<Stopwatch> stopwatchAttr = 
context.getAttribute(CONSUME_STOPWATCH_KEY);
         if (null == stopwatchAttr) {
             // Should never reach here.
             return;
@@ -152,13 +154,13 @@ public class MessageMeterHandler implements 
MessageHandler {
                 .put(MetricLabels.INVOCATION_STATUS, 
invocationStatus.getName())
                 .build();
             final Duration duration = stopwatchAttr.get().elapsed();
-            meterProvider.record(HistogramEnum.PROCESS_TIME, attributes, 
duration.toMillis());
+            meterManager.record(HistogramEnum.PROCESS_TIME, attributes, 
duration.toMillis());
         }
     }
 
     @Override
     public void doBefore(MessageHandlerContext context, List<GeneralMessage> 
messages) {
-        if (!meterProvider.isEnabled()) {
+        if (!meterManager.isEnabled()) {
             return;
         }
         final MessageHookPoints hookPoints = context.getMessageHookPoints();
@@ -178,7 +180,7 @@ public class MessageMeterHandler implements MessageHandler {
 
     @Override
     public void doAfter(MessageHandlerContext context, List<GeneralMessage> 
messages) {
-        if (!meterProvider.isEnabled()) {
+        if (!meterManager.isEnabled()) {
             return;
         }
         final MessageHookPoints hookPoints = context.getMessageHookPoints();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
similarity index 58%
copy from 
java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
copy to 
java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
index e131471..418ea30 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppenderTest.java
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.java.logging;
 
-import ch.qos.logback.core.ConsoleAppender;
+package org.apache.rocketmq.client.java.logging;
 
-public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
-    public static final String ENABLE_CONSOLE_APPENDER_KEY = 
"mq.consoleAppender.enabled";
-    private final boolean enabled = 
Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
-        Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-    public CustomConsoleAppender() {
-    }
+import org.junit.Test;
 
-    protected void append(E eventObject) {
-        if (this.enabled) {
-            super.append(eventObject);
-        }
+public class CustomConsoleAppenderTest {
 
-    }
+    @Test
+    public void testIsEnabled() {
+        CustomConsoleAppender<Object> appender = new CustomConsoleAppender<>();
+        assertFalse(appender.isEnabled());
 
-    public boolean isEnabled() {
-        return this.enabled;
+        System.setProperty(CustomConsoleAppender.ENABLE_CONSOLE_APPENDER_KEY, 
"true");
+        appender = new CustomConsoleAppender<>();
+        assertTrue(appender.isEnabled());
     }
-}
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
new file mode 100644
index 0000000..c3a6f48
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterManagerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+public class ClientMeterManagerTest extends TestBase {
+
+    @Test
+    public void testResetWithMetricOn() {
+        final ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        String clientId = "clientId";
+        final ClientMeterManager meterManager = new 
ClientMeterManager(clientId, clientConfiguration);
+        final Metric metric =
+            new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).setEndpoints(fakePbEndpoints0()).build());
+        meterManager.reset(metric);
+        assertTrue(meterManager.isEnabled());
+    }
+
+    @Test
+    public void testResetWithMetricOff() {
+        final ClientConfiguration clientConfiguration =
+            
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+        String clientId = "clientId";
+        final ClientMeterManager meterManager = new 
ClientMeterManager(clientId, clientConfiguration);
+        final Metric metric =
+            new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(false).setEndpoints(fakePbEndpoints0()).build());
+        meterManager.reset(metric);
+        assertFalse(meterManager.isEnabled());
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
new file mode 100644
index 0000000..177d615
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/ClientMeterTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import apache.rocketmq.v2.Endpoints;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.resources.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+
+public class ClientMeterTest extends TestBase {
+
+    @Test
+    public void testShutdownWithEnabledMeter() {
+        final SdkMeterProvider provider = 
SdkMeterProvider.builder().setResource(Resource.empty()).build();
+        final OpenTelemetrySdk openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(provider).build();
+        Meter meter = openTelemetry.getMeter("test");
+        final ClientMeter clientMeter = new ClientMeter(meter, 
fakeEndpoints(), provider, "clientId");
+        assertTrue(clientMeter.isEnabled());
+        clientMeter.shutdown();
+    }
+
+    @Test
+    public void testShutdownWithDisabledMeter() {
+        final ClientMeter clientMeter = 
ClientMeter.disabledInstance("clientId");
+        assertFalse(clientMeter.isEnabled());
+        clientMeter.shutdown();
+    }
+
+    @Test
+    public void testSatisfy() {
+        ClientMeter clientMeter = ClientMeter.disabledInstance("clientId");
+        Metric metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(false).build());
+        assertTrue(clientMeter.satisfy(metric));
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).build());
+        assertTrue(clientMeter.satisfy(metric));
+
+        final Endpoints pbEndpoints0 = fakePbEndpoints0();
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(false).setEndpoints(pbEndpoints0).build());
+        assertTrue(clientMeter.satisfy(metric));
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).setEndpoints(pbEndpoints0).build());
+        assertFalse(clientMeter.satisfy(metric));
+
+        final org.apache.rocketmq.client.java.route.Endpoints endpoints =
+            new org.apache.rocketmq.client.java.route.Endpoints(pbEndpoints0);
+
+        final SdkMeterProvider provider = 
SdkMeterProvider.builder().setResource(Resource.empty()).build();
+        final OpenTelemetrySdk openTelemetry = 
OpenTelemetrySdk.builder().setMeterProvider(provider).build();
+        Meter meter = openTelemetry.getMeter("test");
+        clientMeter = new ClientMeter(meter, endpoints, provider, "clientId");
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(false).build());
+        assertFalse(clientMeter.satisfy(metric));
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).build());
+        assertFalse(clientMeter.satisfy(metric));
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(false).setEndpoints(pbEndpoints0).build());
+        assertFalse(clientMeter.satisfy(metric));
+
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).setEndpoints(pbEndpoints0).build());
+        assertTrue(clientMeter.satisfy(metric));
+
+        final Endpoints pbEndpoints1 = fakePbEndpoints1();
+        metric = new 
Metric(apache.rocketmq.v2.Metric.newBuilder().setOn(true).setEndpoints(pbEndpoints1).build());
+        assertFalse(clientMeter.satisfy(metric));
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
new file mode 100644
index 0000000..9137355
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/metrics/MessageMeterHandlerTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.java.hook.MessageHandlerContextImpl;
+import org.apache.rocketmq.client.java.hook.MessageHookPoints;
+import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
+import org.apache.rocketmq.client.java.impl.Client;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.message.GeneralMessage;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+public class MessageMeterHandlerTest extends TestBase {
+
+    @Test
+    public void testSendMessageWithSuccess() {
+        final ClientImpl producer = Mockito.mock(ClientImpl.class);
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(producer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(producer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        messageList.add(message);
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.SEND);
+        meterHandler.doBefore(context, messageList);
+        
assertNotNull(context.getAttributes().get(MessageMeterHandler.SEND_STOPWATCH_KEY));
+        context.setStatus(MessageHookPointsStatus.OK);
+        meterHandler.doAfter(context, messageList);
+        ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.SEND_COST_TIME), 
attributesArgumentCaptor.capture(), Mockito.anyDouble());
+        final Attributes attributes = attributesArgumentCaptor.getValue();
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(clientId, attributes.get(MetricLabels.CLIENT_ID));
+        assertEquals(InvocationStatus.SUCCESS.getName(), 
attributes.get(MetricLabels.INVOCATION_STATUS));
+    }
+
+    @Test
+    public void testSendMessageWithUnset() {
+        final ClientImpl producer = Mockito.mock(ClientImpl.class);
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(producer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(producer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        messageList.add(message);
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.SEND);
+        meterHandler.doBefore(context, messageList);
+        context.setStatus(MessageHookPointsStatus.UNSET);
+        meterHandler.doAfter(context, messageList);
+        ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.SEND_COST_TIME), 
attributesArgumentCaptor.capture(), Mockito.anyDouble());
+        final Attributes attributes = attributesArgumentCaptor.getValue();
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(clientId, attributes.get(MetricLabels.CLIENT_ID));
+        assertEquals(InvocationStatus.FAILURE.getName(), 
attributes.get(MetricLabels.INVOCATION_STATUS));
+    }
+
+    @Test
+    public void testSendMessageWithFailure() {
+        final ClientImpl producer = Mockito.mock(ClientImpl.class);
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(producer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(producer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        messageList.add(message);
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.SEND);
+        meterHandler.doBefore(context, messageList);
+        context.setStatus(MessageHookPointsStatus.ERROR);
+        meterHandler.doAfter(context, messageList);
+        ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.SEND_COST_TIME), 
attributesArgumentCaptor.capture(), Mockito.anyDouble());
+        final Attributes attributes = attributesArgumentCaptor.getValue();
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(clientId, attributes.get(MetricLabels.CLIENT_ID));
+        assertEquals(InvocationStatus.FAILURE.getName(), 
attributes.get(MetricLabels.INVOCATION_STATUS));
+    }
+
+    interface MyPushConsumer extends Client, PushConsumer {
+    }
+
+    interface MySimpleConsumer extends Client, SimpleConsumer {
+    }
+
+    @Test
+    public void testReceiveMessageForPushConsumer() {
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        Mockito.doReturn(consumerGroup).when(pushConsumer).getConsumerGroup();
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(pushConsumer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(pushConsumer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        messageList.add(message);
+        final Optional<Long> optionalTransportDeliveryTimestamp = 
Optional.of(System.currentTimeMillis());
+        
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        meterHandler.doAfter(context, messageList);
+        ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.DELIVERY_LATENCY), 
attributesArgumentCaptor.capture(),
+                Mockito.anyDouble());
+        final Attributes attributes = attributesArgumentCaptor.getValue();
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, 
attributes.get(MetricLabels.CONSUMER_GROUP));
+        assertNotNull(attributes.get(MetricLabels.CLIENT_ID));
+    }
+
+    @Test
+    public void testReceiveMessageForPushConsumerWithEmptyMessage() {
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        
Mockito.doReturn(FAKE_CONSUMER_GROUP_0).when(pushConsumer).getConsumerGroup();
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(pushConsumer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(pushConsumer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        Mockito.doReturn(FAKE_TOPIC_0).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        final Optional<Long> optionalTransportDeliveryTimestamp = 
Optional.of(System.currentTimeMillis());
+        
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        meterHandler.doAfter(context, messageList);
+        Mockito.verify(meterManager, Mockito.never())
+            .record(any(HistogramEnum.class), any(Attributes.class), 
Mockito.anyDouble());
+    }
+
+    @Test
+    public void testReceiveMessageForSimpleConsumer() {
+        final MySimpleConsumer simpleConsumer = 
Mockito.mock(MySimpleConsumer.class);
+        String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        
Mockito.doReturn(consumerGroup).when(simpleConsumer).getConsumerGroup();
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(simpleConsumer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(simpleConsumer, meterManager);
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        List<GeneralMessage> messageList = new ArrayList<>();
+        messageList.add(message);
+        final Optional<Long> optionalTransportDeliveryTimestamp = 
Optional.of(System.currentTimeMillis());
+        
Mockito.doReturn(optionalTransportDeliveryTimestamp).when(message).getTransportDeliveryTimestamp();
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.RECEIVE);
+        meterHandler.doAfter(context, messageList);
+        ArgumentCaptor<Attributes> attributesArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.DELIVERY_LATENCY), 
attributesArgumentCaptor.capture(),
+                Mockito.anyDouble());
+        final Attributes attributes = attributesArgumentCaptor.getValue();
+        assertEquals(topic, attributes.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, 
attributes.get(MetricLabels.CONSUMER_GROUP));
+        assertNotNull(attributes.get(MetricLabels.CLIENT_ID));
+    }
+
+    @Test
+    public void testConsumeMessage() {
+        final MyPushConsumer pushConsumer = Mockito.mock(MyPushConsumer.class);
+        String consumerGroup = FAKE_CONSUMER_GROUP_0;
+        Mockito.doReturn(consumerGroup).when(pushConsumer).getConsumerGroup();
+        final ClientMeterManager meterManager = 
Mockito.mock(ClientMeterManager.class);
+        Mockito.doReturn(true).when(meterManager).isEnabled();
+        String clientId = "clientId";
+        Mockito.doReturn(clientId).when(pushConsumer).clientId();
+        final MessageMeterHandler meterHandler = new 
MessageMeterHandler(pushConsumer, meterManager);
+        List<GeneralMessage> generalMessages = new ArrayList<>();
+        final GeneralMessage message = Mockito.mock(GeneralMessage.class);
+        generalMessages.add(message);
+        String topic = FAKE_TOPIC_0;
+        Mockito.doReturn(topic).when(message).getTopic();
+        long awaitTimeMills = 3000;
+        long decodeTimestamp = System.currentTimeMillis() - awaitTimeMills;
+        
Mockito.doReturn(Optional.of(decodeTimestamp)).when(message).getDecodeTimestamp();
+        final MessageHandlerContextImpl context = new 
MessageHandlerContextImpl(MessageHookPoints.CONSUME);
+        meterHandler.doBefore(context, generalMessages);
+        ArgumentCaptor<Attributes> attributes0ArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        final ArgumentCaptor<Double> awaitTimeArgumentCaptor = 
ArgumentCaptor.forClass(Double.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.AWAIT_TIME), 
attributes0ArgumentCaptor.capture(),
+                awaitTimeArgumentCaptor.capture());
+        assertEquals(awaitTimeMills, awaitTimeArgumentCaptor.getValue(), 100);
+        final Attributes attributes0 = attributes0ArgumentCaptor.getValue();
+        assertEquals(topic, attributes0.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, 
attributes0.get(MetricLabels.CONSUMER_GROUP));
+        assertEquals(clientId, attributes0.get(MetricLabels.CLIENT_ID));
+        
assertNotNull(context.getAttributes().get(MessageMeterHandler.CONSUME_STOPWATCH_KEY));
+
+        context.setStatus(MessageHookPointsStatus.OK);
+        meterHandler.doAfter(context, generalMessages);
+        ArgumentCaptor<Attributes> attributes1ArgumentCaptor = 
ArgumentCaptor.forClass(Attributes.class);
+        Mockito.verify(meterManager, Mockito.times(1))
+            .record(Mockito.eq(HistogramEnum.PROCESS_TIME), 
attributes1ArgumentCaptor.capture(),
+                awaitTimeArgumentCaptor.capture());
+        final Attributes attributes1 = attributes1ArgumentCaptor.getValue();
+        assertEquals(topic, attributes1.get(MetricLabels.TOPIC));
+        assertEquals(consumerGroup, 
attributes1.get(MetricLabels.CONSUMER_GROUP));
+        assertEquals(clientId, attributes1.get(MetricLabels.CLIENT_ID));
+        assertEquals(InvocationStatus.SUCCESS.getName(), 
attributes1.get(MetricLabels.INVOCATION_STATUS));
+    }
+}
\ No newline at end of file

Reply via email to