This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new f7e4e3f6 IGNITE-22536 Added conflict resolver plugin metrics (#278)
f7e4e3f6 is described below

commit f7e4e3f6ae6be4ebf9925861b1805be4d1765fde
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Jul 4 18:16:53 2024 +0300

    IGNITE-22536 Added conflict resolver plugin metrics (#278)
---
 docs/_docs/cdc/change-data-capture-extensions.adoc | 13 +++++
 .../CacheConflictResolutionManagerImpl.java        | 12 ++++-
 .../CacheVersionConflictResolverImpl.java          | 39 ++++++++++++--
 .../DebugCacheVersionConflictResolverImpl.java     | 11 +++-
 .../ignite/cdc/CacheConflictOperationsTest.java    | 60 ++++++++++++++--------
 ...heConflictOperationsWithCustomResolverTest.java |  5 ++
 6 files changed, 113 insertions(+), 27 deletions(-)

diff --git a/docs/_docs/cdc/change-data-capture-extensions.adoc 
b/docs/_docs/cdc/change-data-capture-extensions.adoc
index 9f6a1a70..ac6b29d3 100644
--- a/docs/_docs/cdc/change-data-capture-extensions.adoc
+++ b/docs/_docs/cdc/change-data-capture-extensions.adoc
@@ -173,6 +173,19 @@ Conflict resolution field should contain user provided 
monotonically increasing
 . If `conflictResolveField` if provided then field values comparison used to 
determine order.
 . Conflict resolution failed. Update will be ignored.
 
+=== Conflict Resolver Metrics
+
+The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides 
the following metrics:
+
+[cols="35%,65%",opts="header"]
+|===
+|Name |Description
+| `AcceptedCount` | Count of accepted entries.
+| `RejectedCount` | Count of rejected entries.
+|===
+
+These metrics are registered under `conflict-resolver` registry for each node 
configured with this plugin.
+
 === Configuration example
 Configuration is done via Ignite node plugin:
 
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
index 970a562c..3d42d53f 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
@@ -21,6 +21,7 @@ import org.apache.ignite.IgniteLogger;
 import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.lang.IgniteFuture;
 
 /**
@@ -30,6 +31,9 @@ import org.apache.ignite.lang.IgniteFuture;
  * @see CacheVersionConflictResolver
  */
 public class CacheConflictResolutionManagerImpl<K, V> implements 
CacheConflictResolutionManager<K, V> {
+    /** Conflict resolver metrics registry name. */
+    public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = 
"conflict-resolver";
+
     /** Logger. */
     private IgniteLogger log;
 
@@ -72,20 +76,24 @@ public class CacheConflictResolutionManagerImpl<K, V> 
implements CacheConflictRe
     @Override public CacheVersionConflictResolver conflictResolver() {
         CacheVersionConflictResolver rslvr;
 
+        MetricRegistryImpl mreg = 
cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);
+
         if (resolver != null)
             rslvr = resolver;
         else if (conflictResolverLog.isDebugEnabled()) {
             rslvr = new DebugCacheVersionConflictResolverImpl(
                 clusterId,
                 conflictResolveField,
-                conflictResolverLog
+                conflictResolverLog,
+                mreg
             );
         }
         else {
             rslvr = new CacheVersionConflictResolverImpl(
                 clusterId,
                 conflictResolveField,
-                conflictResolverLog
+                conflictResolverLog,
+                mreg
             );
         }
 
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
index 422d8701..010eb3d9 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -23,6 +23,8 @@ import 
org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -42,6 +44,18 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  * </ul>
  */
 public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictResolver {
+    /** Accepted entries count name. */
+    public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount";
+
+    /** Accepted entries count description. */
+    public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted 
entries";
+
+    /** Rejected entries count name. */
+    public static final String REJECTED_EVENTS_CNT = "RejectedCount";
+
+    /** Rejected entries count description. */
+    public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected 
entries";
+
     /**
      * Cluster id.
      */
@@ -66,17 +80,32 @@ public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictRes
     @GridToStringInclude
     protected final boolean conflictResolveFieldEnabled;
 
+    /** Accepted entries count. */
+    private final LongAdderMetric acceptedCnt;
+
+    /** Rejected entries count. */
+    private final LongAdderMetric rejectedCnt;
+
     /**
      * @param clusterId Data center id.
      * @param conflictResolveField Field to resolve conflicts.
      * @param log Logger.
+     * @param mreg Metric registry.
      */
-    public CacheVersionConflictResolverImpl(byte clusterId, String 
conflictResolveField, IgniteLogger log) {
+    public CacheVersionConflictResolverImpl(
+        byte clusterId,
+        String conflictResolveField,
+        IgniteLogger log,
+        MetricRegistryImpl mreg
+    ) {
         this.clusterId = clusterId;
         this.conflictResolveField = conflictResolveField;
         this.log = log;
 
         conflictResolveFieldEnabled = conflictResolveField != null;
+
+        acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, 
ACCEPTED_EVENTS_CNT_DESC);
+        rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, 
REJECTED_EVENTS_CNT_DESC);
     }
 
     /** {@inheritDoc} */
@@ -90,10 +119,14 @@ public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictRes
 
         boolean useNew = isUseNew(ctx, oldEntry, newEntry);
 
-        if (useNew)
+        if (useNew) {
             res.useNew();
-        else
+            acceptedCnt.increment();
+        }
+        else {
             res.useOld();
+            rejectedCnt.increment();
+        }
 
         return res;
     }
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
index 9b1b4908..b19585a4 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc.conflictresolve;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /** Debug aware resolver. */
@@ -28,9 +29,15 @@ public class DebugCacheVersionConflictResolverImpl extends 
CacheVersionConflictR
      * @param clusterId Data center id.
      * @param conflictResolveField Field to resolve conflicts.
      * @param log Logger.
+     * @param mreg Metric registry.
      */
-    public DebugCacheVersionConflictResolverImpl(byte clusterId, String 
conflictResolveField, IgniteLogger log) {
-        super(clusterId, conflictResolveField, log);
+    public DebugCacheVersionConflictResolverImpl(
+        byte clusterId,
+        String conflictResolveField,
+        IgniteLogger log,
+        MetricRegistryImpl mreg
+    ) {
+        super(clusterId, conflictResolveField, log, mreg);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
index 20a0e5f2..7131f49a 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
@@ -41,6 +41,8 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,6 +51,9 @@ import org.junit.runners.Parameterized;
 import static java.util.Collections.singletonMap;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME;
+import static 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT;
+import static 
org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT;
 
 /**
  * Cache conflict operations test.
@@ -76,22 +81,25 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    private static IgniteCache<String, ConflictResolvableTestData> cache;
+    private static final byte FIRST_CLUSTER_ID = 1;
 
     /** */
-    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+    private static final byte SECOND_CLUSTER_ID = 2;
 
     /** */
-    private static IgniteEx client;
+    private static final byte THIRD_CLUSTER_ID = 3;
 
     /** */
-    private static final byte FIRST_CLUSTER_ID = 1;
+    private IgniteCache<String, ConflictResolvableTestData> cache;
 
     /** */
-    private static final byte SECOND_CLUSTER_ID = 2;
+    private IgniteInternalCache<BinaryObject, BinaryObject> cachex;
 
     /** */
-    private static final byte THIRD_CLUSTER_ID = 3;
+    private IgniteEx client;
+
+    /** */
+    private IgniteEx ign;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -104,24 +112,14 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
         return 
super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        startGrid(1);
-
-        client = startClientGrid(2);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() {
-        cache = null;
-        cachex = null;
-        client = null;
-    }
-
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
+        ign = startGrid(1);
+
+        client = startClientGrid(2);
+
         if (cachex == null || cachex.configuration().getAtomicityMode() != 
cacheMode) {
             if (cachex != null)
                 client.cache(DEFAULT_CACHE_NAME).destroy();
@@ -133,6 +131,11 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
     /** Tests that regular cache operations works with the conflict resolver 
when there is no update conflicts. */
     @Test
     public void testSimpleUpdates() {
@@ -197,6 +200,8 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
         // Remove with the higher topVer should succeed.
         putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), 
true);
 
+        checkMetrics(4, 8);
+
         key = key("UpdateClusterUpdateReorder3", otherClusterId);
 
         int topVer = 1;
@@ -207,12 +212,16 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
         putConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
         putConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
 
+        checkMetrics(5, 10);
+
         // Remove with the equal or lower nodeOrder should ignored.
         removeConflict(key, new GridCacheVersion(topVer, order, 2, 
otherClusterId), false);
         removeConflict(key, new GridCacheVersion(topVer, order, 1, 
otherClusterId), false);
 
         // Remove with the higher nodeOrder should succeed.
         putConflict(key, new GridCacheVersion(topVer, order, 3, 
otherClusterId), true);
+
+        checkMetrics(6, 12);
     }
 
     /** Tests cache operations for entry replicated from another cluster. */
@@ -334,4 +343,15 @@ public class CacheConflictOperationsTest extends 
GridCommonAbstractTest {
     protected String conflictResolveField() {
         return null;
     }
+
+    /** Checks metrics for conflict resolver. */
+    protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
+        MetricRegistryImpl mreg = 
ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);
+
+        assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT));
+        assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT));
+
+        assertEquals(acceptedCnt, 
((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value());
+        assertEquals(rejectedCnt, 
((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value());
+    }
 }
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
index ae48a2c2..621aaf60 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java
@@ -68,4 +68,9 @@ public class CacheConflictOperationsWithCustomResolverTest 
extends CacheConflict
             return res;
         }
     }
+
+    /** {@inheritDoc} */
+    @Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
+        // No op.
+    }
 }

Reply via email to