This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 86d85f9c191 IGNITE-23901 Added performance statistics for 
putAllConflict, removeAllConflict operations (#11793)
86d85f9c191 is described below

commit 86d85f9c191ee8a11f18504c17aabdbd186b83f3
Author: Maksim Davydov <[email protected]>
AuthorDate: Mon Mar 3 14:07:04 2025 +0300

    IGNITE-23901 Added performance statistics for putAllConflict, 
removeAllConflict operations (#11793)
---
 .../processors/cache/GridCacheAdapter.java         | 26 ++++++++--
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 15 ++++--
 .../performancestatistics/OperationType.java       |  8 ++-
 .../PerformanceStatisticsThinClientTest.java       | 59 ++++++++++++++++++++--
 .../ignite/internal/util/lang/ConsumerX.java       |  5 +-
 5 files changed, 97 insertions(+), 16 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b53302be892..8e842d39f57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1969,8 +1969,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return;
 
         final boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        long start = statsEnabled ? System.nanoTime() : 0L;
+        long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 
0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
@@ -1986,6 +1987,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         if (statsEnabled)
             metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start);
+
+        if (perfStatsEnabled)
+            writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start);
     }
 
     /** {@inheritDoc} */
@@ -1995,8 +1999,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return new GridFinishedFuture<Object>();
 
         final boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        long start = statsEnabled ? System.nanoTime() : 0L;
+        long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 
0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
@@ -2013,6 +2018,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (statsEnabled)
             fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), 
start));
 
+        if (perfStatsEnabled)
+            fut.listen(() -> 
writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
+
         return fut;
     }
 
@@ -2839,8 +2847,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return;
 
         boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        long start = statsEnabled ? System.nanoTime() : 0L;
+        long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 
0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
@@ -2856,6 +2865,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         if (statsEnabled)
             metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() - 
start);
+
+        if (perfStatsEnabled)
+            writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start);
     }
 
     /** {@inheritDoc} */
@@ -2865,8 +2877,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return new GridFinishedFuture<Object>();
 
         final boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        final long start = statsEnabled ? System.nanoTime() : 0L;
+        final long start = (statsEnabled || perfStatsEnabled) ? 
System.nanoTime() : 0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
@@ -2883,6 +2896,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (statsEnabled)
             fut.listen(new 
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
 
+        if (perfStatsEnabled)
+            fut.listen(() -> 
writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
+
         return fut;
     }
 
@@ -6165,7 +6181,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @param op Operation type.
      * @param start Start time in nanoseconds.
      */
-    private void writeStatistics(OperationType op, long start) {
+    protected void writeStatistics(OperationType op, long start) {
         ctx.kernalContext().performanceStatistics().cacheOperation(
             op,
             ctx.cacheId(),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 984b688956d..50ff074b2b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -96,6 +96,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import 
org.apache.ignite.internal.processors.performancestatistics.OperationType;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -695,9 +696,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> 
putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
-        final boolean statsEnabled = ctx.statisticsEnabled();
+        boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        long start = statsEnabled ? System.nanoTime() : 0L;
+        long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 
0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
@@ -716,6 +718,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (statsEnabled)
             fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), 
start));
 
+        if (perfStatsEnabled)
+            fut.listen(() -> 
writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
+
         return fut;
     }
 
@@ -760,8 +765,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> 
removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
         final boolean statsEnabled = ctx.statisticsEnabled();
+        boolean perfStatsEnabled = 
ctx.kernalContext().performanceStatistics().enabled();
 
-        final long start = statsEnabled ? System.nanoTime() : 0L;
+        final long start = (statsEnabled || perfStatsEnabled) ? 
System.nanoTime() : 0L;
 
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
@@ -770,6 +776,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (statsEnabled)
             fut.listen(new 
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
 
+        if (perfStatsEnabled)
+            fut.listen(() -> 
writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
+
         return fut;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
index 17de3905a48..131cd9c84a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
@@ -93,13 +93,19 @@ public enum OperationType {
     /** Custom query property. */
     QUERY_PROPERTY(21),
 
+    /** Cache put all conflict. */
+    CACHE_PUT_ALL_CONFLICT(22),
+
+    /** Cache remove all conflict. */
+    CACHE_REMOVE_ALL_CONFLICT(23),
+
     /** Version. */
     VERSION(255);
 
     /** Cache operations. */
     public static final EnumSet<OperationType> CACHE_OPS = 
EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE,
         CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK, 
CACHE_GET_ALL, CACHE_PUT_ALL,
-        CACHE_REMOVE_ALL, CACHE_INVOKE_ALL);
+        CACHE_REMOVE_ALL, CACHE_INVOKE_ALL, CACHE_PUT_ALL_CONFLICT, 
CACHE_REMOVE_ALL_CONFLICT);
 
     /** Transaction operations. */
     public static final EnumSet<OperationType> TX_OPS = EnumSet.of(TX_COMMIT, 
TX_ROLLBACK);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
index 769c1776a22..0efc0779e4d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
@@ -19,12 +19,14 @@ package 
org.apache.ignite.internal.processors.performancestatistics;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
 import org.apache.ignite.client.ClientTransaction;
 import org.apache.ignite.client.Config;
 import org.apache.ignite.client.IgniteClient;
@@ -33,26 +35,38 @@ import 
org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.ThinClientConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
 import org.apache.ignite.internal.client.thin.TestTask;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.lang.ConsumerX;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
+import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL_CONFLICT;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
 import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
+import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL_CONFLICT;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests thin client performance statistics.
  */
+@RunWith(Parameterized.class)
 public class PerformanceStatisticsThinClientTest extends 
AbstractPerformanceStatisticsTest {
     /** Test task name. */
     public static final String TEST_TASK_NAME = "TestTask";
@@ -66,12 +80,20 @@ public class PerformanceStatisticsThinClientTest extends 
AbstractPerformanceStat
     /** Thin client. */
     private static IgniteClient thinClient;
 
+    /** */
+    @Parameterized.Parameter
+    public CacheAtomicityMode atomicityMode;
+
+    /** */
+    @Parameterized.Parameters(name = "atomicityMode={0}")
+    public static Collection<?> parameters() {
+        return EnumSet.of(ATOMIC, TRANSACTIONAL);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setCacheConfiguration(defaultCacheConfiguration());
-
         cfg.setClientConnectorConfiguration(
             new ClientConnectorConfiguration().setThinClientConfiguration(
                 new 
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(ACTIVE_TASKS_LIMIT)));
@@ -96,9 +118,22 @@ public class PerformanceStatisticsThinClientTest extends 
AbstractPerformanceStat
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
+        stopAllGrids();
         thinClient.close();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        thinClient.createCache(new ClientCacheConfiguration()
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicityMode));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        thinClient.destroyCache(DEFAULT_CACHE_NAME);
+    }
+
     /** @throws Exception If failed. */
     @Test
     public void testCompute() throws Exception {
@@ -169,10 +204,24 @@ public class PerformanceStatisticsThinClientTest extends 
AbstractPerformanceStat
         checkCacheOperation(CACHE_REMOVE_ALL, cache -> 
cache.removeAll(Collections.singleton(3)));
 
         checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> 
cache.getAndRemove(5));
+
+        GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2);
+
+        checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> 
((TcpClientCache<Object, Object>)cache)
+            .putAllConflict(F.asMap(6, new T3<>(1, confl, 
CU.EXPIRE_TIME_ETERNAL))));
+
+        checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> 
((TcpClientCache<Object, Object>)cache)
+            .removeAllConflict(F.asMap(6, confl)));
+
+        checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> 
((TcpClientCache<Object, Object>)cache)
+            .putAllConflictAsync(F.asMap(7, new T3<>(2, confl, 
CU.EXPIRE_TIME_ETERNAL))).get());
+
+        checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> 
((TcpClientCache<Object, Object>)cache)
+            .removeAllConflictAsync(F.asMap(7, confl)).get());
     }
 
     /** Checks cache operation. */
-    private void checkCacheOperation(OperationType op, 
Consumer<ClientCache<Object, Object>> clo) throws Exception {
+    private void checkCacheOperation(OperationType op, 
ConsumerX<ClientCache<Object, Object>> clo) throws Exception {
         long startTime = U.currentTimeMillis();
 
         cleanPerformanceStatisticsDir();
@@ -202,6 +251,8 @@ public class PerformanceStatisticsThinClientTest extends 
AbstractPerformanceStat
     /** @throws Exception If failed. */
     @Test
     public void testTransaction() throws Exception {
+        assumeTrue(atomicityMode == TRANSACTIONAL);
+
         checkTx(true);
 
         checkTx(false);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
index 9c3e4fed385..2af2ea878cd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
@@ -18,13 +18,12 @@
 package org.apache.ignite.internal.util.lang;
 
 import java.util.function.Consumer;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents an operation that accepts a single input argument and returns
  * no result. Unlike most other functional interfaces,
  * {@code ConsumerX} is expected to operate via side-effects.
- *
+ * <p>
  * Also it is able to throw {@link Exception} unlike {@link Consumer}.
  *
  * @param <T> The type of the input to the operation.
@@ -36,5 +35,5 @@ public interface ConsumerX<T> {
      *
      * @param t the input argument.
      */
-    public void accept(@Nullable T t) throws Exception;
+    public void accept(T t) throws Exception;
 }

Reply via email to