This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f39591  IGNITE-14355 CDC Metrics (#9398)
2f39591 is described below

commit 2f395910925715e0a84e919591f1be6c49e8f631
Author: Nikolay <[email protected]>
AuthorDate: Tue Sep 28 09:47:27 2021 +0300

    IGNITE-14355 CDC Metrics (#9398)
---
 .../org/apache/ignite/cdc/CdcConfiguration.java    |  23 ++
 .../java/org/apache/ignite/cdc/CdcConsumer.java    |   8 +-
 .../org/apache/ignite/internal/IgnitionEx.java     |   9 +-
 .../org/apache/ignite/internal/cdc/CdcMain.java    | 243 ++++++++++++++-------
 .../ignite/internal/cdc/WalRecordsConsumer.java    |  29 ++-
 .../wal/reader/StandaloneGridKernalContext.java    |  21 +-
 .../processors/metric/GridMetricManager.java       |   4 +
 .../org/apache/ignite/cdc/AbstractCdcTest.java     | 186 ++++++++++++++--
 .../org/apache/ignite/cdc/CdcCacheVersionTest.java |   9 +-
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    | 142 ++++++++----
 .../org/apache/ignite/internal/cdc/SqlCdcTest.java |  28 ++-
 .../apache/ignite/cdc/CdcConfigurationTest.java    |   3 +-
 12 files changed, 542 insertions(+), 163 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java 
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
index ca2d89c..f2f1873 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
 
 /**
  * This class defines {@link CdcMain} runtime configuration.
@@ -39,6 +40,9 @@ public class CdcConfiguration {
     /** Change Data Capture consumer. */
     private CdcConsumer consumer;
 
+    /** Metric exporter SPI. */
+    private MetricExporterSpi[] metricExporterSpi;
+
     /** Keep binary flag.<br>Default value {@code true}. */
     private boolean keepBinary = DFLT_KEEP_BINARY;
 
@@ -66,6 +70,25 @@ public class CdcConfiguration {
         this.consumer = consumer;
     }
 
+    /**
+     * Sets fully configured instances of {@link MetricExporterSpi}.
+     *
+     * @param metricExporterSpi Fully configured instances of {@link 
MetricExporterSpi}.
+     * @see CdcConfiguration#getMetricExporterSpi()
+     */
+    public void setMetricExporterSpi(MetricExporterSpi... metricExporterSpi) {
+        this.metricExporterSpi = metricExporterSpi;
+    }
+
+    /**
+     * Gets fully configured metric SPI implementations.
+     *
+     * @return Metric exporter SPI implementations.
+     */
+    public MetricExporterSpi[] getMetricExporterSpi() {
+        return metricExporterSpi;
+    }
+
     /** @return keep binary value. */
     public boolean isKeepBinary() {
         return keepBinary;
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java 
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index af119e9..b0bf648 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.resources.LoggerResource;
 
@@ -29,7 +30,7 @@ import org.apache.ignite.resources.LoggerResource;
  * This consumer will receive data change events during {@link CdcMain} 
application invocation.
  * The lifecycle of the consumer is the following:
  * <ul>
- *     <li>Start of the consumer {@link #start()}.</li>
+ *     <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
  *     <li>Notification of the consumer by the {@link #onEvents(Iterator)} 
call.</li>
  *     <li>Stop of the consumer {@link #stop()}.</li>
  * </ul>
@@ -55,8 +56,9 @@ import org.apache.ignite.resources.LoggerResource;
 public interface CdcConsumer {
     /**
      * Starts the consumer.
+     * @param mreg Metric registry for consumer specific metrics.
      */
-    public void start();
+    public void start(MetricRegistry mreg);
 
     /**
      * Handles entry changes events.
@@ -71,7 +73,7 @@ public interface CdcConsumer {
 
     /**
      * Stops the consumer.
-     * This methods can be invoked only after {@link #start()}.
+     * This methods can be invoked only after {@link #start(MetricRegistry)}.
      */
     public void stop();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index e11cf31..b6d4fe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1991,8 +1991,7 @@ public class IgnitionEx {
             if (myCfg.getUserAttributes() == null)
                 myCfg.setUserAttributes(Collections.<String, 
Object>emptyMap());
 
-            if (myCfg.getMBeanServer() == null && !U.IGNITE_MBEANS_DISABLED)
-                
myCfg.setMBeanServer(ManagementFactory.getPlatformMBeanServer());
+            initializeDefaultMBeanServer(myCfg);
 
             Marshaller marsh = myCfg.getMarshaller();
 
@@ -2674,6 +2673,12 @@ public class IgnitionEx {
         }
     }
 
+    /** Initialize default mbean server. */
+    public static void initializeDefaultMBeanServer(IgniteConfiguration myCfg) 
{
+        if (myCfg.getMBeanServer() == null && !U.IGNITE_MBEANS_DISABLED)
+            myCfg.setMBeanServer(ManagementFactory.getPlatformMBeanServer());
+    }
+
     /**
      * @param cfg Ignite Configuration with legacy data storage configuration.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index a2bb534..73bd629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cdc.CdcConsumer;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridLoggerProxy;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.MarshallerContextImpl;
@@ -47,23 +48,23 @@ import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.resource.GridResourceIoc;
-import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.resources.SpringApplicationContextResource;
-import org.apache.ignite.resources.SpringResource;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
 import static org.apache.ignite.internal.IgniteKernal.NL;
 import static org.apache.ignite.internal.IgniteKernal.SITE;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
 /**
  * Change Data Capture (CDC) application.
@@ -114,14 +115,50 @@ public class CdcMain implements Runnable {
     /** State dir. */
     public static final String STATE_DIR = "state";
 
+    /** Current segment index metric name. */
+    public static final String CUR_SEG_IDX = "CurrentSegmentIndex";
+
+    /** Committed segment index metric name. */
+    public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";
+
+    /** Committed segment offset metric name. */
+    public static final String COMMITTED_SEG_OFFSET = "CommittedSegmentOffset";
+
+    /** Last segment consumption time. */
+    public static final String LAST_SEG_CONSUMPTION_TIME = 
"LastSegmentConsumptionTime";
+
+    /** Binary metadata metric name. */
+    public static final String BINARY_META_DIR = "BinaryMetaDir";
+
+    /** Marshaller metric name. */
+    public static final String MARSHALLER_DIR = "MarshallerDir";
+
+    /** Cdc directory metric name. */
+    public static final String CDC_DIR = "CdcDir";
+
     /** Ignite configuration. */
     private final IgniteConfiguration igniteCfg;
 
     /** Spring resource context. */
     private final GridSpringResourceContext ctx;
 
+    /** CDC metrics registry. */
+    private MetricRegistry mreg;
+
+    /** Current segment index metric. */
+    private AtomicLongMetric curSegmentIdx;
+
+    /** Committed state segment index metric. */
+    private AtomicLongMetric committedSegmentIdx;
+
+    /** Committed state segment offset metric. */
+    private AtomicLongMetric committedSegmentOffset;
+
+    /** Time of last segment consumption. */
+    private AtomicLongMetric lastSegmentConsumptionTs;
+
     /** Change Data Capture configuration. */
-    private final CdcConfiguration cdcCfg;
+    protected final CdcConfiguration cdcCfg;
 
     /** WAL iterator factory. */
     private final IgniteWalIteratorFactory factory;
@@ -161,7 +198,8 @@ public class CdcMain implements Runnable {
     public CdcMain(
         IgniteConfiguration cfg,
         GridSpringResourceContext ctx,
-        CdcConfiguration cdcCfg) {
+        CdcConfiguration cdcCfg
+    ) {
         igniteCfg = new IgniteConfiguration(cfg);
         this.ctx = ctx;
         this.cdcCfg = cdcCfg;
@@ -207,29 +245,7 @@ public class CdcMain implements Runnable {
             throw new IllegalArgumentException(ERR_MSG);
         }
 
-        PdsFolderSettings<CdcFileLockHolder> settings =
-            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
-
-        if (settings == null) {
-            throw new IgniteException("Can't find folder to read WAL segments 
from based on provided configuration! " +
-                "[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" 
+ igniteCfg.getConsistentId() + ']');
-        }
-
-        CdcFileLockHolder lock = settings.getLockedFileLockHolder();
-
-        if (lock == null) {
-            File consIdDir = settings.persistentStoreNodePath();
-
-            lock = tryLock(consIdDir);
-
-            if (lock == null) {
-                throw new IgniteException(
-                    "Can't acquire lock for Change Data Capture folder [dir=" 
+ consIdDir.getAbsolutePath() + ']'
-                );
-            }
-        }
-
-        try {
+        try (CdcFileLockHolder lock = lockPds()) {
             String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
 
             Files.createDirectories(cdcDir.resolve(STATE_DIR));
@@ -244,30 +260,120 @@ public class CdcMain implements Runnable {
                 log.info("Ignite node Marshaller [dir=" + marshaller + ']');
             }
 
-            injectResources(consumer.consumer());
+            StandaloneGridKernalContext kctx = startStandaloneKernal();
 
-            state = new CdcConsumerState(cdcDir.resolve(STATE_DIR));
+            initMetrics();
 
-            initState = state.load();
+            try {
+                kctx.resource().injectGeneric(consumer.consumer());
 
-            if (initState != null && log.isInfoEnabled())
-                log.info("Initial state loaded [state=" + initState + ']');
+                state = createState(cdcDir.resolve(STATE_DIR));
 
-            consumer.start();
+                initState = state.load();
 
-            try {
-                consumeWalSegmentsUntilStopped();
+                if (initState != null) {
+                    committedSegmentIdx.value(initState.index());
+                    committedSegmentOffset.value(initState.fileOffset());
+
+                    if (log.isInfoEnabled())
+                        log.info("Initial state loaded [state=" + initState + 
']');
+                }
+
+                consumer.start(mreg, kctx.metric().registry(metricName("cdc", 
"consumer")));
+
+                try {
+                    consumeWalSegmentsUntilStopped();
+                }
+                finally {
+                    consumer.stop();
+
+                    if (log.isInfoEnabled())
+                        log.info("Ignite Change Data Capture Application 
stopped.");
+                }
             }
             finally {
-                consumer.stop();
+                for (GridComponent comp : kctx)
+                    comp.stop(false);
+            }
+        }
+    }
+
+    /** Creates consumer state. */
+    protected CdcConsumerState createState(Path stateDir) {
+        return new CdcConsumerState(stateDir);
+    }
 
-                if (log.isInfoEnabled())
-                    log.info("Ignite Change Data Capture Application 
stopped.");
+    /**
+     * @return Kernal instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    private StandaloneGridKernalContext startStandaloneKernal() throws 
IgniteCheckedException {
+        StandaloneGridKernalContext kctx = new 
StandaloneGridKernalContext(log, binaryMeta, marshaller) {
+            @Override protected IgniteConfiguration 
prepareIgniteConfiguration() {
+                IgniteConfiguration cfg = super.prepareIgniteConfiguration();
+
+                
cfg.setIgniteInstanceName(cdcInstanceName(igniteCfg.getIgniteInstanceName()));
+
+                if (!F.isEmpty(cdcCfg.getMetricExporterSpi()))
+                    cfg.setMetricExporterSpi(cdcCfg.getMetricExporterSpi());
+
+                initializeDefaultMBeanServer(cfg);
+
+                return cfg;
             }
+        };
+
+        kctx.resource().setSpringContext(ctx);
+
+        for (GridComponent comp : kctx)
+            comp.start();
+
+        mreg = kctx.metric().registry("cdc");
+
+        return kctx;
+    }
+
+    /** Initialize metrics. */
+    private void initMetrics() {
+        mreg.objectMetric(BINARY_META_DIR, String.class, "Binary meta 
directory").value(binaryMeta.getAbsolutePath());
+        mreg.objectMetric(MARSHALLER_DIR, String.class, "Marshaller 
directory").value(marshaller.getAbsolutePath());
+        mreg.objectMetric(CDC_DIR, String.class, "CDC 
directory").value(cdcDir.toFile().getAbsolutePath());
+
+        curSegmentIdx = mreg.longMetric(CUR_SEG_IDX, "Current segment index");
+        committedSegmentIdx = mreg.longMetric(COMMITTED_SEG_IDX, "Committed 
segment index");
+        committedSegmentOffset = mreg.longMetric(COMMITTED_SEG_OFFSET, 
"Committed segment offset");
+        lastSegmentConsumptionTs =
+            mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of 
consumption of WAL segment");
+    }
+
+    /**
+     * @return CDC lock holder for specifi folder.
+     * @throws IgniteCheckedException If failed.
+     */
+    private CdcFileLockHolder lockPds() throws IgniteCheckedException {
+        PdsFolderSettings<CdcFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, 
igniteCfg.getConsistentId(), this::tryLock).resolve();
+
+        if (settings == null) {
+            throw new IgniteException("Can't find folder to read WAL segments 
from based on provided configuration! " +
+                "[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId=" 
+ igniteCfg.getConsistentId() + ']');
         }
-        finally {
-            U.closeQuiet(lock);
+
+        CdcFileLockHolder lock = settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = settings.persistentStoreNodePath();
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null) {
+                throw new IgniteException(
+                    "Can't acquire lock for Change Data Capture folder [dir=" 
+ consIdDir.getAbsolutePath() + ']'
+                );
+            }
         }
+
+        return lock;
     }
 
     /** Waits and consumes new WAL segments until stopped. */
@@ -313,6 +419,8 @@ public class CdcMain implements Runnable {
         if (log.isInfoEnabled())
             log.info("Processing WAL segment [segment=" + segment + ']');
 
+        lastSegmentConsumptionTs.value(System.currentTimeMillis());
+
         IgniteWalIteratorFactory.IteratorParametersBuilder builder =
             new IgniteWalIteratorFactory.IteratorParametersBuilder()
                 .log(log)
@@ -322,9 +430,11 @@ public class CdcMain implements Runnable {
                 .filesOrDirs(segment.toFile())
                 .addFilter((type, ptr) -> type == DATA_RECORD_V2);
 
-        if (initState != null) {
-            long segmentIdx = segmentIndex(segment);
+        long segmentIdx = segmentIndex(segment);
+
+        curSegmentIdx.value(segmentIdx);
 
+        if (initState != null) {
             if (segmentIdx > initState.index()) {
                 throw new IgniteException("Found segment greater then saved 
state. Some events are missed. Exiting! " +
                     "[state=" + initState + ", segment=" + segmentIdx + ']');
@@ -364,7 +474,12 @@ public class CdcMain implements Runnable {
                 if (commit) {
                     assert it.lastRead().isPresent();
 
-                    state.save(it.lastRead().get());
+                    WALPointer ptr = it.lastRead().get();
+
+                    state.save(ptr);
+
+                    committedSegmentIdx.value(ptr.index());
+                    committedSegmentOffset.value(ptr.fileOffset());
 
                     // Can delete after new file state save.
                     if (!processedSegments.isEmpty()) {
@@ -474,37 +589,6 @@ public class CdcMain implements Runnable {
     }
 
     /** */
-    private void injectResources(CdcConsumer dataConsumer) throws 
IgniteCheckedException {
-        GridResourceIoc ioc = new GridResourceIoc();
-
-        ioc.inject(
-            dataConsumer,
-            LoggerResource.class,
-            new GridResourceLoggerInjector(log),
-            null,
-            null
-        );
-
-        if (ctx != null) {
-            ioc.inject(
-                dataConsumer,
-                SpringResource.class,
-                ctx.springBeanInjector(),
-                null,
-                null
-            );
-
-            ioc.inject(
-                dataConsumer,
-                SpringApplicationContextResource.class,
-                ctx.springContextInjector(),
-                null,
-                null
-            );
-        }
-    }
-
-    /** */
     private void ackAsciiLogo() {
         String ver = "ver. " + ACK_VER_STR;
 
@@ -553,4 +637,9 @@ public class CdcMain implements Runnable {
                 "");
         }
     }
+
+    /** */
+    public static String cdcInstanceName(String igniteInstanceName) {
+        return "cdc-" + igniteInstanceName;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 38c2938..66853a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -28,6 +28,8 @@ import 
org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -44,12 +46,24 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD
  * @see CdcConsumer
  */
 public class WalRecordsConsumer<K, V> {
+    /** Events count metric name. */
+    public static final String EVTS_CNT = "EventsCount";
+
+    /** Last event time metric name. */
+    public static final String LAST_EVT_TIME = "LastEventTime";
+
     /** Ignite logger. */
     private final IgniteLogger log;
 
     /** Data change events consumer. */
     private final CdcConsumer consumer;
 
+    /** Event count metric */
+    private AtomicLongMetric evtsCnt;
+
+    /** Timestamp of last event process. */
+    private AtomicLongMetric lastEvtTs;
+
     /** Operations types we interested in. */
     private static final EnumSet<GridCacheOperation> OPERATIONS_TYPES = 
EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM);
 
@@ -99,6 +113,10 @@ public class WalRecordsConsumer<K, V> {
                 if (!hasCurrent())
                     throw new NoSuchElementException();
 
+                evtsCnt.increment();
+
+                lastEvtTs.value(System.currentTimeMillis());
+
                 return entries.next();
             }
 
@@ -142,10 +160,15 @@ public class WalRecordsConsumer<K, V> {
     /**
      * Starts the consumer.
      *
+     * @param cdcReg CDC metric registry.
+     * @param cdcConsumerReg CDC consumer metric registry.
      * @throws IgniteCheckedException If failed.
      */
-    public void start() throws IgniteCheckedException {
-        consumer.start();
+    public void start(MetricRegistry cdcReg, MetricRegistry cdcConsumerReg) 
throws IgniteCheckedException {
+        consumer.start(cdcConsumerReg);
+
+        evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by 
the consumer");
+        lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event 
process");
 
         if (log.isDebugEnabled())
             log.debug("WalRecordsConsumer started [consumer=" + 
consumer.getClass() + ']');
@@ -153,7 +176,7 @@ public class WalRecordsConsumer<K, V> {
 
     /**
      * Stops the consumer.
-     * This methods can be invoked only after {@link #start()}.
+     * This methods can be invoked only after {@link #start(MetricRegistry, 
MetricRegistry)}.
      */
     public void stop() {
         consumer.stop();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e22d2bf..19a7d41 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -126,6 +126,9 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     /** Empty plugin processor. */
     private IgnitePluginProcessor pluginProc;
 
+    /** */
+    private GridResourceProcessor rsrcProc;
+
     /** Metrics manager. */
     private final GridMetricManager metricMgr;
 
@@ -166,6 +169,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
         this.marshallerCtx = new MarshallerContextImpl(null, null);
         this.cfg = prepareIgniteConfiguration();
+        this.rsrcProc = new GridResourceProcessor(this);
         this.metricMgr = new GridMetricManager(this);
         this.sysViewMgr = new GridSystemViewManager(this);
 
@@ -175,7 +179,9 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
         this.cacheObjProcessor = binaryProcessor(this, 
binaryMetadataFileStoreDir);
 
+        comps.add(rsrcProc);
         comps.add(cacheObjProcessor);
+        comps.add(metricMgr);
 
         if (marshallerMappingFileStoreDir != null) {
             
marshallerCtx.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir);
@@ -226,6 +232,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
         cfg.setMetricExporterSpi(new NoopMetricExporterSpi());
         cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
+        cfg.setGridLogger(log);
 
         return cfg;
     }
@@ -269,9 +276,8 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     @Override public IgniteEx grid() {
         final IgniteEx kernal = new IgniteKernal();
         try {
-            Field fieldCfg = kernal.getClass().getDeclaredField("cfg");
-            fieldCfg.setAccessible(true);
-            fieldCfg.set(kernal, cfg);
+            setField(kernal, "cfg", cfg);
+            setField(kernal, "igniteInstanceName", 
cfg.getIgniteInstanceName());
         }
         catch (NoSuchFieldException | IllegalAccessException e) {
             log.error("", e);
@@ -279,6 +285,13 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
         return kernal;
     }
 
+    /** */
+    private void setField(IgniteEx kernal, String name, Object val) throws 
NoSuchFieldException, IllegalAccessException {
+        Field field = kernal.getClass().getDeclaredField(name);
+        field.setAccessible(true);
+        field.set(kernal, val);
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteConfiguration config() {
         return cfg;
@@ -306,7 +319,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
     /** {@inheritDoc} */
     @Override public GridResourceProcessor resource() {
-        return null;
+        return rsrcProc;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index 20aea69..c6c4708 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -259,6 +259,10 @@ public class GridMetricManager extends 
GridManagerAdapter<MetricExporterSpi> imp
 
         startSpi();
 
+        // In case standalone kernal start.
+        if (ctx.internalSubscriptionProcessor() == null)
+            return;
+
         
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(
             new DistributedMetastorageLifecycleListener() {
                 /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index e250f68..ec43d9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -17,26 +17,52 @@
 
 package org.apache.ignite.cdc;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import javax.management.DynamicMBean;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcConsumerState;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.CI3;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
+import org.apache.ignite.spi.metric.ObjectMetric;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
+import static org.apache.ignite.internal.cdc.CdcMain.BINARY_META_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
+import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -61,16 +87,71 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    public void addAndWaitForConsumption(
+    protected CdcMain createCdc(CdcConsumer cnsmr, IgniteConfiguration cfg) {
+        return createCdc(cnsmr, cfg, null);
+    }
+
+    /** */
+    protected CdcMain createCdc(
+        CdcConsumer cnsmr,
+        IgniteConfiguration cfg,
+        CountDownLatch latch,
+        GridAbsPredicate... conditions
+    ) {
+        CdcConfiguration cdcCfg = new CdcConfiguration();
+
+        cdcCfg.setConsumer(cnsmr);
+        cdcCfg.setKeepBinary(keepBinary());
+        cdcCfg.setMetricExporterSpi(metricExporters());
+
+        return new CdcMain(cfg, null, cdcCfg) {
+            @Override protected CdcConsumerState createState(Path stateDir) {
+                return new CdcConsumerState(stateDir) {
+                    @Override public void save(WALPointer ptr) throws 
IOException {
+                        super.save(ptr);
+
+                        if (!F.isEmpty(conditions)) {
+                            for (GridAbsPredicate p : conditions) {
+                                if (!p.apply())
+                                    return;
+                            }
+
+                            latch.countDown();
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    /** */
+    protected void addAndWaitForConsumption(
         UserCdcConsumer cnsmr,
-        CdcMain cdc,
+        IgniteConfiguration cfg,
         IgniteCache<Integer, CdcSelfTest.User> cache,
         IgniteCache<Integer, CdcSelfTest.User> txCache,
         CI3<IgniteCache<Integer, CdcSelfTest.User>, Integer, Integer> addData,
         int from,
         int to,
-        long timeout
-    ) throws IgniteCheckedException {
+        boolean waitForCommit
+    ) throws Exception {
+        GridAbsPredicate cachePredicate = sizePredicate(to - from, 
cache.getName(), UPDATE, cnsmr);
+        GridAbsPredicate txPredicate = txCache == null
+            ? null
+            : sizePredicate(to - from, txCache.getName(), UPDATE, cnsmr);
+
+        CdcMain cdc;
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        if (waitForCommit) {
+            cdc = txCache == null
+                ? createCdc(cnsmr, cfg, latch, cachePredicate)
+                : createCdc(cnsmr, cfg, latch, cachePredicate, txPredicate);
+        }
+        else
+            cdc = createCdc(cnsmr, cfg);
+
         IgniteInternalFuture<?> fut = runAsync(cdc);
 
         addData.apply(cache, from, to);
@@ -78,10 +159,16 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
         if (txCache != null)
             addData.apply(txCache, from, to);
 
-        assertTrue(waitForSize(to - from, cache.getName(), UPDATE, timeout, 
cnsmr));
+        if (waitForCommit)
+            latch.await(getTestTimeout(), MILLISECONDS);
+        else {
+            assertTrue(waitForCondition(cachePredicate, getTestTimeout()));
 
-        if (txCache != null)
-            assertTrue(waitForSize(to - from, txCache.getName(), UPDATE, 
timeout, cnsmr));
+            if (txCache != null)
+                assertTrue(waitForCondition(txPredicate, getTestTimeout()));
+        }
+
+        checkMetrics(cdc, txCache == null ? to : to * 2);
 
         fut.cancel();
 
@@ -96,29 +183,84 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    public boolean waitForSize(
+    public void waitForSize(
         int expSz,
         String cacheName,
         CdcSelfTest.ChangeEventType evtType,
-        long timeout,
         TestCdcConsumer<?>... cnsmrs
     ) throws IgniteInterruptedCheckedException {
-        return waitForCondition(
-            () -> {
-                int sum = Arrays.stream(cnsmrs).mapToInt(c -> 
F.size(c.data(evtType, cacheId(cacheName)))).sum();
-                return sum == expSz;
-            },
-            timeout);
+        assertTrue(waitForCondition(sizePredicate(expSz, cacheName, evtType, 
cnsmrs), getTestTimeout()));
     }
 
     /** */
-    public CdcConfiguration cdcConfig(CdcConsumer cnsmr) {
-        CdcConfiguration cdcCfg = new CdcConfiguration();
+    protected GridAbsPredicate sizePredicate(
+        int expSz,
+        String cacheName,
+        ChangeEventType evtType,
+        TestCdcConsumer<?>... cnsmrs
+    ) {
+        return () -> {
+            int sum = Arrays.stream(cnsmrs).mapToInt(c -> 
F.size(c.data(evtType, cacheId(cacheName)))).sum();
+            return sum == expSz;
+        };
+    }
 
-        cdcCfg.setConsumer(cnsmr);
-        cdcCfg.setKeepBinary(false);
+    /** */
+    protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception {
+        if (metricExporters() != null) {
+            IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg");
+
+            DynamicMBean jmxCdcReg = 
metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+            Function<String, ?> jmxVal = m -> {
+                try {
+                    return jmxCdcReg.getAttribute(m);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            };
+
+            checkMetrics(expCnt, (Function<String, Long>)jmxVal, 
(Function<String, String>)jmxVal);
+        }
+
+        MetricRegistry mreg = getFieldValue(cdc, "mreg");
+
+        assertNotNull(mreg);
+
+        checkMetrics(
+            expCnt,
+            m -> mreg.<LongMetric>findMetric(m).value(),
+            m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+        );
+    }
+
+    /** */
+    private void checkMetrics(long expCnt, Function<String, Long> longMetric, 
Function<String, String> strMetric) {
+        long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+        long curSegIdx = longMetric.apply(CUR_SEG_IDX);
 
-        return cdcCfg;
+        assertTrue(committedSegIdx <= curSegIdx);
+
+        assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+        assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+        assertTrue(longMetric.apply(LAST_EVT_TIME) > 0);
+
+        for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR, 
CDC_DIR})
+            assertTrue(new File(strMetric.apply(m)).exists());
+
+        assertEquals(expCnt, (long)longMetric.apply(EVTS_CNT));
+    }
+
+    /** */
+    protected boolean keepBinary() {
+        return false;
+    }
+
+    /** */
+    protected MetricExporterSpi[] metricExporters() {
+        return null;
     }
 
     /** */
@@ -130,7 +272,7 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
         private volatile boolean stopped;
 
         /** {@inheritDoc} */
-        @Override public void start() {
+        @Override public void start(MetricRegistry mreg) {
             stopped = false;
         }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
index 30fd1e9..050f4bc 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.plugin.AbstractCachePluginProvider;
 import org.apache.ignite.plugin.AbstractTestPluginProvider;
@@ -118,11 +119,9 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
             }
         };
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
-
         IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(FOR_OTHER_CLUSTER_ID);
 
-        addAndWaitForConsumption(cnsmr, cdc, cache, null, 
this::addConflictData, 0, KEYS_CNT, getTestTimeout());
+        addAndWaitForConsumption(cnsmr, cfg, cache, null, 
this::addConflictData, 0, KEYS_CNT, true);
     }
 
     /** */
@@ -153,7 +152,7 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
                 return true;
             }
 
-            @Override public void start() {
+            @Override public void start(MetricRegistry mreg) {
                 // No-op.
             }
 
@@ -162,7 +161,7 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
             }
         };
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+        CdcMain cdc = createCdc(cnsmr, cfg);
 
         IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
 
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 9f1e55f..11ab8f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -17,16 +17,17 @@
 
 package org.apache.ignite.cdc;
 
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -38,11 +39,17 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
@@ -68,16 +75,23 @@ public class CdcSelfTest extends AbstractCdcTest {
     public WALMode walMode;
 
     /** */
-    @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}")
+    @Parameterized.Parameter(2)
+    public Supplier<MetricExporterSpi> metricExporter;
+
+    /** */
+    @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}, 
metricExporter={2}")
     public static Collection<?> parameters() {
-        return Arrays.asList(new Object[][] {
-            {true, WALMode.FSYNC},
-            {false, WALMode.FSYNC},
-            {true, WALMode.LOG_ONLY},
-            {false, WALMode.LOG_ONLY},
-            {true, WALMode.BACKGROUND},
-            {false, WALMode.BACKGROUND}
-        });
+        List<Object[]> params = new ArrayList<>();
+
+        for (WALMode mode : EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, 
WALMode.BACKGROUND))
+            for (boolean specificConsistentId : new boolean[] {false, true}) {
+                Supplier<MetricExporterSpi> jmx = JmxMetricExporterSpi::new;
+
+                params.add(new Object[] {specificConsistentId, mode, null});
+                params.add(new Object[] {specificConsistentId, mode, jmx});
+            }
+
+        return params;
     }
 
     /** Consistent id. */
@@ -107,7 +121,7 @@ public class CdcSelfTest extends AbstractCdcTest {
     @Test
     public void testReadAllKeys() throws Exception {
         // Read all records from iterator.
-        readAll(new UserCdcConsumer());
+        readAll(new UserCdcConsumer(), true);
 
         // Read one record per call.
         readAll(new UserCdcConsumer() {
@@ -116,7 +130,7 @@ public class CdcSelfTest extends AbstractCdcTest {
 
                 return false;
             }
-        });
+        }, false);
 
         // Read one record per call and commit.
         readAll(new UserCdcConsumer() {
@@ -125,38 +139,40 @@ public class CdcSelfTest extends AbstractCdcTest {
 
                 return true;
             }
-        });
+        }, true);
     }
 
     /** */
-    private void readAll(UserCdcConsumer cnsmr) throws Exception {
+    private void readAll(UserCdcConsumer cnsmr, boolean offsetCommit) throws 
Exception {
         IgniteConfiguration cfg = getConfiguration("ignite-0");
 
         Ignite ign = startGrid(cfg);
 
         ign.cluster().state(ACTIVE);
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
-
         IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
         IgniteCache<Integer, User> txCache = 
ign.getOrCreateCache(TX_CACHE_NAME);
 
         addAndWaitForConsumption(
             cnsmr,
-            cdc,
+            cfg,
             cache,
             txCache,
             CdcSelfTest::addData,
             0,
             KEYS_CNT + 3,
-            getTestTimeout()
+            offsetCommit
         );
 
         removeData(cache, 0, KEYS_CNT);
 
-        IgniteInternalFuture<?> rmvFut = runAsync(cdc);
+        CdcMain cdcMain = createCdc(cnsmr, cfg);
+
+        IgniteInternalFuture<?> rmvFut = runAsync(cdcMain);
 
-        assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, 
getTestTimeout(), cnsmr));
+        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+        checkMetrics(cdcMain, offsetCommit ? KEYS_CNT : ((KEYS_CNT + 3) * 2 + 
KEYS_CNT));
 
         rmvFut.cancel();
 
@@ -184,7 +200,7 @@ public class CdcSelfTest extends AbstractCdcTest {
                 cnsmrStarted.countDown();
 
                 try {
-                    startProcEvts.await(getTestTimeout(), 
TimeUnit.MILLISECONDS);
+                    startProcEvts.await(getTestTimeout(), MILLISECONDS);
                 }
                 catch (InterruptedException e) {
                     throw new RuntimeException(e);
@@ -194,7 +210,7 @@ public class CdcSelfTest extends AbstractCdcTest {
             }
         };
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+        CdcMain cdc = createCdc(cnsmr, cfg);
 
         runAsync(cdc);
 
@@ -205,14 +221,15 @@ public class CdcSelfTest extends AbstractCdcTest {
         // Make sure all streamed data will become available for consumption.
         Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT);
 
-        cnsmrStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+        cnsmrStarted.await(getTestTimeout(), MILLISECONDS);
 
         // Initiate graceful shutdown.
         cdc.stop();
 
         startProcEvts.countDown();
 
-        assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, 
getTestTimeout(), cnsmr));
+        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
         assertTrue(waitForCondition(cnsmr::stopped, getTestTimeout()));
 
         List<Integer> keys = cnsmr.data(UPDATE, cacheId(DEFAULT_CACHE_NAME));
@@ -225,6 +242,7 @@ public class CdcSelfTest extends AbstractCdcTest {
 
     /** */
     @Test
+    @WithSystemProperty(key = IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID, 
value = "true")
     public void testMultiNodeConsumption() throws Exception {
         IgniteEx ign1 = startGrid(0);
 
@@ -237,6 +255,17 @@ public class CdcSelfTest extends AbstractCdcTest {
 
         IgniteCache<Integer, User> cache = 
ign1.getOrCreateCache(DEFAULT_CACHE_NAME);
 
+        // Calculate expected count of key for each node.
+        int[] keysCnt = new int[2];
+
+        for (int i = 0; i < KEYS_CNT * 2; i++) {
+            Ignite primary = primaryNode(i, DEFAULT_CACHE_NAME);
+
+            assertTrue(primary == ign1 || primary == ign2);
+
+            keysCnt[primary == ign1 ? 0 : 1]++;
+        }
+
         // Adds data concurrently with CDC start.
         IgniteInternalFuture<?> addDataFut = runAsync(() -> addData(cache, 0, 
KEYS_CNT));
 
@@ -246,19 +275,32 @@ public class CdcSelfTest extends AbstractCdcTest {
         IgniteConfiguration cfg1 = ign1.configuration();
         IgniteConfiguration cfg2 = ign2.configuration();
 
-        CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
-        CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+        // Always run CDC with consistent id to ensure instance read data for 
specific node.
+        if (!specificConsistentId) {
+            
cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+            
cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+        }
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0], 
DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+        GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1], 
DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+        CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+        CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
 
         IgniteInternalFuture<?> fut1 = runAsync(cdc1);
         IgniteInternalFuture<?> fut2 = runAsync(cdc2);
 
         addDataFut.get(getTestTimeout());
 
-        addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+        runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 
2)).get(getTestTimeout());
 
-        addDataFut.get(getTestTimeout());
+        // Wait while predicate will become true and state saved on the disk 
for both cdc.
+        assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
 
-        assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, 
getTestTimeout(), cnsmr1, cnsmr2));
+        checkMetrics(cdc1, keysCnt[0]);
+        checkMetrics(cdc2, keysCnt[1]);
 
         assertFalse(cnsmr1.stopped());
         assertFalse(cnsmr2.stopped());
@@ -271,10 +313,16 @@ public class CdcSelfTest extends AbstractCdcTest {
 
         removeData(cache, 0, KEYS_CNT * 2);
 
+        cdc1 = createCdc(cnsmr1, cfg1);
+        cdc2 = createCdc(cnsmr2, cfg2);
+
         IgniteInternalFuture<?> rmvFut1 = runAsync(cdc1);
         IgniteInternalFuture<?> rmvFut2 = runAsync(cdc2);
 
-        assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE, 
getTestTimeout(), cnsmr1, cnsmr2));
+        waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE, cnsmr1, cnsmr2);
+
+        checkMetrics(cdc1, keysCnt[0]);
+        checkMetrics(cdc2, keysCnt[1]);
 
         rmvFut1.cancel();
         rmvFut2.cancel();
@@ -291,8 +339,8 @@ public class CdcSelfTest extends AbstractCdcTest {
         UserCdcConsumer cnsmr1 = new UserCdcConsumer();
         UserCdcConsumer cnsmr2 = new UserCdcConsumer();
 
-        IgniteInternalFuture<?> fut1 = runAsync(new 
CdcMain(ign.configuration(), null, cdcConfig(cnsmr1)));
-        IgniteInternalFuture<?> fut2 = runAsync(new 
CdcMain(ign.configuration(), null, cdcConfig(cnsmr2)));
+        IgniteInternalFuture<?> fut1 = runAsync(createCdc(cnsmr1, 
ign.configuration()));
+        IgniteInternalFuture<?> fut2 = runAsync(createCdc(cnsmr2, 
ign.configuration()));
 
         assertTrue(waitForCondition(() -> fut1.isDone() || fut2.isDone(), 
getTestTimeout()));
 
@@ -330,11 +378,13 @@ public class CdcSelfTest extends AbstractCdcTest {
                 }
             };
 
-            CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+            CdcMain cdc = createCdc(cnsmr, cfg);
 
             IgniteInternalFuture<?> fut = runAsync(cdc);
 
-            assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, 
getTestTimeout(), cnsmr));
+            waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+            checkMetrics(cdc, KEYS_CNT);
 
             fut.cancel();
 
@@ -374,11 +424,13 @@ public class CdcSelfTest extends AbstractCdcTest {
             }
         };
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+        CdcMain cdc = createCdc(cnsmr, cfg);
 
         IgniteInternalFuture<?> fut = runAsync(cdc);
 
-        waitForSize(half, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr);
+        waitForSize(half, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+        checkMetrics(cdc, half);
 
         waitForCondition(halfCommitted::get, getTestTimeout());
 
@@ -390,16 +442,28 @@ public class CdcSelfTest extends AbstractCdcTest {
 
         consumeHalf.set(false);
 
+        cdc = createCdc(cnsmr, cfg);
+
         fut = runAsync(cdc);
 
-        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), 
cnsmr);
-        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, getTestTimeout(), 
cnsmr);
+        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+        checkMetrics(cdc, KEYS_CNT * 2 - half);
 
         fut.cancel();
 
         assertTrue(cnsmr.stopped());
     }
 
+    /** {@inheritDoc} */
+    @Override public MetricExporterSpi[] metricExporters() {
+        if (metricExporter == null)
+            return null;
+
+        return new MetricExporterSpi[] {metricExporter.get()};
+    }
+
     /** */
     public static void addData(IgniteCache<Integer, User> cache, int from, int 
to) {
         for (int i = from; i < to; i++)
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
index 1e24d3b..a6ce2a0 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
@@ -18,18 +18,20 @@
 package org.apache.ignite.internal.cdc;
 
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cdc.AbstractCdcTest;
-import org.apache.ignite.cdc.CdcConfiguration;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.junit.Test;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
@@ -76,11 +78,12 @@ public class SqlCdcTest extends AbstractCdcTest {
 
         BinaryCdcConsumer cnsmr = new BinaryCdcConsumer();
 
-        CdcConfiguration cdcCfg = new CdcConfiguration();
+        CountDownLatch latch = new CountDownLatch(1);
 
-        cdcCfg.setConsumer(cnsmr);
+        GridAbsPredicate userPredicate = sizePredicate(KEYS_CNT, USER, UPDATE, 
cnsmr);
+        GridAbsPredicate cityPredicate = sizePredicate(KEYS_CNT, CITY, UPDATE, 
cnsmr);
 
-        CdcMain cdc = new CdcMain(cfg, null, cdcCfg);
+        CdcMain cdc = createCdc(cnsmr, cfg, latch, userPredicate, 
cityPredicate);
 
         IgniteInternalFuture<?> fut = runAsync(cdc);
 
@@ -110,8 +113,10 @@ public class SqlCdcTest extends AbstractCdcTest {
                 Integer.toString(127000 + i));
         }
 
-        assertTrue(waitForSize(KEYS_CNT, USER, UPDATE, getTestTimeout(), 
cnsmr));
-        assertTrue(waitForSize(KEYS_CNT, CITY, UPDATE, getTestTimeout(), 
cnsmr));
+        // Wait while both predicte will become true and state saved on the 
disk.
+        assertTrue(latch.await(getTestTimeout(), MILLISECONDS));;
+
+        checkMetrics(cdc, KEYS_CNT * 2);
 
         fut.cancel();
 
@@ -123,9 +128,13 @@ public class SqlCdcTest extends AbstractCdcTest {
         for (int i = 0; i < KEYS_CNT; i++)
             executeSql(ign, "DELETE FROM USER WHERE id = ?", i);
 
+        cdc = createCdc(cnsmr, cfg);
+
         IgniteInternalFuture<?> rmvFut = runAsync(cdc);
 
-        assertTrue(waitForSize(KEYS_CNT, USER, DELETE, getTestTimeout(), 
cnsmr));
+        waitForSize(KEYS_CNT, USER, DELETE, cnsmr);
+
+        checkMetrics(cdc, KEYS_CNT);
 
         rmvFut.cancel();
 
@@ -176,4 +185,9 @@ public class SqlCdcTest extends AbstractCdcTest {
     private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... 
args) {
         return node.context().query().querySqlFields(new 
SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
     }
+
+    /** {@inheritDoc} */
+    @Override protected boolean keepBinary() {
+        return true;
+    }
 }
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java 
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
index 1ed6244..53e10d9 100644
--- 
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
+++ 
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.cdc.WalRecordsConsumer;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.resources.SpringApplicationContextResource;
 import org.apache.ignite.resources.SpringResource;
@@ -121,7 +122,7 @@ public class CdcConfigurationTest extends 
GridCommonAbstractTest {
         public CountDownLatch startLatch = new CountDownLatch(1);
 
         /** {@inheritDoc} */
-        @Override public void start() {
+        @Override public void start(MetricRegistry mreg) {
             springString2 = ctx.getBean("springString2", String.class);
 
             startLatch.countDown();

Reply via email to