This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a10a56f GEODE-9074: Added update of messageQueueSize at putting
message to qu… (#6445)
a10a56f is described below
commit a10a56f573e9a4aabf3c5b6e3862539d6dd93b92
Author: Mario Ivanac <[email protected]>
AuthorDate: Tue Aug 17 22:12:42 2021 +0200
GEODE-9074: Added update of messageQueueSize at putting message to qu…
(#6445)
* GEODE-9074: Added update of messageQueueSize at putting message to queue.
Also added new statistics messagesBeingQueuedInProgress and
messagesBeingQueuedTime.
---
.../cache/tier/sockets/CacheClientProxyTest.java | 61 +++++++++++++++++++
.../cache/tier/sockets/CacheClientProxyStats.java | 70 +++++++++++++++++++++-
.../cache/tier/sockets/MessageDispatcher.java | 7 +++
geode-docs/reference/statistics_list.html.md.erb | 1 +
4 files changed, 138 insertions(+), 1 deletion(-)
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 079c54e..ca7acf0 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.internal.cache.tier.sockets;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
@@ -28,12 +29,20 @@ import static org.mockito.Mockito.when;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.ha.HAContainerMap;
+import org.apache.geode.internal.cache.ha.HAContainerWrapper;
import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
@@ -93,4 +102,56 @@ public class CacheClientProxyTest {
closeSocketShouldBeAtomic();
}
}
+
+ @Test
+ public void checkQueueingStats() {
+ final CacheServerStats stats = mock(CacheServerStats.class);
+ doNothing().when(stats).incCurrentQueueConnections();
+
+ final InternalCache cache = serverRule.getCache();
+
+ final CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+ final SocketCloser sc = mock(SocketCloser.class);
+ when(ccn.getCache()).thenReturn(cache);
+ when(ccn.getAcceptorStats()).thenReturn(stats);
+ when(ccn.getSocketCloser()).thenReturn(sc);
+ final HAContainerWrapper haContainer = new HAContainerMap(new
ConcurrentHashMap<>());
+ when(ccn.getHaContainer()).thenReturn(haContainer);
+
+ final Socket socket = mock(Socket.class);
+ final InetAddress address = mock(InetAddress.class);
+ when(socket.getInetAddress()).thenReturn(address);
+ when(address.getHostAddress()).thenReturn("localhost");
+ doNothing().when(sc).asyncClose(any(), eq("localhost"),
any(Runnable.class));
+
+ final ClientProxyMembershipID proxyID =
mock(ClientProxyMembershipID.class);
+ final DistributedMember member =
cache.getDistributedSystem().getDistributedMember();
+ when(proxyID.getDistributedMember()).thenReturn(member);
+ final String regionName = "region/test";
+ when(proxyID.getHARegionName()).thenReturn(regionName);
+
+ CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+ Handshake.CONFLATION_DEFAULT, KnownVersion.CURRENT, 1L, true,
+ null, null, mock(StatisticsClock.class));
+
+ Region dataRegion = createDataRegion();
+ proxy.initializeMessageDispatcher();
+ ClientUpdateMessage clientUpdateMessageImpl1 =
+ new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01,
null,
+ new ClientProxyMembershipID(), new
EventID(cache.getDistributedSystem()));
+ ClientUpdateMessage clientUpdateMessageImpl2 =
+ new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key1", "value1".getBytes(), (byte)
0x01, null,
+ new ClientProxyMembershipID(), new
EventID(cache.getDistributedSystem()));
+ proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl1);
+ proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl2);
+
+ assertThat(proxy.getStatistics().getMessageQueueSize()).isEqualTo(2);
+
+ }
+
+ private Region createDataRegion() {
+ return
serverRule.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("data");
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
index 4337ce9..9802ba4 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java
@@ -61,6 +61,11 @@ public class CacheClientProxyStats implements MessageStats {
/** Name of the CQ count statistic */
private static final String CQ_COUNT = "cqCount";
+ /** Name of the messages currently being queued statistic */
+ private static final String MESSAGES_BEING_QUEUED_IN_PROGRESS =
"messagesBeingQueuedInProgress";
+ /** Name of the messages queueing time statistic */
+ private static final String MESSAGES_BEING_QUEUED_TIME =
"messagesBeingQueuedTime";
+
/** Id of the messages received statistic */
private static final int _messagesReceivedId;
/** Id of the messages queued statistic */
@@ -85,6 +90,12 @@ public class CacheClientProxyStats implements MessageStats {
private static final int _cqCountId;
private static final int _sentBytesId;
+ /** Id of the messages currently being queued statistic */
+ private static final int _messagesBeingQueuedInProgressId;
+ /** Id of the messages queueing time statistic */
+ private static final int _messagesBeingQueuedTimeId;
+
+
/*
* Static initializer to create and initialize the
<code>StatisticsType</code>
*/
@@ -128,7 +139,14 @@ public class CacheClientProxyStats implements MessageStats
{
"operations"),
f.createLongCounter(CQ_COUNT, "Number of CQs on the client.",
"operations"),
- f.createLongCounter("sentBytes", "Total number of bytes sent to
client.", "bytes"),});
+ f.createLongCounter("sentBytes", "Total number of bytes sent to
client.", "bytes"),
+
+ f.createLongGauge(MESSAGES_BEING_QUEUED_IN_PROGRESS,
+ "Threads currently adding a message to the queue. Consistently
high values indicate that the queue is full and adds are being delayed.",
+ "threads"),
+ f.createLongCounter(MESSAGES_BEING_QUEUED_TIME,
+ "Total time spent while message is put in queue.", "nanoseconds"),
+ });
// Initialize id fields
_messagesReceivedId = _type.nameToId(MESSAGES_RECEIVED);
@@ -143,6 +161,9 @@ public class CacheClientProxyStats implements MessageStats {
_deltaFullMessagesSentId = _type.nameToId(DELTA_FULL_MESSAGES_SENT);
_cqCountId = _type.nameToId(CQ_COUNT);
_sentBytesId = _type.nameToId("sentBytes");
+ _messagesBeingQueuedInProgressId =
_type.nameToId(MESSAGES_BEING_QUEUED_IN_PROGRESS);
+ _messagesBeingQueuedTimeId = _type.nameToId(MESSAGES_BEING_QUEUED_TIME);
+
}
////////////////////// Instance Fields //////////////////////
@@ -272,6 +293,25 @@ public class CacheClientProxyStats implements MessageStats
{
}
/**
+ * Returns the current value of the "_messagesBeingQueuedInProgress" stat.
+ *
+ * @return the current value of the "_messagesBeingQueuedInProgress" stat
+ */
+ public long getMessagesBeingQueuedInProgress() {
+ return this._stats.getLong(_messagesBeingQueuedInProgressId);
+ }
+
+ /**
+ * Returns the current value of the "_messagesBeingQueuedTime" stat.
+ *
+ * @return the current value of the "_messagesBeingQueuedTime" stat
+ */
+ public long getMessagesBeingQueuedTime() {
+ return this._stats.getLong(_messagesBeingQueuedTimeId);
+ }
+
+
+ /**
* Increments the "messagesReceived" stat.
*/
public void incMessagesReceived() {
@@ -314,6 +354,13 @@ public class CacheClientProxyStats implements MessageStats
{
}
/**
+ * Increments the "messagesBeingQueuedInProgress" stat.
+ */
+ public void incMessagesBeingQueuedInProgress() {
+ this._stats.incLong(_messagesBeingQueuedInProgressId, 1);
+ }
+
+ /**
* Decrements the "cqCount" stat.
*/
public void decCqCount() {
@@ -321,6 +368,13 @@ public class CacheClientProxyStats implements MessageStats
{
}
/**
+ * Decrements the "messagesBeingQueuedInProgress" stat.
+ */
+ public void decMessagesBeingQueuedInProgress() {
+ this._stats.incLong(_messagesBeingQueuedInProgressId, -1);
+ }
+
+ /**
* Sets the "messageQueueSize" stat.
*
* @param size The size of the queue
@@ -388,4 +442,18 @@ public class CacheClientProxyStats implements MessageStats
{
public void decMessagesBeingReceived(int bytes) {
// noop since we never receive
}
+
+ public long startMessageQueueStats() {
+ incMessagesBeingQueuedInProgress();
+ return startTime();
+ }
+
+ public void endMessageQueueStats(long startTime) {
+ long ts = DistributionStats.getStatTime();
+
+ decMessagesBeingQueuedInProgress();
+
+ long elapsed = ts - startTime;
+ this._stats.incLong(_messagesBeingQueuedTimeId, elapsed);
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index c062530..d00b9a2 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -633,7 +633,11 @@ public class MessageDispatcher extends LoggingThread {
*/
protected void enqueueMessage(Conflatable clientMessage) {
try {
+ long startTime = _proxy._statistics.startMessageQueueStats();
_messageQueue.put(clientMessage);
+ _proxy._statistics.endMessageQueueStats(startTime);
+ _proxy._statistics.setQueueSize(_messageQueue.size());
+
if (_proxy.isPaused() && _proxy.isDurable()) {
_proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
if (logger.isDebugEnabled()) {
@@ -660,7 +664,10 @@ public class MessageDispatcher extends LoggingThread {
logger.debug("{}: Queueing marker message. <{}>. The queue contains {}
entries.", this,
message, getQueueSize());
}
+ long startTime = _proxy._statistics.startMessageQueueStats();
_messageQueue.put(message);
+ _proxy._statistics.endMessageQueueStats(startTime);
+ _proxy._statistics.setQueueSize(_messageQueue.size());
if (logger.isDebugEnabled()) {
logger.debug("{}: Queued marker message. The queue contains {}
entries.", this,
getQueueSize());
diff --git a/geode-docs/reference/statistics_list.html.md.erb
b/geode-docs/reference/statistics_list.html.md.erb
index 699c392..fbbea5e 100644
--- a/geode-docs/reference/statistics_list.html.md.erb
+++ b/geode-docs/reference/statistics_list.html.md.erb
@@ -1447,6 +1447,7 @@ Statistics regarding cache server operations and cache
server client notificatio
| `messagesProcessed` | Number of client operations messages
removed from the subscription queue and sent.
|
| `messagesQueued` | Number of client operations messages
added to the subscription queue.
|
| `messagesReceived` | Number of client operations messages
received.
|
+| `messagesWaitingToQueue` | Number of client operations messages
waiting to be put in the subscription queue.
|
## <a id="section_3AB1C0AA55014163A2BBF68E13D25E3A"
class="no-quick-link"></a>Server-to-Client Messaging Performance
(ClientSubscriptionStats)