This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 86d85f9c191 IGNITE-23901 Added performance statistics for putAllConflict, removeAllConflict operations (#11793)
86d85f9c191 is described below
commit 86d85f9c191ee8a11f18504c17aabdbd186b83f3
Author: Maksim Davydov <[email protected]>
AuthorDate: Mon Mar 3 14:07:04 2025 +0300
IGNITE-23901 Added performance statistics for putAllConflict, removeAllConflict operations (#11793)
---
.../processors/cache/GridCacheAdapter.java | 26 ++++++++--
.../distributed/dht/atomic/GridDhtAtomicCache.java | 15 ++++--
.../performancestatistics/OperationType.java | 8 ++-
.../PerformanceStatisticsThinClientTest.java | 59 ++++++++++++++++++++--
.../ignite/internal/util/lang/ConsumerX.java | 5 +-
5 files changed, 97 insertions(+), 16 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b53302be892..8e842d39f57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1969,8 +1969,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return;
final boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() :
0L;
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
@@ -1986,6 +1987,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start);
+
+ if (perfStatsEnabled)
+ writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start);
}
/** {@inheritDoc} */
@@ -1995,8 +1999,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return new GridFinishedFuture<Object>();
final boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() :
0L;
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
@@ -2013,6 +2018,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(),
start));
+ if (perfStatsEnabled)
+ fut.listen(() ->
writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
+
return fut;
}
@@ -2839,8 +2847,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return;
boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() :
0L;
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
@@ -2856,6 +2865,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (statsEnabled)
metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() -
start);
+
+ if (perfStatsEnabled)
+ writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start);
}
/** {@inheritDoc} */
@@ -2865,8 +2877,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return new GridFinishedFuture<Object>();
final boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = (statsEnabled || perfStatsEnabled) ?
System.nanoTime() : 0L;
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
@@ -2883,6 +2896,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (statsEnabled)
fut.listen(new
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+ if (perfStatsEnabled)
+ fut.listen(() ->
writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
+
return fut;
}
@@ -6165,7 +6181,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
* @param op Operation type.
* @param start Start time in nanoseconds.
*/
- private void writeStatistics(OperationType op, long start) {
+ protected void writeStatistics(OperationType op, long start) {
ctx.kernalContext().performanceStatistics().cacheOperation(
op,
ctx.cacheId(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 984b688956d..50ff074b2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -96,6 +96,7 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import
org.apache.ignite.internal.processors.performancestatistics.OperationType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -695,9 +696,10 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?>
putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
- final boolean statsEnabled = ctx.statisticsEnabled();
+ boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- long start = statsEnabled ? System.nanoTime() : 0L;
+ long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() :
0L;
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
@@ -716,6 +718,9 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
if (statsEnabled)
fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(),
start));
+ if (perfStatsEnabled)
+ fut.listen(() ->
writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
+
return fut;
}
@@ -760,8 +765,9 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?>
removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
final boolean statsEnabled = ctx.statisticsEnabled();
+ boolean perfStatsEnabled =
ctx.kernalContext().performanceStatistics().enabled();
- final long start = statsEnabled ? System.nanoTime() : 0L;
+ final long start = (statsEnabled || perfStatsEnabled) ?
System.nanoTime() : 0L;
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
@@ -770,6 +776,9 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
if (statsEnabled)
fut.listen(new
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+ if (perfStatsEnabled)
+ fut.listen(() ->
writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
+
return fut;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
index 17de3905a48..131cd9c84a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
@@ -93,13 +93,19 @@ public enum OperationType {
/** Custom query property. */
QUERY_PROPERTY(21),
+ /** Cache put all conflict. */
+ CACHE_PUT_ALL_CONFLICT(22),
+
+ /** Cache remove all conflict. */
+ CACHE_REMOVE_ALL_CONFLICT(23),
+
/** Version. */
VERSION(255);
/** Cache operations. */
public static final EnumSet<OperationType> CACHE_OPS =
EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE,
CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK,
CACHE_GET_ALL, CACHE_PUT_ALL,
- CACHE_REMOVE_ALL, CACHE_INVOKE_ALL);
+ CACHE_REMOVE_ALL, CACHE_INVOKE_ALL, CACHE_PUT_ALL_CONFLICT,
CACHE_REMOVE_ALL_CONFLICT);
/** Transaction operations. */
public static final EnumSet<OperationType> TX_OPS = EnumSet.of(TX_COMMIT,
TX_ROLLBACK);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
index 769c1776a22..0efc0779e4d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
@@ -19,12 +19,14 @@ package
org.apache.ignite.internal.processors.performancestatistics;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.Config;
import org.apache.ignite.client.IgniteClient;
@@ -33,26 +35,38 @@ import
org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.ThinClientConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
import org.apache.ignite.internal.client.thin.TestTask;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL_CONFLICT;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL_CONFLICT;
+import static org.junit.Assume.assumeTrue;
/**
* Tests thin client performance statistics.
*/
+@RunWith(Parameterized.class)
public class PerformanceStatisticsThinClientTest extends
AbstractPerformanceStatisticsTest {
/** Test task name. */
public static final String TEST_TASK_NAME = "TestTask";
@@ -66,12 +80,20 @@ public class PerformanceStatisticsThinClientTest extends
AbstractPerformanceStat
/** Thin client. */
private static IgniteClient thinClient;
+ /** */
+ @Parameterized.Parameter
+ public CacheAtomicityMode atomicityMode;
+
+ /** */
+ @Parameterized.Parameters(name = "atomicityMode={0}")
+ public static Collection<?> parameters() {
+ return EnumSet.of(ATOMIC, TRANSACTIONAL);
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setCacheConfiguration(defaultCacheConfiguration());
-
cfg.setClientConnectorConfiguration(
new ClientConnectorConfiguration().setThinClientConfiguration(
new
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(ACTIVE_TASKS_LIMIT)));
@@ -96,9 +118,22 @@ public class PerformanceStatisticsThinClientTest extends
AbstractPerformanceStat
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
+ stopAllGrids();
thinClient.close();
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ thinClient.createCache(new ClientCacheConfiguration()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomicityMode));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ thinClient.destroyCache(DEFAULT_CACHE_NAME);
+ }
+
/** @throws Exception If failed. */
@Test
public void testCompute() throws Exception {
@@ -169,10 +204,24 @@ public class PerformanceStatisticsThinClientTest extends
AbstractPerformanceStat
checkCacheOperation(CACHE_REMOVE_ALL, cache ->
cache.removeAll(Collections.singleton(3)));
checkCacheOperation(CACHE_GET_AND_REMOVE, cache ->
cache.getAndRemove(5));
+
+ GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2);
+
+ checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache ->
((TcpClientCache<Object, Object>)cache)
+ .putAllConflict(F.asMap(6, new T3<>(1, confl,
CU.EXPIRE_TIME_ETERNAL))));
+
+ checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache ->
((TcpClientCache<Object, Object>)cache)
+ .removeAllConflict(F.asMap(6, confl)));
+
+ checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache ->
((TcpClientCache<Object, Object>)cache)
+ .putAllConflictAsync(F.asMap(7, new T3<>(2, confl,
CU.EXPIRE_TIME_ETERNAL))).get());
+
+ checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache ->
((TcpClientCache<Object, Object>)cache)
+ .removeAllConflictAsync(F.asMap(7, confl)).get());
}
/** Checks cache operation. */
- private void checkCacheOperation(OperationType op,
Consumer<ClientCache<Object, Object>> clo) throws Exception {
+ private void checkCacheOperation(OperationType op,
ConsumerX<ClientCache<Object, Object>> clo) throws Exception {
long startTime = U.currentTimeMillis();
cleanPerformanceStatisticsDir();
@@ -202,6 +251,8 @@ public class PerformanceStatisticsThinClientTest extends
AbstractPerformanceStat
/** @throws Exception If failed. */
@Test
public void testTransaction() throws Exception {
+ assumeTrue(atomicityMode == TRANSACTIONAL);
+
checkTx(true);
checkTx(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
index 9c3e4fed385..2af2ea878cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
@@ -18,13 +18,12 @@
package org.apache.ignite.internal.util.lang;
import java.util.function.Consumer;
-import org.jetbrains.annotations.Nullable;
/**
* Represents an operation that accepts a single input argument and returns
* no result. Unlike most other functional interfaces,
* {@code ConsumerX} is expected to operate via side-effects.
- *
+ * <p>
* Also it is able to throw {@link Exception} unlike {@link Consumer}.
*
* @param <T> The type of the input to the operation.
@@ -36,5 +35,5 @@ public interface ConsumerX<T> {
*
* @param t the input argument.
*/
- public void accept(@Nullable T t) throws Exception;
+ public void accept(T t) throws Exception;
}