This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new a10a56f GEODE-9074: Added update of messageQueueSize at putting message to qu… (#6445) a10a56f is described below commit a10a56f573e9a4aabf3c5b6e3862539d6dd93b92 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Tue Aug 17 22:12:42 2021 +0200 GEODE-9074: Added update of messageQueueSize at putting message to qu… (#6445) * GEODE-9074: Added update of messageQueueSize at putting message to queue. Also added new statistics messagesBeingQueuedInProgress and messagesBeingQueuedTime. --- .../cache/tier/sockets/CacheClientProxyTest.java | 61 +++++++++++++++++++ .../cache/tier/sockets/CacheClientProxyStats.java | 70 +++++++++++++++++++++- .../cache/tier/sockets/MessageDispatcher.java | 7 +++ geode-docs/reference/statistics_list.html.md.erb | 1 + 4 files changed, 138 insertions(+), 1 deletion(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java index 079c54e..ca7acf0 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; @@ -28,12 +29,20 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import org.junit.Rule; import org.junit.Test; +import 
org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.internal.cache.EnumListenerEvent; +import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.ha.HAContainerMap; +import org.apache.geode.internal.cache.ha.HAContainerWrapper; import org.apache.geode.internal.net.SocketCloser; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.statistics.StatisticsClock; @@ -93,4 +102,56 @@ public class CacheClientProxyTest { closeSocketShouldBeAtomic(); } } + + @Test + public void checkQueueingStats() { + final CacheServerStats stats = mock(CacheServerStats.class); + doNothing().when(stats).incCurrentQueueConnections(); + + final InternalCache cache = serverRule.getCache(); + + final CacheClientNotifier ccn = mock(CacheClientNotifier.class); + final SocketCloser sc = mock(SocketCloser.class); + when(ccn.getCache()).thenReturn(cache); + when(ccn.getAcceptorStats()).thenReturn(stats); + when(ccn.getSocketCloser()).thenReturn(sc); + final HAContainerWrapper haContainer = new HAContainerMap(new ConcurrentHashMap<>()); + when(ccn.getHaContainer()).thenReturn(haContainer); + + final Socket socket = mock(Socket.class); + final InetAddress address = mock(InetAddress.class); + when(socket.getInetAddress()).thenReturn(address); + when(address.getHostAddress()).thenReturn("localhost"); + doNothing().when(sc).asyncClose(any(), eq("localhost"), any(Runnable.class)); + + final ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class); + final DistributedMember member = cache.getDistributedSystem().getDistributedMember(); + when(proxyID.getDistributedMember()).thenReturn(member); + final String regionName = "region/test"; + when(proxyID.getHARegionName()).thenReturn(regionName); + + CacheClientProxy 
proxy = new CacheClientProxy(ccn, socket, proxyID, true, + Handshake.CONFLATION_DEFAULT, KnownVersion.CURRENT, 1L, true, + null, null, mock(StatisticsClock.class)); + + Region dataRegion = createDataRegion(); + proxy.initializeMessageDispatcher(); + ClientUpdateMessage clientUpdateMessageImpl1 = + new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE, + (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null, + new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem())); + ClientUpdateMessage clientUpdateMessageImpl2 = + new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE, + (LocalRegion) dataRegion, "key1", "value1".getBytes(), (byte) 0x01, null, + new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem())); + proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl1); + proxy._messageDispatcher.enqueueMessage(clientUpdateMessageImpl2); + + assertThat(proxy.getStatistics().getMessageQueueSize()).isEqualTo(2); + + } + + private Region createDataRegion() { + return serverRule.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("data"); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java index 4337ce9..9802ba4 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyStats.java @@ -61,6 +61,11 @@ public class CacheClientProxyStats implements MessageStats { /** Name of the CQ count statistic */ private static final String CQ_COUNT = "cqCount"; + /** Name of the messages currently being queued statistic */ + private static final String MESSAGES_BEING_QUEUED_IN_PROGRESS = "messagesBeingQueuedInProgress"; + /** Name of the messages queueing time statistic */ + private static final String 
MESSAGES_BEING_QUEUED_TIME = "messagesBeingQueuedTime"; + /** Id of the messages received statistic */ private static final int _messagesReceivedId; /** Id of the messages queued statistic */ @@ -85,6 +90,12 @@ public class CacheClientProxyStats implements MessageStats { private static final int _cqCountId; private static final int _sentBytesId; + /** Id of the messages currently being queued statistic */ + private static final int _messagesBeingQueuedInProgressId; + /** Id of the messages queueing time statistic */ + private static final int _messagesBeingQueuedTimeId; + + /* * Static initializer to create and initialize the <code>StatisticsType</code> */ @@ -128,7 +139,14 @@ public class CacheClientProxyStats implements MessageStats { "operations"), f.createLongCounter(CQ_COUNT, "Number of CQs on the client.", "operations"), - f.createLongCounter("sentBytes", "Total number of bytes sent to client.", "bytes"),}); + f.createLongCounter("sentBytes", "Total number of bytes sent to client.", "bytes"), + + f.createLongGauge(MESSAGES_BEING_QUEUED_IN_PROGRESS, + "Threads currently adding a message to the queue. 
Consistently high values indicate that the queue is full and adds are being delayed.", + "threads"), + f.createLongCounter(MESSAGES_BEING_QUEUED_TIME, + "Total time spent while message is put in queue.", "nanoseconds"), + }); // Initialize id fields _messagesReceivedId = _type.nameToId(MESSAGES_RECEIVED); @@ -143,6 +161,9 @@ public class CacheClientProxyStats implements MessageStats { _deltaFullMessagesSentId = _type.nameToId(DELTA_FULL_MESSAGES_SENT); _cqCountId = _type.nameToId(CQ_COUNT); _sentBytesId = _type.nameToId("sentBytes"); + _messagesBeingQueuedInProgressId = _type.nameToId(MESSAGES_BEING_QUEUED_IN_PROGRESS); + _messagesBeingQueuedTimeId = _type.nameToId(MESSAGES_BEING_QUEUED_TIME); + } ////////////////////// Instance Fields ////////////////////// @@ -272,6 +293,25 @@ public class CacheClientProxyStats implements MessageStats { } /** + * Returns the current value of the "messagesBeingQueuedInProgress" stat. + * + * @return the current value of the "messagesBeingQueuedInProgress" stat + */ + public long getMessagesBeingQueuedInProgress() { + return this._stats.getLong(_messagesBeingQueuedInProgressId); + } + + /** + * Returns the current value of the "messagesBeingQueuedTime" stat. + * + * @return the current value of the "messagesBeingQueuedTime" stat + */ + public long getMessagesBeingQueuedTime() { + return this._stats.getLong(_messagesBeingQueuedTimeId); + } + + + /** * Increments the "messagesReceived" stat. */ public void incMessagesReceived() { @@ -314,6 +354,13 @@ public class CacheClientProxyStats implements MessageStats { } /** + * Increments the "messagesBeingQueuedInProgress" stat. + */ + public void incMessagesBeingQueuedInProgress() { + this._stats.incLong(_messagesBeingQueuedInProgressId, 1); + } + + /** * Decrements the "cqCount" stat. */ public void decCqCount() { @@ -321,6 +368,13 @@ public class CacheClientProxyStats implements MessageStats { } /** + * Decrements the "messagesBeingQueuedInProgress" stat. 
+ */ + public void decMessagesBeingQueuedInProgress() { + this._stats.incLong(_messagesBeingQueuedInProgressId, -1); + } + + /** * Sets the "messageQueueSize" stat. * * @param size The size of the queue @@ -388,4 +442,18 @@ public class CacheClientProxyStats implements MessageStats { public void decMessagesBeingReceived(int bytes) { // noop since we never receive } + + public long startMessageQueueStats() { + incMessagesBeingQueuedInProgress(); + return startTime(); + } + + public void endMessageQueueStats(long startTime) { + long ts = DistributionStats.getStatTime(); + + decMessagesBeingQueuedInProgress(); + + long elapsed = ts - startTime; + this._stats.incLong(_messagesBeingQueuedTimeId, elapsed); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java index c062530..d00b9a2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java @@ -633,7 +633,11 @@ public class MessageDispatcher extends LoggingThread { */ protected void enqueueMessage(Conflatable clientMessage) { try { + long startTime = _proxy._statistics.startMessageQueueStats(); _messageQueue.put(clientMessage); + _proxy._statistics.endMessageQueueStats(startTime); + _proxy._statistics.setQueueSize(_messageQueue.size()); + if (_proxy.isPaused() && _proxy.isDurable()) { _proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount(); if (logger.isDebugEnabled()) { @@ -660,7 +664,10 @@ public class MessageDispatcher extends LoggingThread { logger.debug("{}: Queueing marker message. <{}>. 
The queue contains {} entries.", this, message, getQueueSize()); } + long startTime = _proxy._statistics.startMessageQueueStats(); _messageQueue.put(message); + _proxy._statistics.endMessageQueueStats(startTime); + _proxy._statistics.setQueueSize(_messageQueue.size()); if (logger.isDebugEnabled()) { logger.debug("{}: Queued marker message. The queue contains {} entries.", this, getQueueSize()); diff --git a/geode-docs/reference/statistics_list.html.md.erb b/geode-docs/reference/statistics_list.html.md.erb index 699c392..fbbea5e 100644 --- a/geode-docs/reference/statistics_list.html.md.erb +++ b/geode-docs/reference/statistics_list.html.md.erb @@ -1447,6 +1447,7 @@ Statistics regarding cache server operations and cache server client notificatio | `messagesProcessed` | Number of client operations messages removed from the subscription queue and sent. | | `messagesQueued` | Number of client operations messages added to the subscription queue. | | `messagesReceived` | Number of client operations messages received. | +| `messagesBeingQueuedInProgress` | Number of client operations messages waiting to be put in the subscription queue. | ## <a id="section_3AB1C0AA55014163A2BBF68E13D25E3A" class="no-quick-link"></a>Server-to-Client Messaging Performance (ClientSubscriptionStats)