This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a10a56f  GEODE-9074: Added update of messageQueueSize at putting message to qu… (#6445)
a10a56f is described below

commit a10a56f573e9a4aabf3c5b6e3862539d6dd93b92
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Tue Aug 17 22:12:42 2021 +0200

    GEODE-9074: Added update of messageQueueSize at putting message to qu… (#6445)
    
    * GEODE-9074: Added update of messageQueueSize at putting message to queue.
    Also added new statistics messagesBeingQueuedInProgress and messagesBeingQueuedTime.
---
 .../cache/tier/sockets/CacheClientProxyTest.java   | 61 +++++++++++++++++++
 .../cache/tier/sockets/CacheClientProxyStats.java  | 70 +++++++++++++++++++++-
 .../cache/tier/sockets/MessageDispatcher.java      |  7 +++
 geode-docs/reference/statistics_list.html.md.erb   |  1 +
 4 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 079c54e..ca7acf0 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
@@ -28,12 +29,20 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.ha.HAContainerMap;
+import org.apache.geode.internal.cache.ha.HAContainerWrapper;
 import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.statistics.StatisticsClock;
@@ -93,4 +102,56 @@ public class CacheClientProxyTest {
       closeSocketShouldBeAtomic();
     }
   }
+
+  @Test
+  public void checkQueueingStats() {
+    final CacheServerStats stats = mock(CacheServerStats.class);
+    doNothing().when(stats).incCurrentQueueConnections();
+
+    final InternalCache cache = serverRule.getCache();
+
+    final CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+    final SocketCloser sc = mock(SocketCloser.class);
+    when(ccn.getCache()).thenReturn(cache);
+    when(ccn.getAcceptorStats()).thenReturn(stats);
+    when(ccn.getSocketCloser()).thenReturn(sc);
+    final HAContainerWrapper haContainer = new HAContainerMap(new ConcurrentHashMap<>());
+    when(ccn.getHaContainer()).thenReturn(haContainer);
+
+    final Socket socket = mock(Socket.class);
+    final InetAddress address = mock(InetAddress.class);
+    when(socket.getInetAddress()).thenReturn(address);
+    when(address.getHostAddress()).thenReturn("localhost");
+    doNothing().when(sc).asyncClose(any(), eq("localhost"), any(Runnable.class));
+
+    final ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+    final DistributedMember member = cache.getDistributedSystem().getDistributedMember();
+    when(proxyID.getDistributedMember()).thenReturn(member);
+    final String regionName = "region/test";
+    when(proxyID.getHARegionName()).thenReturn(regionName);
+
+    CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+        Handshake.CONFLATION_DEFAULT, KnownVersion.CURRENT, 1L, true,
+        null, null, mock(StatisticsClock.class));
+
+    Region dataRegion = createDataRegion();
+    proxy.initializeMessageDispatcher();
+    ClientUpdateMessage clientUpdateMessageImpl1 =
+        new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+            (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+            new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+    ClientUpdateMessage clientUpdateMessageImpl2 =
+        new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+            (LocalRegion) dataRegion, "key1", "value1".getBytes(), (byte) 0x01, null,
+            new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+    proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl1);
+    proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl2);
+
+    assertThat(proxy.getStatistics().getMessageQueueSize()).isEqualTo(2);
+
+  }
+
+  private Region createDataRegion() {
+    return serverRule.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("data");
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
index 4337ce9..9802ba4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
@@ -61,6 +61,11 @@ public class CacheClientProxyStats implements MessageStats {
   /** Name of the CQ count statistic */
   private static final String CQ_COUNT = "cqCount";
 
+  /** Name of the messages currently being queued statistic */
+  private static final String MESSAGES_BEING_QUEUED_IN_PROGRESS = "messagesBeingQueuedInProgress";
+  /** Name of the messages queueing time statistic */
+  private static final String MESSAGES_BEING_QUEUED_TIME = "messagesBeingQueuedTime";
+
   /** Id of the messages received statistic */
   private static final int _messagesReceivedId;
   /** Id of the messages queued statistic */
@@ -85,6 +90,12 @@ public class CacheClientProxyStats implements MessageStats {
   private static final int _cqCountId;
   private static final int _sentBytesId;
 
+  /** Id of the messages currently being queued statistic */
+  private static final int _messagesBeingQueuedInProgressId;
+  /** Id of the messages queueing time statistic */
+  private static final int _messagesBeingQueuedTimeId;
+
+
   /*
    * Static initializer to create and initialize the <code>StatisticsType</code>
    */
@@ -128,7 +139,14 @@ public class CacheClientProxyStats implements MessageStats {
             "operations"),
 
         f.createLongCounter(CQ_COUNT, "Number of CQs on the client.", "operations"),
-        f.createLongCounter("sentBytes", "Total number of bytes sent to client.", "bytes"),});
+        f.createLongCounter("sentBytes", "Total number of bytes sent to client.", "bytes"),
+
+        f.createLongGauge(MESSAGES_BEING_QUEUED_IN_PROGRESS,
+            "Threads currently adding a message to the queue. Consistently high values indicate that the queue is full and adds are being delayed.",
+            "threads"),
+        f.createLongCounter(MESSAGES_BEING_QUEUED_TIME,
+            "Total time spent while message is put in queue.", "nanoseconds"),
+    });
 
     // Initialize id fields
     _messagesReceivedId = _type.nameToId(MESSAGES_RECEIVED);
@@ -143,6 +161,9 @@ public class CacheClientProxyStats implements MessageStats {
     _deltaFullMessagesSentId = _type.nameToId(DELTA_FULL_MESSAGES_SENT);
     _cqCountId = _type.nameToId(CQ_COUNT);
     _sentBytesId = _type.nameToId("sentBytes");
+    _messagesBeingQueuedInProgressId = _type.nameToId(MESSAGES_BEING_QUEUED_IN_PROGRESS);
+    _messagesBeingQueuedTimeId = _type.nameToId(MESSAGES_BEING_QUEUED_TIME);
+
   }
 
   ////////////////////// Instance Fields //////////////////////
@@ -272,6 +293,25 @@ public class CacheClientProxyStats implements MessageStats {
   }
 
   /**
+   * Returns the current value of the "_messagesBeingQueuedInProgress" stat.
+   *
+   * @return the current value of the "_messagesBeingQueuedInProgress" stat
+   */
+  public long getMessagesBeingQueuedInProgress() {
+    return this._stats.getLong(_messagesBeingQueuedInProgressId);
+  }
+
+  /**
+   * Returns the current value of the "_messagesBeingQueuedTime" stat.
+   *
+   * @return the current value of the "_messagesBeingQueuedTime" stat
+   */
+  public long getMessagesBeingQueuedTime() {
+    return this._stats.getLong(_messagesBeingQueuedTimeId);
+  }
+
+
+  /**
    * Increments the "messagesReceived" stat.
    */
   public void incMessagesReceived() {
@@ -314,6 +354,13 @@ public class CacheClientProxyStats implements MessageStats {
   }
 
   /**
+   * Increments the "messagesBeingQueuedInProgress" stat.
+   */
+  public void incMessagesBeingQueuedInProgress() {
+    this._stats.incLong(_messagesBeingQueuedInProgressId, 1);
+  }
+
+  /**
    * Decrements the "cqCount" stat.
    */
   public void decCqCount() {
@@ -321,6 +368,13 @@ public class CacheClientProxyStats implements MessageStats {
   }
 
   /**
+   * Decrements the "messagesBeingQueuedInProgress" stat.
+   */
+  public void decMessagesBeingQueuedInProgress() {
+    this._stats.incLong(_messagesBeingQueuedInProgressId, -1);
+  }
+
+  /**
    * Sets the "messageQueueSize" stat.
    *
    * @param size The size of the queue
@@ -388,4 +442,18 @@ public class CacheClientProxyStats implements MessageStats {
   public void decMessagesBeingReceived(int bytes) {
     // noop since we never receive
   }
+
+  public long startMessageQueueStats() {
+    incMessagesBeingQueuedInProgress();
+    return startTime();
+  }
+
+  public void endMessageQueueStats(long startTime) {
+    long ts = DistributionStats.getStatTime();
+
+    decMessagesBeingQueuedInProgress();
+
+    long elapsed = ts - startTime;
+    this._stats.incLong(_messagesBeingQueuedTimeId, elapsed);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index c062530..d00b9a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -633,7 +633,11 @@ public class MessageDispatcher extends LoggingThread {
    */
   protected void enqueueMessage(Conflatable clientMessage) {
     try {
+      long startTime = _proxy._statistics.startMessageQueueStats();
       _messageQueue.put(clientMessage);
+      _proxy._statistics.endMessageQueueStats(startTime);
+      _proxy._statistics.setQueueSize(_messageQueue.size());
+
       if (_proxy.isPaused() && _proxy.isDurable()) {
         _proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
         if (logger.isDebugEnabled()) {
@@ -660,7 +664,10 @@ public class MessageDispatcher extends LoggingThread {
         logger.debug("{}: Queueing marker message. <{}>. The queue contains {} entries.", this,
             message, getQueueSize());
       }
+      long startTime = _proxy._statistics.startMessageQueueStats();
       _messageQueue.put(message);
+      _proxy._statistics.endMessageQueueStats(startTime);
+      _proxy._statistics.setQueueSize(_messageQueue.size());
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Queued marker message. The queue contains {} entries.", this,
             getQueueSize());
diff --git a/geode-docs/reference/statistics_list.html.md.erb b/geode-docs/reference/statistics_list.html.md.erb
index 699c392..fbbea5e 100644
--- a/geode-docs/reference/statistics_list.html.md.erb
+++ b/geode-docs/reference/statistics_list.html.md.erb
@@ -1447,6 +1447,7 @@ Statistics regarding cache server operations and cache server client notificatio
 | `messagesProcessed`              | Number of client operations messages removed from the subscription queue and sent.                                                                                                   |
 | `messagesQueued`                 | Number of client operations messages added to the subscription queue.                                                                                                                |
 | `messagesReceived`               | Number of client operations messages received.                                                                                                                                       |
+| `messagesWaitingToQueue`         | Number of client operations messages waiting to be put in the subscription queue.                                                                                                                                 |
 
 ## <a id="section_3AB1C0AA55014163A2BBF68E13D25E3A" class="no-quick-link"></a>Server-to-Client Messaging Performance (ClientSubscriptionStats)
 

Reply via email to