This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new f7e4e3f6 IGNITE-22536 Added conflict resolver plugin metrics (#278)
f7e4e3f6 is described below
commit f7e4e3f6ae6be4ebf9925861b1805be4d1765fde
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Jul 4 18:16:53 2024 +0300
IGNITE-22536 Added conflict resolver plugin metrics (#278)
---
docs/_docs/cdc/change-data-capture-extensions.adoc | 13 +++++
.../CacheConflictResolutionManagerImpl.java | 12 ++++-
.../CacheVersionConflictResolverImpl.java | 39 ++++++++++++--
.../DebugCacheVersionConflictResolverImpl.java | 11 +++-
.../ignite/cdc/CacheConflictOperationsTest.java | 60 ++++++++++++++--------
...heConflictOperationsWithCustomResolverTest.java | 5 ++
6 files changed, 113 insertions(+), 27 deletions(-)
diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc
b/docs/_docs/cdc/change-data-capture-extensions.adoc
index 9f6a1a70..ac6b29d3 100644
--- a/docs/_docs/cdc/change-data-capture-extensions.adoc
+++ b/docs/_docs/cdc/change-data-capture-extensions.adoc
@@ -173,6 +173,19 @@ Conflict resolution field should contain user provided
monotonically increasing
. If `conflictResolveField` if provided then field values comparison used to
determine order.
. Conflict resolution failed. Update will be ignored.
+=== Conflict Resolver Metrics
+
+The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides
the following metrics:
+
+[cols="35%,65%",opts="header"]
+|===
+|Name |Description
+| `AcceptedCount` | Count of accepted entries.
+| `RejectedCount` | Count of rejected entries.
+|===
+
+These metrics are registered under `conflict-resolver` registry for each node
configured with this plugin.
+
=== Configuration example
Configuration is done via Ignite node plugin:
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
index 970a562c..3d42d53f 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
@@ -21,6 +21,7 @@ import org.apache.ignite.IgniteLogger;
import
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.lang.IgniteFuture;
/**
@@ -30,6 +31,9 @@ import org.apache.ignite.lang.IgniteFuture;
* @see CacheVersionConflictResolver
*/
public class CacheConflictResolutionManagerImpl<K, V> implements
CacheConflictResolutionManager<K, V> {
+ /** Conflict resolver metrics registry name. */
+ public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME =
"conflict-resolver";
+
/** Logger. */
private IgniteLogger log;
@@ -72,20 +76,24 @@ public class CacheConflictResolutionManagerImpl<K, V>
implements CacheConflictRe
@Override public CacheVersionConflictResolver conflictResolver() {
CacheVersionConflictResolver rslvr;
+ MetricRegistryImpl mreg =
cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);
+
if (resolver != null)
rslvr = resolver;
else if (conflictResolverLog.isDebugEnabled()) {
rslvr = new DebugCacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
- conflictResolverLog
+ conflictResolverLog,
+ mreg
);
}
else {
rslvr = new CacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
- conflictResolverLog
+ conflictResolverLog,
+ mreg
);
}
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
index 422d8701..010eb3d9 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -23,6 +23,8 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -42,6 +44,18 @@ import org.apache.ignite.internal.util.typedef.internal.U;
* </ul>
*/
public class CacheVersionConflictResolverImpl implements
CacheVersionConflictResolver {
+ /** Accepted entries count name. */
+ public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount";
+
+ /** Accepted entries count description. */
+ public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted
entries";
+
+ /** Rejected entries count name. */
+ public static final String REJECTED_EVENTS_CNT = "RejectedCount";
+
+ /** Rejected entries count description. */
+ public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected
entries";
+
/**
* Cluster id.
*/
@@ -66,17 +80,32 @@ public class CacheVersionConflictResolverImpl implements
CacheVersionConflictRes
@GridToStringInclude
protected final boolean conflictResolveFieldEnabled;
+ /** Accepted entries count. */
+ private final LongAdderMetric acceptedCnt;
+
+ /** Rejected entries count. */
+ private final LongAdderMetric rejectedCnt;
+
/**
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
+ * @param mreg Metric registry.
*/
- public CacheVersionConflictResolverImpl(byte clusterId, String
conflictResolveField, IgniteLogger log) {
+ public CacheVersionConflictResolverImpl(
+ byte clusterId,
+ String conflictResolveField,
+ IgniteLogger log,
+ MetricRegistryImpl mreg
+ ) {
this.clusterId = clusterId;
this.conflictResolveField = conflictResolveField;
this.log = log;
conflictResolveFieldEnabled = conflictResolveField != null;
+
+ acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT,
ACCEPTED_EVENTS_CNT_DESC);
+ rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT,
REJECTED_EVENTS_CNT_DESC);
}
/** {@inheritDoc} */
@@ -90,10 +119,14 @@ public class CacheVersionConflictResolverImpl implements
CacheVersionConflictRes
boolean useNew = isUseNew(ctx, oldEntry, newEntry);
- if (useNew)
+ if (useNew) {
res.useNew();
- else
+ acceptedCnt.increment();
+ }
+ else {
res.useOld();
+ rejectedCnt.increment();
+ }
return res;
}
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
index 9b1b4908..b19585a4 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.conflictresolve;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.typedef.internal.S;
/** Debug aware resolver. */
@@ -28,9 +29,15 @@ public class DebugCacheVersionConflictResolverImpl extends
CacheVersionConflictR
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
+ * @param mreg Metric registry.
*/
- public DebugCacheVersionConflictResolverImpl(byte clusterId, String
conflictResolveField, IgniteLogger log) {
- super(clusterId, conflictResolveField, log);
+ public DebugCacheVersionConflictResolverImpl(
+ byte clusterId,
+ String conflictResolveField,
+ IgniteLogger log,
+ MetricRegistryImpl mreg
+ ) {
+ super(clusterId, conflictResolveField, log, mreg);
}
/** {@inheritDoc} */
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
index 20a0e5f2..7131f49a 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
@@ -41,6 +41,8 @@ import
org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,6 +51,9 @@ import org.junit.runners.Parameterized;
import static java.util.Collections.singletonMap;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static
org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME;
+import static
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT;
+import static
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT;
/**
* Cache conflict operations test.
@@ -76,22 +81,25 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
}
/** */
- private static IgniteCache<String, ConflictResolvableTestData> cache;
+ private static final byte FIRST_CLUSTER_ID = 1;
/** */
- private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+ private static final byte SECOND_CLUSTER_ID = 2;
/** */
- private static IgniteEx client;
+ private static final byte THIRD_CLUSTER_ID = 3;
/** */
- private static final byte FIRST_CLUSTER_ID = 1;
+ private IgniteCache<String, ConflictResolvableTestData> cache;
/** */
- private static final byte SECOND_CLUSTER_ID = 2;
+ private IgniteInternalCache<BinaryObject, BinaryObject> cachex;
/** */
- private static final byte THIRD_CLUSTER_ID = 3;
+ private IgniteEx client;
+
+ /** */
+ private IgniteEx ign;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
@@ -104,24 +112,14 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
return
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- startGrid(1);
-
- client = startClientGrid(2);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() {
- cache = null;
- cachex = null;
- client = null;
- }
-
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
+ ign = startGrid(1);
+
+ client = startClientGrid(2);
+
if (cachex == null || cachex.configuration().getAtomicityMode() !=
cacheMode) {
if (cachex != null)
client.cache(DEFAULT_CACHE_NAME).destroy();
@@ -133,6 +131,11 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
}
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
/** Tests that regular cache operations works with the conflict resolver
when there is no update conflicts. */
@Test
public void testSimpleUpdates() {
@@ -197,6 +200,8 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
// Remove with the higher topVer should succeed.
putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId),
true);
+ checkMetrics(4, 8);
+
key = key("UpdateClusterUpdateReorder3", otherClusterId);
int topVer = 1;
@@ -207,12 +212,16 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
putConflict(key, new GridCacheVersion(topVer, order, 2,
otherClusterId), false);
putConflict(key, new GridCacheVersion(topVer, order, 1,
otherClusterId), false);
+ checkMetrics(5, 10);
+
// Remove with the equal or lower nodeOrder should ignored.
removeConflict(key, new GridCacheVersion(topVer, order, 2,
otherClusterId), false);
removeConflict(key, new GridCacheVersion(topVer, order, 1,
otherClusterId), false);
// Remove with the higher nodeOrder should succeed.
putConflict(key, new GridCacheVersion(topVer, order, 3,
otherClusterId), true);
+
+ checkMetrics(6, 12);
}
/** Tests cache operations for entry replicated from another cluster. */
@@ -334,4 +343,15 @@ public class CacheConflictOperationsTest extends
GridCommonAbstractTest {
protected String conflictResolveField() {
return null;
}
+
+ /** Checks metrics for conflict resolver. */
+ protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
+ MetricRegistryImpl mreg =
ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);
+
+ assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT));
+ assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT));
+
+ assertEquals(acceptedCnt,
((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value());
+ assertEquals(rejectedCnt,
((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value());
+ }
}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
index ae48a2c2..621aaf60 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
@@ -68,4 +68,9 @@ public class CacheConflictOperationsWithCustomResolverTest
extends CacheConflict
return res;
}
}
+
+ /** {@inheritDoc} */
+ @Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
+ // No op.
+ }
}