This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5759c797fe4 IGNITE-23169 Add operation time metric for putAllConflict, 
removeAllConflict (#11627)
5759c797fe4 is described below

commit 5759c797fe4bc159a6619090c517f34beb345ed2
Author: Maksim Davydov <[email protected]>
AuthorDate: Tue Nov 26 14:17:23 2024 +0300

    IGNITE-23169 Add operation time metric for putAllConflict, 
removeAllConflict (#11627)
---
 docs/_docs/monitoring-metrics/new-metrics.adoc     |  2 +
 .../processors/cache/CacheMetricsImpl.java         | 38 +++++++++++
 .../processors/cache/GridCacheAdapter.java         | 78 +++++++++++++++++++++-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 22 +++++-
 .../main/resources/META-INF/classnames.properties  |  2 +
 .../cache/GridCacheAbstractMetricsSelfTest.java    | 29 ++++++++
 6 files changed, 167 insertions(+), 4 deletions(-)

diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc 
b/docs/_docs/monitoring-metrics/new-metrics.adoc
index e723e93fa9f..3bc1efd2a13 100644
--- a/docs/_docs/monitoring-metrics/new-metrics.adoc
+++ b/docs/_docs/monitoring-metrics/new-metrics.adoc
@@ -95,6 +95,7 @@ Register name: `cache.{cache_name}.{near}`
 |OffHeapPrimaryEntriesCount|long|Offheap primary entries count.
 |OffHeapPuts |long|The total number of put requests to the off-heap memory.
 |OffHeapRemovals |long|The total number of removals from the off-heap memory.
+|PutAllConflictTime | histogram | PutAllConflict time for which this node is 
the initiator, in nanoseconds.
 |PutAllTime | histogram | PutAll time for which this node is the initiator, in 
nanoseconds.
 |PutTime | histogram | Put time for which this node is the initiator, in 
nanoseconds.
 |PutTimeTotal | long | The total time of cache puts for which this node is the 
initiator, in nanoseconds.
@@ -109,6 +110,7 @@ Register name: `cache.{cache_name}.{near}`
 |RebalancedKeys |long| Number of already rebalanced keys.
 |RebalancingBytesRate|long|Estimated rebalancing speed in bytes.
 |RebalancingKeysRate |long|Estimated rebalancing speed in keys.
+|RemoveAllConflictTime | histogram | RemoveAllConflict time for which this 
node is the initiator, in nanoseconds.
 |RemoveAllTime | histogram | RemoveAll time for which this node is the 
initiator, in nanoseconds.
 |RemoveTime | histogram | Remove time for which this node is the initiator. in 
nanoseconds.
 |RemoveTimeTotal | long | The total time of cache removal, in nanoseconds.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 02892764994..a67d265dbf7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -193,12 +193,18 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** PutAll time. */
     private final HistogramMetricImpl putAllTime;
 
+    /** PutAllConflict time. */
+    private final HistogramMetricImpl putAllConflictTime;
+
     /** Remove time. */
     private final HistogramMetricImpl rmvTime;
 
     /** RemoveAll time. */
     private final HistogramMetricImpl rmvAllTime;
 
+    /** RemoveAllConflict time. */
+    private final HistogramMetricImpl rmvAllConflictTime;
+
     /** Commit time. */
     private final HistogramMetricImpl commitTime;
 
@@ -395,12 +401,18 @@ public class CacheMetricsImpl implements CacheMetrics {
         putAllTime = mreg.histogram("PutAllTime", HISTOGRAM_BUCKETS,
             "PutAll time for which this node is the initiator, in 
nanoseconds.");
 
+        putAllConflictTime = mreg.histogram("PutAllConflictTime", 
HISTOGRAM_BUCKETS,
+            "PutAllConflict time for which this node is the initiator, in 
nanoseconds.");
+
         rmvTime = mreg.histogram("RemoveTime", HISTOGRAM_BUCKETS,
             "Remove time for which this node is the initiator, in 
nanoseconds.");
 
         rmvAllTime = mreg.histogram("RemoveAllTime", HISTOGRAM_BUCKETS,
             "RemoveAll time for which this node is the initiator, in 
nanoseconds.");
 
+        rmvAllConflictTime = mreg.histogram("RemoveAllConflictTime", 
HISTOGRAM_BUCKETS,
+            "RemoveAllConflict time for which this node is the initiator, in 
nanoseconds.");
+
         commitTime = mreg.histogram("CommitTime", HISTOGRAM_BUCKETS, "Commit 
time in nanoseconds.");
 
         rollbackTime = mreg.histogram("RollbackTime", HISTOGRAM_BUCKETS, 
"Rollback time in nanoseconds.");
@@ -731,8 +743,10 @@ public class CacheMetricsImpl implements CacheMetrics {
         getAllTime.reset();
         putTime.reset();
         putAllTime.reset();
+        putAllConflictTime.reset();
         rmvTime.reset();
         rmvAllTime.reset();
+        rmvAllConflictTime.reset();
         commitTime.reset();
         rollbackTime.reset();
 
@@ -1201,6 +1215,18 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.addPutAllTimeNanos(duration);
     }
 
+    /**
+     * Passes the duration to the {@code putAllConflictTime} histogram and to the delegate metrics, if one is set.
+     *
+     * @param duration the time taken in nanoseconds.
+     */
+    public void addPutAllConflictTimeNanos(long duration) {
+        putAllConflictTime.value(duration);
+
+        if (delegate != null)
+            delegate.addPutAllConflictTimeNanos(duration);
+    }
+
     /**
      * Increments the remove time accumulator.
      *
@@ -1229,6 +1255,18 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.addRemoveAllTimeNanos(duration);
     }
 
+    /**
+     * Passes the duration to the {@code rmvAllConflictTime} histogram and to the delegate metrics, if one is set.
+     *
+     * @param duration the time taken in nanoseconds.
+     */
+    public void addRemoveAllConflictTimeNanos(long duration) {
+        rmvAllConflictTime.value(duration);
+
+        if (delegate != null)
+            delegate.addRemoveAllConflictTimeNanos(duration);
+    }
+
     /**
      * Increments remove and get time accumulators.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fac7c9a2e6e..18efb388fe3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1961,6 +1961,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (F.isEmpty(drMap))
             return;
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         syncOp(new SyncInOp(drMap.size() == 1) {
@@ -1972,6 +1976,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return "putAllConflict [drMap=" + drMap + ']';
             }
         });
+
+        if (statsEnabled)
+            metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start);
     }
 
     /** {@inheritDoc} */
@@ -1980,9 +1987,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (F.isEmpty(drMap))
             return new GridFinishedFuture<Object>();
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
-        return asyncOp(new AsyncOp(drMap.keySet()) {
+        IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(drMap.keySet()) {
             @Override public IgniteInternalFuture op(GridNearTxLocal tx, 
AffinityTopologyVersion readyTopVer) {
                 return tx.putAllDrAsync(ctx, drMap);
             }
@@ -1991,6 +2002,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return "putAllConflictAsync [drMap=" + drMap + ']';
             }
         });
+
+        if (statsEnabled)
+            fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), 
start));
+
+        return fut;
     }
 
     /** {@inheritDoc} */
@@ -2815,6 +2831,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (F.isEmpty(drMap))
             return;
 
+        boolean statsEnabled = ctx.statisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         syncOp(new SyncInOp(false) {
@@ -2826,6 +2846,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return "removeAllConflict [drMap=" + drMap + ']';
             }
         });
+
+        if (statsEnabled)
+            metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() - 
start);
     }
 
     /** {@inheritDoc} */
@@ -2834,9 +2857,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (F.isEmpty(drMap))
             return new GridFinishedFuture<Object>();
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
-        return asyncOp(new AsyncOp(drMap.keySet()) {
+        IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(drMap.keySet()) {
             @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, 
AffinityTopologyVersion readyTopVer) {
                 return tx.removeAllDrAsync(ctx, drMap);
             }
@@ -2845,6 +2872,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 return "removeAllDrASync [drMap=" + drMap + ']';
             }
         });
+
+        if (statsEnabled)
+            fut.listen(new 
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+
+        return fut;
     }
 
     /** {@inheritDoc} */
@@ -5993,6 +6025,27 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
     }
 
+    /**
+     * Closure that adds the time elapsed since {@code start} to the removeAllConflict time metric.
+     */
+    protected static class UpdateRemoveAllConflictTimeStatClosure<T> extends 
UpdateTimeStatClosure<T> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param metrics Metrics.
+         * @param start Start time.
+         */
+        public UpdateRemoveAllConflictTimeStatClosure(CacheMetricsImpl 
metrics, long start) {
+            super(metrics, start);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void updateTimeStat(T res) {
+            metrics.addRemoveAllConflictTimeNanos(System.nanoTime() - start);
+        }
+    }
+
     /**
      *
      */
@@ -6035,6 +6088,27 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
     }
 
+    /**
+     * Closure that adds the time elapsed since {@code start} to the putAllConflict time metric.
+     */
+    protected static class UpdatePutAllConflictTimeStatClosure<T> extends 
UpdateTimeStatClosure<T> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param metrics Metrics.
+         * @param start Start time.
+         */
+        public UpdatePutAllConflictTimeStatClosure(CacheMetricsImpl metrics, 
long start) {
+            super(metrics, start);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void updateTimeStat(T res) {
+            metrics.addPutAllConflictTimeNanos(System.nanoTime() - start);
+        }
+    }
+
     /**
      *
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4d07658898b..95774ba9480 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -674,11 +674,15 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> 
putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
         warnIfUnordered(conflictMap, BulkOperation.PUT);
 
-        return updateAll0(
+        IgniteInternalFuture<?> fut = updateAll0(
             null,
             null,
             null,
@@ -687,6 +691,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             false,
             UPDATE,
             true);
+
+        if (statsEnabled)
+            fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), 
start));
+
+        return fut;
     }
 
     /** {@inheritDoc} */
@@ -729,9 +738,18 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> 
removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
-        return removeAllAsync0(null, conflictMap, false, true);
+        IgniteInternalFuture<?> fut = removeAllAsync0(null, conflictMap, 
false, true);
+
+        if (statsEnabled)
+            fut.listen(new 
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+
+        return fut;
     }
 
     /**
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 2a6c3252031..f1c748588ae 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -914,9 +914,11 @@ 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAllTimeSta
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAndRemoveTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllTimeStatClosure
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllConflictTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllTimeStatClosure
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllConflictTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveTimeStatClosure
 
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAttributes
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
index 42c9049fd2e..4a55a7733d4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.expiry.TouchedExpiryPolicy;
@@ -42,6 +43,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1578,6 +1581,32 @@ public abstract class GridCacheAbstractMetricsSelfTest 
extends GridCacheAbstract
         assertTrue(waitForCondition(() -> removeTimeTotal.value() > 0, 
getTestTimeout()));
     }
 
+    /** Checks that each conflict time histogram holds two measurements after one sync and one async call. */
+    @Test
+    public void testPutAndRemoveAllConflict() throws Exception {
+        IgniteInternalCache<Integer, Integer> cachex = 
grid(0).cachex(DEFAULT_CACHE_NAME);
+
+        HistogramMetricImpl putAllConflictTime = metric("PutAllConflictTime");
+        HistogramMetricImpl removeAllConflictTime = 
metric("RemoveAllConflictTime");
+
+        assertTrue(stream(putAllConflictTime.value()).allMatch(v -> v == 0));
+        assertTrue(stream(removeAllConflictTime.value()).allMatch(v -> v == 
0));
+
+        GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2);
+        GridCacheDrInfo val = new GridCacheDrInfo(new CacheObjectImpl(1, 
null), confl);
+
+        IntFunction<KeyCacheObject> keyGen = i -> new KeyCacheObjectImpl(i, 
null, cachex.affinity().partition(0));
+
+        cachex.putAllConflict(F.asMap(keyGen.apply(1), val));
+        cachex.removeAllConflict(F.asMap(keyGen.apply(1), confl));
+
+        cachex.putAllConflictAsync(F.asMap(keyGen.apply(2), val)).get();
+        cachex.removeAllConflictAsync(F.asMap(keyGen.apply(2), confl)).get();
+
+        assertTrue(waitForCondition(() -> 
stream(putAllConflictTime.value()).sum() == 2, getTestTimeout()));
+        assertTrue(waitForCondition(() -> 
stream(removeAllConflictTime.value()).sum() == 2, getTestTimeout()));
+    }
+
     /**
      * @param name Metric name to find.
      * @return Metric.

Reply via email to