This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5759c797fe4 IGNITE-23169 Add operation time metric for putAllConflict,
removeAllConflict (#11627)
5759c797fe4 is described below
commit 5759c797fe4bc159a6619090c517f34beb345ed2
Author: Maksim Davydov <[email protected]>
AuthorDate: Tue Nov 26 14:17:23 2024 +0300
IGNITE-23169 Add operation time metric for putAllConflict,
removeAllConflict (#11627)
---
docs/_docs/monitoring-metrics/new-metrics.adoc | 2 +
.../processors/cache/CacheMetricsImpl.java | 38 +++++++++++
.../processors/cache/GridCacheAdapter.java | 78 +++++++++++++++++++++-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 22 +++++-
.../main/resources/META-INF/classnames.properties | 2 +
.../cache/GridCacheAbstractMetricsSelfTest.java | 29 ++++++++
6 files changed, 167 insertions(+), 4 deletions(-)
diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc
b/docs/_docs/monitoring-metrics/new-metrics.adoc
index e723e93fa9f..3bc1efd2a13 100644
--- a/docs/_docs/monitoring-metrics/new-metrics.adoc
+++ b/docs/_docs/monitoring-metrics/new-metrics.adoc
@@ -95,6 +95,7 @@ Register name: `cache.{cache_name}.{near}`
|OffHeapPrimaryEntriesCount|long|Offheap primary entries count.
|OffHeapPuts |long|The total number of put requests to the off-heap memory.
|OffHeapRemovals |long|The total number of removals from the off-heap memory.
+|PutAllConflictTime | histogram | PutAllConflict time for which this node is
the initiator, in nanoseconds.
|PutAllTime | histogram | PutAll time for which this node is the initiator, in
nanoseconds.
|PutTime | histogram | Put time for which this node is the initiator, in
nanoseconds.
|PutTimeTotal | long | The total time of cache puts for which this node is the
initiator, in nanoseconds.
@@ -109,6 +110,7 @@ Register name: `cache.{cache_name}.{near}`
|RebalancedKeys |long| Number of already rebalanced keys.
|RebalancingBytesRate|long|Estimated rebalancing speed in bytes.
|RebalancingKeysRate |long|Estimated rebalancing speed in keys.
+|RemoveAllConflictTime | histogram | RemoveAllConflict time for which this
node is the initiator, in nanoseconds.
|RemoveAllTime | histogram | RemoveAll time for which this node is the
initiator, in nanoseconds.
|RemoveTime | histogram | Remove time for which this node is the initiator, in
nanoseconds.
|RemoveTimeTotal | long | The total time of cache removal, in nanoseconds.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 02892764994..a67d265dbf7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -193,12 +193,18 @@ public class CacheMetricsImpl implements CacheMetrics {
/** PutAll time. */
private final HistogramMetricImpl putAllTime;
+ /** PutAllConflict time. */
+ private final HistogramMetricImpl putAllConflictTime;
+
/** Remove time. */
private final HistogramMetricImpl rmvTime;
/** RemoveAll time. */
private final HistogramMetricImpl rmvAllTime;
+ /** RemoveAllConflict time. */
+ private final HistogramMetricImpl rmvAllConflictTime;
+
/** Commit time. */
private final HistogramMetricImpl commitTime;
@@ -395,12 +401,18 @@ public class CacheMetricsImpl implements CacheMetrics {
putAllTime = mreg.histogram("PutAllTime", HISTOGRAM_BUCKETS,
"PutAll time for which this node is the initiator, in
nanoseconds.");
+ putAllConflictTime = mreg.histogram("PutAllConflictTime",
HISTOGRAM_BUCKETS,
+ "PutAllConflict time for which this node is the initiator, in
nanoseconds.");
+
rmvTime = mreg.histogram("RemoveTime", HISTOGRAM_BUCKETS,
"Remove time for which this node is the initiator, in
nanoseconds.");
rmvAllTime = mreg.histogram("RemoveAllTime", HISTOGRAM_BUCKETS,
"RemoveAll time for which this node is the initiator, in
nanoseconds.");
+ rmvAllConflictTime = mreg.histogram("RemoveAllConflictTime",
HISTOGRAM_BUCKETS,
+ "RemoveAllConflict time for which this node is the initiator, in
nanoseconds.");
+
commitTime = mreg.histogram("CommitTime", HISTOGRAM_BUCKETS, "Commit
time in nanoseconds.");
rollbackTime = mreg.histogram("RollbackTime", HISTOGRAM_BUCKETS,
"Rollback time in nanoseconds.");
@@ -731,8 +743,10 @@ public class CacheMetricsImpl implements CacheMetrics {
getAllTime.reset();
putTime.reset();
putAllTime.reset();
+ putAllConflictTime.reset();
rmvTime.reset();
rmvAllTime.reset();
+ rmvAllConflictTime.reset();
commitTime.reset();
rollbackTime.reset();
@@ -1201,6 +1215,18 @@ public class CacheMetricsImpl implements CacheMetrics {
delegate.addPutAllTimeNanos(duration);
}
+ /**
+ * Records a putAllConflict duration in the histogram and forwards it to the delegate metrics, if one is set.
+ *
+ * @param duration the time taken in nanoseconds.
+ */
+ public void addPutAllConflictTimeNanos(long duration) {
+ putAllConflictTime.value(duration);
+
+ if (delegate != null)
+ delegate.addPutAllConflictTimeNanos(duration);
+ }
+
/**
* Increments the remove time accumulator.
*
@@ -1229,6 +1255,18 @@ public class CacheMetricsImpl implements CacheMetrics {
delegate.addRemoveAllTimeNanos(duration);
}
+ /**
+ * Records a removeAllConflict duration in the histogram and forwards it to the delegate metrics, if one is set.
+ *
+ * @param duration the time taken in nanoseconds.
+ */
+ public void addRemoveAllConflictTimeNanos(long duration) {
+ rmvAllConflictTime.value(duration);
+
+ if (delegate != null)
+ delegate.addRemoveAllConflictTimeNanos(duration);
+ }
+
/**
* Increments remove and get time accumulators.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fac7c9a2e6e..18efb388fe3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1961,6 +1961,10 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (F.isEmpty(drMap))
return;
+ final boolean statsEnabled = ctx.statisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
@@ -1972,6 +1976,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return "putAllConflict [drMap=" + drMap + ']';
}
});
+
+ if (statsEnabled)
+ metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start);
}
/** {@inheritDoc} */
@@ -1980,9 +1987,13 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (F.isEmpty(drMap))
return new GridFinishedFuture<Object>();
+ final boolean statsEnabled = ctx.statisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
- return asyncOp(new AsyncOp(drMap.keySet()) {
+ IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(drMap.keySet()) {
@Override public IgniteInternalFuture op(GridNearTxLocal tx,
AffinityTopologyVersion readyTopVer) {
return tx.putAllDrAsync(ctx, drMap);
}
@@ -1991,6 +2002,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return "putAllConflictAsync [drMap=" + drMap + ']';
}
});
+
+ if (statsEnabled)
+ fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(),
start));
+
+ return fut;
}
/** {@inheritDoc} */
@@ -2815,6 +2831,10 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (F.isEmpty(drMap))
return;
+ boolean statsEnabled = ctx.statisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
@@ -2826,6 +2846,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return "removeAllConflict [drMap=" + drMap + ']';
}
});
+
+ if (statsEnabled)
+ metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() -
start);
}
/** {@inheritDoc} */
@@ -2834,9 +2857,13 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (F.isEmpty(drMap))
return new GridFinishedFuture<Object>();
+ final boolean statsEnabled = ctx.statisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
- return asyncOp(new AsyncOp(drMap.keySet()) {
+ IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(drMap.keySet()) {
@Override public IgniteInternalFuture<?> op(GridNearTxLocal tx,
AffinityTopologyVersion readyTopVer) {
return tx.removeAllDrAsync(ctx, drMap);
}
@@ -2845,6 +2872,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return "removeAllDrASync [drMap=" + drMap + ']';
}
});
+
+ if (statsEnabled)
+ fut.listen(new
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+
+ return fut;
}
/** {@inheritDoc} */
@@ -5993,6 +6025,27 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
}
}
+ /**
+ * Future listener that reports the time elapsed since {@code start} as removeAllConflict time.
+ */
+ protected static class UpdateRemoveAllConflictTimeStatClosure<T> extends
UpdateTimeStatClosure<T> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param metrics Cache metrics that receive the measured duration.
+ * @param start Operation start time, as returned by {@link System#nanoTime()}.
+ */
+ public UpdateRemoveAllConflictTimeStatClosure(CacheMetricsImpl
metrics, long start) {
+ super(metrics, start);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateTimeStat(T res) {
+ metrics.addRemoveAllConflictTimeNanos(System.nanoTime() - start);
+ }
+ }
+
/**
*
*/
@@ -6035,6 +6088,27 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
}
}
+ /**
+ * Future listener that reports the time elapsed since {@code start} as putAllConflict time.
+ */
+ protected static class UpdatePutAllConflictTimeStatClosure<T> extends
UpdateTimeStatClosure<T> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param metrics Cache metrics that receive the measured duration.
+ * @param start Operation start time, as returned by {@link System#nanoTime()}.
+ */
+ public UpdatePutAllConflictTimeStatClosure(CacheMetricsImpl metrics,
long start) {
+ super(metrics, start);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void updateTimeStat(T res) {
+ metrics.addPutAllConflictTimeNanos(System.nanoTime() - start);
+ }
+ }
+
/**
*
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4d07658898b..95774ba9480 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -674,11 +674,15 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?>
putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
+ final boolean statsEnabled = ctx.statisticsEnabled();
+
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
warnIfUnordered(conflictMap, BulkOperation.PUT);
- return updateAll0(
+ IgniteInternalFuture<?> fut = updateAll0(
null,
null,
null,
@@ -687,6 +691,11 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
false,
UPDATE,
true);
+
+ if (statsEnabled)
+ fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(),
start));
+
+ return fut;
}
/** {@inheritDoc} */
@@ -729,9 +738,18 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?>
removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
+ final boolean statsEnabled = ctx.statisticsEnabled();
+
+ final long start = statsEnabled ? System.nanoTime() : 0L;
+
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
- return removeAllAsync0(null, conflictMap, false, true);
+ IgniteInternalFuture<?> fut = removeAllAsync0(null, conflictMap,
false, true);
+
+ if (statsEnabled)
+ fut.listen(new
UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
+
+ return fut;
}
/**
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 2a6c3252031..f1c748588ae 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -914,9 +914,11 @@
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAllTimeSta
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetAndRemoveTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllTimeStatClosure
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAllConflictTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllTimeStatClosure
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveAllConflictTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateRemoveTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAttributes
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
index 42c9049fd2e..4a55a7733d4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
@@ -42,6 +43,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.F;
@@ -1578,6 +1581,32 @@ public abstract class GridCacheAbstractMetricsSelfTest
extends GridCacheAbstract
assertTrue(waitForCondition(() -> removeTimeTotal.value() > 0,
getTestTimeout()));
}
+ /** Checks that one sync and one async putAllConflict/removeAllConflict call are each counted in the time histograms. */
+ @Test
+ public void testPutAndRemoveAllConflict() throws Exception {
+ IgniteInternalCache<Integer, Integer> cachex =
grid(0).cachex(DEFAULT_CACHE_NAME);
+
+ HistogramMetricImpl putAllConflictTime = metric("PutAllConflictTime");
+ HistogramMetricImpl removeAllConflictTime =
metric("RemoveAllConflictTime");
+
+ assertTrue(stream(putAllConflictTime.value()).allMatch(v -> v == 0));
+ assertTrue(stream(removeAllConflictTime.value()).allMatch(v -> v ==
0));
+
+ GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2);
+ GridCacheDrInfo val = new GridCacheDrInfo(new CacheObjectImpl(1,
null), confl);
+
+ IntFunction<KeyCacheObject> keyGen = i -> new KeyCacheObjectImpl(i,
null, cachex.affinity().partition(0));
+
+ cachex.putAllConflict(F.asMap(keyGen.apply(1), val));
+ cachex.removeAllConflict(F.asMap(keyGen.apply(1), confl));
+
+ cachex.putAllConflictAsync(F.asMap(keyGen.apply(2), val)).get();
+ cachex.removeAllConflictAsync(F.asMap(keyGen.apply(2), confl)).get();
+
+ assertTrue(waitForCondition(() ->
stream(putAllConflictTime.value()).sum() == 2, getTestTimeout()));
+ assertTrue(waitForCondition(() ->
stream(removeAllConflictTime.value()).sum() == 2, getTestTimeout()));
+ }
+
/**
* @param name Metric name to find.
* @return Metric.