This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2f39591 IGNITE-14355 CDC Metrics (#9398)
2f39591 is described below
commit 2f395910925715e0a84e919591f1be6c49e8f631
Author: Nikolay <[email protected]>
AuthorDate: Tue Sep 28 09:47:27 2021 +0300
IGNITE-14355 CDC Metrics (#9398)
---
.../org/apache/ignite/cdc/CdcConfiguration.java | 23 ++
.../java/org/apache/ignite/cdc/CdcConsumer.java | 8 +-
.../org/apache/ignite/internal/IgnitionEx.java | 9 +-
.../org/apache/ignite/internal/cdc/CdcMain.java | 243 ++++++++++++++-------
.../ignite/internal/cdc/WalRecordsConsumer.java | 29 ++-
.../wal/reader/StandaloneGridKernalContext.java | 21 +-
.../processors/metric/GridMetricManager.java | 4 +
.../org/apache/ignite/cdc/AbstractCdcTest.java | 186 ++++++++++++++--
.../org/apache/ignite/cdc/CdcCacheVersionTest.java | 9 +-
.../java/org/apache/ignite/cdc/CdcSelfTest.java | 142 ++++++++----
.../org/apache/ignite/internal/cdc/SqlCdcTest.java | 28 ++-
.../apache/ignite/cdc/CdcConfigurationTest.java | 3 +-
12 files changed, 542 insertions(+), 163 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
index ca2d89c..f2f1873 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
/**
* This class defines {@link CdcMain} runtime configuration.
@@ -39,6 +40,9 @@ public class CdcConfiguration {
/** Change Data Capture consumer. */
private CdcConsumer consumer;
+ /** Metric exporter SPI. */
+ private MetricExporterSpi[] metricExporterSpi;
+
/** Keep binary flag.<br>Default value {@code true}. */
private boolean keepBinary = DFLT_KEEP_BINARY;
@@ -66,6 +70,25 @@ public class CdcConfiguration {
this.consumer = consumer;
}
+ /**
+ * Sets fully configured instances of {@link MetricExporterSpi}.
+ *
+ * @param metricExporterSpi Fully configured instances of {@link
MetricExporterSpi}.
+ * @see CdcConfiguration#getMetricExporterSpi()
+ */
+ public void setMetricExporterSpi(MetricExporterSpi... metricExporterSpi) {
+ this.metricExporterSpi = metricExporterSpi;
+ }
+
+ /**
+ * Gets fully configured metric SPI implementations.
+ *
+ * @return Metric exporter SPI implementations.
+ */
+ public MetricExporterSpi[] getMetricExporterSpi() {
+ return metricExporterSpi;
+ }
+
/** @return keep binary value. */
public boolean isKeepBinary() {
return keepBinary;
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index af119e9..b0bf648 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
@@ -29,7 +30,7 @@ import org.apache.ignite.resources.LoggerResource;
* This consumer will receive data change events during {@link CdcMain}
application invocation.
* The lifecycle of the consumer is the following:
* <ul>
- * <li>Start of the consumer {@link #start()}.</li>
+ * <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
* <li>Notification of the consumer by the {@link #onEvents(Iterator)}
call.</li>
* <li>Stop of the consumer {@link #stop()}.</li>
* </ul>
@@ -55,8 +56,9 @@ import org.apache.ignite.resources.LoggerResource;
public interface CdcConsumer {
/**
* Starts the consumer.
+ * @param mreg Metric registry for consumer specific metrics.
*/
- public void start();
+ public void start(MetricRegistry mreg);
/**
* Handles entry changes events.
@@ -71,7 +73,7 @@ public interface CdcConsumer {
/**
* Stops the consumer.
- * This methods can be invoked only after {@link #start()}.
+ * This method can be invoked only after {@link #start(MetricRegistry)}.
*/
public void stop();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index e11cf31..b6d4fe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1991,8 +1991,7 @@ public class IgnitionEx {
if (myCfg.getUserAttributes() == null)
myCfg.setUserAttributes(Collections.<String,
Object>emptyMap());
- if (myCfg.getMBeanServer() == null && !U.IGNITE_MBEANS_DISABLED)
-
myCfg.setMBeanServer(ManagementFactory.getPlatformMBeanServer());
+ initializeDefaultMBeanServer(myCfg);
Marshaller marsh = myCfg.getMarshaller();
@@ -2674,6 +2673,12 @@ public class IgnitionEx {
}
}
+ /** Initialize default mbean server. */
+ public static void initializeDefaultMBeanServer(IgniteConfiguration myCfg)
{
+ if (myCfg.getMBeanServer() == null && !U.IGNITE_MBEANS_DISABLED)
+ myCfg.setMBeanServer(ManagementFactory.getPlatformMBeanServer());
+ }
+
/**
* @param cfg Ignite Configuration with legacy data storage configuration.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index a2bb534..73bd629 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
@@ -47,23 +48,23 @@ import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde
import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.resource.GridResourceIoc;
-import
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.resources.SpringApplicationContextResource;
-import org.apache.ignite.resources.SpringResource;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static
org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
* Change Data Capture (CDC) application.
@@ -114,14 +115,50 @@ public class CdcMain implements Runnable {
/** State dir. */
public static final String STATE_DIR = "state";
+ /** Current segment index metric name. */
+ public static final String CUR_SEG_IDX = "CurrentSegmentIndex";
+
+ /** Committed segment index metric name. */
+ public static final String COMMITTED_SEG_IDX = "CommittedSegmentIndex";
+
+ /** Committed segment offset metric name. */
+ public static final String COMMITTED_SEG_OFFSET = "CommittedSegmentOffset";
+
+ /** Last segment consumption time. */
+ public static final String LAST_SEG_CONSUMPTION_TIME =
"LastSegmentConsumptionTime";
+
+ /** Binary metadata metric name. */
+ public static final String BINARY_META_DIR = "BinaryMetaDir";
+
+ /** Marshaller metric name. */
+ public static final String MARSHALLER_DIR = "MarshallerDir";
+
+ /** Cdc directory metric name. */
+ public static final String CDC_DIR = "CdcDir";
+
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
/** Spring resource context. */
private final GridSpringResourceContext ctx;
+ /** CDC metrics registry. */
+ private MetricRegistry mreg;
+
+ /** Current segment index metric. */
+ private AtomicLongMetric curSegmentIdx;
+
+ /** Committed state segment index metric. */
+ private AtomicLongMetric committedSegmentIdx;
+
+ /** Committed state segment offset metric. */
+ private AtomicLongMetric committedSegmentOffset;
+
+ /** Time of last segment consumption. */
+ private AtomicLongMetric lastSegmentConsumptionTs;
+
/** Change Data Capture configuration. */
- private final CdcConfiguration cdcCfg;
+ protected final CdcConfiguration cdcCfg;
/** WAL iterator factory. */
private final IgniteWalIteratorFactory factory;
@@ -161,7 +198,8 @@ public class CdcMain implements Runnable {
public CdcMain(
IgniteConfiguration cfg,
GridSpringResourceContext ctx,
- CdcConfiguration cdcCfg) {
+ CdcConfiguration cdcCfg
+ ) {
igniteCfg = new IgniteConfiguration(cfg);
this.ctx = ctx;
this.cdcCfg = cdcCfg;
@@ -207,29 +245,7 @@ public class CdcMain implements Runnable {
throw new IllegalArgumentException(ERR_MSG);
}
- PdsFolderSettings<CdcFileLockHolder> settings =
- new PdsFolderResolver<>(igniteCfg, log, null,
this::tryLock).resolve();
-
- if (settings == null) {
- throw new IgniteException("Can't find folder to read WAL segments
from based on provided configuration! " +
- "[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId="
+ igniteCfg.getConsistentId() + ']');
- }
-
- CdcFileLockHolder lock = settings.getLockedFileLockHolder();
-
- if (lock == null) {
- File consIdDir = settings.persistentStoreNodePath();
-
- lock = tryLock(consIdDir);
-
- if (lock == null) {
- throw new IgniteException(
- "Can't acquire lock for Change Data Capture folder [dir="
+ consIdDir.getAbsolutePath() + ']'
- );
- }
- }
-
- try {
+ try (CdcFileLockHolder lock = lockPds()) {
String consIdDir = cdcDir.getName(cdcDir.getNameCount() -
1).toString();
Files.createDirectories(cdcDir.resolve(STATE_DIR));
@@ -244,30 +260,120 @@ public class CdcMain implements Runnable {
log.info("Ignite node Marshaller [dir=" + marshaller + ']');
}
- injectResources(consumer.consumer());
+ StandaloneGridKernalContext kctx = startStandaloneKernal();
- state = new CdcConsumerState(cdcDir.resolve(STATE_DIR));
+ initMetrics();
- initState = state.load();
+ try {
+ kctx.resource().injectGeneric(consumer.consumer());
- if (initState != null && log.isInfoEnabled())
- log.info("Initial state loaded [state=" + initState + ']');
+ state = createState(cdcDir.resolve(STATE_DIR));
- consumer.start();
+ initState = state.load();
- try {
- consumeWalSegmentsUntilStopped();
+ if (initState != null) {
+ committedSegmentIdx.value(initState.index());
+ committedSegmentOffset.value(initState.fileOffset());
+
+ if (log.isInfoEnabled())
+ log.info("Initial state loaded [state=" + initState +
']');
+ }
+
+ consumer.start(mreg, kctx.metric().registry(metricName("cdc",
"consumer")));
+
+ try {
+ consumeWalSegmentsUntilStopped();
+ }
+ finally {
+ consumer.stop();
+
+ if (log.isInfoEnabled())
+ log.info("Ignite Change Data Capture Application
stopped.");
+ }
}
finally {
- consumer.stop();
+ for (GridComponent comp : kctx)
+ comp.stop(false);
+ }
+ }
+ }
+
+ /** Creates consumer state. */
+ protected CdcConsumerState createState(Path stateDir) {
+ return new CdcConsumerState(stateDir);
+ }
- if (log.isInfoEnabled())
- log.info("Ignite Change Data Capture Application
stopped.");
+ /**
+ * @return Kernal instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ private StandaloneGridKernalContext startStandaloneKernal() throws
IgniteCheckedException {
+ StandaloneGridKernalContext kctx = new
StandaloneGridKernalContext(log, binaryMeta, marshaller) {
+ @Override protected IgniteConfiguration
prepareIgniteConfiguration() {
+ IgniteConfiguration cfg = super.prepareIgniteConfiguration();
+
+
cfg.setIgniteInstanceName(cdcInstanceName(igniteCfg.getIgniteInstanceName()));
+
+ if (!F.isEmpty(cdcCfg.getMetricExporterSpi()))
+ cfg.setMetricExporterSpi(cdcCfg.getMetricExporterSpi());
+
+ initializeDefaultMBeanServer(cfg);
+
+ return cfg;
}
+ };
+
+ kctx.resource().setSpringContext(ctx);
+
+ for (GridComponent comp : kctx)
+ comp.start();
+
+ mreg = kctx.metric().registry("cdc");
+
+ return kctx;
+ }
+
+ /** Initialize metrics. */
+ private void initMetrics() {
+ mreg.objectMetric(BINARY_META_DIR, String.class, "Binary meta
directory").value(binaryMeta.getAbsolutePath());
+ mreg.objectMetric(MARSHALLER_DIR, String.class, "Marshaller
directory").value(marshaller.getAbsolutePath());
+ mreg.objectMetric(CDC_DIR, String.class, "CDC
directory").value(cdcDir.toFile().getAbsolutePath());
+
+ curSegmentIdx = mreg.longMetric(CUR_SEG_IDX, "Current segment index");
+ committedSegmentIdx = mreg.longMetric(COMMITTED_SEG_IDX, "Committed
segment index");
+ committedSegmentOffset = mreg.longMetric(COMMITTED_SEG_OFFSET,
"Committed segment offset");
+ lastSegmentConsumptionTs =
+ mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of
consumption of WAL segment");
+ }
+
+ /**
+ * @return CDC lock holder for specific folder.
+ * @throws IgniteCheckedException If failed.
+ */
+ private CdcFileLockHolder lockPds() throws IgniteCheckedException {
+ PdsFolderSettings<CdcFileLockHolder> settings =
+ new PdsFolderResolver<>(igniteCfg, log,
igniteCfg.getConsistentId(), this::tryLock).resolve();
+
+ if (settings == null) {
+ throw new IgniteException("Can't find folder to read WAL segments
from based on provided configuration! " +
+ "[workDir=" + igniteCfg.getWorkDirectory() + ", consistentId="
+ igniteCfg.getConsistentId() + ']');
}
- finally {
- U.closeQuiet(lock);
+
+ CdcFileLockHolder lock = settings.getLockedFileLockHolder();
+
+ if (lock == null) {
+ File consIdDir = settings.persistentStoreNodePath();
+
+ lock = tryLock(consIdDir);
+
+ if (lock == null) {
+ throw new IgniteException(
+ "Can't acquire lock for Change Data Capture folder [dir="
+ consIdDir.getAbsolutePath() + ']'
+ );
+ }
}
+
+ return lock;
}
/** Waits and consumes new WAL segments until stopped. */
@@ -313,6 +419,8 @@ public class CdcMain implements Runnable {
if (log.isInfoEnabled())
log.info("Processing WAL segment [segment=" + segment + ']');
+ lastSegmentConsumptionTs.value(System.currentTimeMillis());
+
IgniteWalIteratorFactory.IteratorParametersBuilder builder =
new IgniteWalIteratorFactory.IteratorParametersBuilder()
.log(log)
@@ -322,9 +430,11 @@ public class CdcMain implements Runnable {
.filesOrDirs(segment.toFile())
.addFilter((type, ptr) -> type == DATA_RECORD_V2);
- if (initState != null) {
- long segmentIdx = segmentIndex(segment);
+ long segmentIdx = segmentIndex(segment);
+
+ curSegmentIdx.value(segmentIdx);
+ if (initState != null) {
if (segmentIdx > initState.index()) {
throw new IgniteException("Found segment greater then saved
state. Some events are missed. Exiting! " +
"[state=" + initState + ", segment=" + segmentIdx + ']');
@@ -364,7 +474,12 @@ public class CdcMain implements Runnable {
if (commit) {
assert it.lastRead().isPresent();
- state.save(it.lastRead().get());
+ WALPointer ptr = it.lastRead().get();
+
+ state.save(ptr);
+
+ committedSegmentIdx.value(ptr.index());
+ committedSegmentOffset.value(ptr.fileOffset());
// Can delete after new file state save.
if (!processedSegments.isEmpty()) {
@@ -474,37 +589,6 @@ public class CdcMain implements Runnable {
}
/** */
- private void injectResources(CdcConsumer dataConsumer) throws
IgniteCheckedException {
- GridResourceIoc ioc = new GridResourceIoc();
-
- ioc.inject(
- dataConsumer,
- LoggerResource.class,
- new GridResourceLoggerInjector(log),
- null,
- null
- );
-
- if (ctx != null) {
- ioc.inject(
- dataConsumer,
- SpringResource.class,
- ctx.springBeanInjector(),
- null,
- null
- );
-
- ioc.inject(
- dataConsumer,
- SpringApplicationContextResource.class,
- ctx.springContextInjector(),
- null,
- null
- );
- }
- }
-
- /** */
private void ackAsciiLogo() {
String ver = "ver. " + ACK_VER_STR;
@@ -553,4 +637,9 @@ public class CdcMain implements Runnable {
"");
}
}
+
+ /** */
+ public static String cdcInstanceName(String igniteInstanceName) {
+ return "cdc-" + igniteInstanceName;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 38c2938..66853a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -28,6 +28,8 @@ import
org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;
@@ -44,12 +46,24 @@ import static
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD
* @see CdcConsumer
*/
public class WalRecordsConsumer<K, V> {
+ /** Events count metric name. */
+ public static final String EVTS_CNT = "EventsCount";
+
+ /** Last event time metric name. */
+ public static final String LAST_EVT_TIME = "LastEventTime";
+
/** Ignite logger. */
private final IgniteLogger log;
/** Data change events consumer. */
private final CdcConsumer consumer;
+ /** Event count metric. */
+ private AtomicLongMetric evtsCnt;
+
+ /** Timestamp of last event process. */
+ private AtomicLongMetric lastEvtTs;
+
/** Operations types we interested in. */
private static final EnumSet<GridCacheOperation> OPERATIONS_TYPES =
EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM);
@@ -99,6 +113,10 @@ public class WalRecordsConsumer<K, V> {
if (!hasCurrent())
throw new NoSuchElementException();
+ evtsCnt.increment();
+
+ lastEvtTs.value(System.currentTimeMillis());
+
return entries.next();
}
@@ -142,10 +160,15 @@ public class WalRecordsConsumer<K, V> {
/**
* Starts the consumer.
*
+ * @param cdcReg CDC metric registry.
+ * @param cdcConsumerReg CDC consumer metric registry.
* @throws IgniteCheckedException If failed.
*/
- public void start() throws IgniteCheckedException {
- consumer.start();
+ public void start(MetricRegistry cdcReg, MetricRegistry cdcConsumerReg)
throws IgniteCheckedException {
+ consumer.start(cdcConsumerReg);
+
+ evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by
the consumer");
+ lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event
process");
if (log.isDebugEnabled())
log.debug("WalRecordsConsumer started [consumer=" +
consumer.getClass() + ']');
@@ -153,7 +176,7 @@ public class WalRecordsConsumer<K, V> {
/**
* Stops the consumer.
- * This methods can be invoked only after {@link #start()}.
+ * This method can be invoked only after {@link #start(MetricRegistry,
MetricRegistry)}.
*/
public void stop() {
consumer.stop();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e22d2bf..19a7d41 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -126,6 +126,9 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
/** Empty plugin processor. */
private IgnitePluginProcessor pluginProc;
+ /** */
+ private GridResourceProcessor rsrcProc;
+
/** Metrics manager. */
private final GridMetricManager metricMgr;
@@ -166,6 +169,7 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
this.marshallerCtx = new MarshallerContextImpl(null, null);
this.cfg = prepareIgniteConfiguration();
+ this.rsrcProc = new GridResourceProcessor(this);
this.metricMgr = new GridMetricManager(this);
this.sysViewMgr = new GridSystemViewManager(this);
@@ -175,7 +179,9 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
this.cacheObjProcessor = binaryProcessor(this,
binaryMetadataFileStoreDir);
+ comps.add(rsrcProc);
comps.add(cacheObjProcessor);
+ comps.add(metricMgr);
if (marshallerMappingFileStoreDir != null) {
marshallerCtx.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir);
@@ -226,6 +232,7 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
cfg.setMetricExporterSpi(new NoopMetricExporterSpi());
cfg.setSystemViewExporterSpi(new JmxSystemViewExporterSpi());
+ cfg.setGridLogger(log);
return cfg;
}
@@ -269,9 +276,8 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
@Override public IgniteEx grid() {
final IgniteEx kernal = new IgniteKernal();
try {
- Field fieldCfg = kernal.getClass().getDeclaredField("cfg");
- fieldCfg.setAccessible(true);
- fieldCfg.set(kernal, cfg);
+ setField(kernal, "cfg", cfg);
+ setField(kernal, "igniteInstanceName",
cfg.getIgniteInstanceName());
}
catch (NoSuchFieldException | IllegalAccessException e) {
log.error("", e);
@@ -279,6 +285,13 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
return kernal;
}
+ /** */
+ private void setField(IgniteEx kernal, String name, Object val) throws
NoSuchFieldException, IllegalAccessException {
+ Field field = kernal.getClass().getDeclaredField(name);
+ field.setAccessible(true);
+ field.set(kernal, val);
+ }
+
/** {@inheritDoc} */
@Override public IgniteConfiguration config() {
return cfg;
@@ -306,7 +319,7 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
/** {@inheritDoc} */
@Override public GridResourceProcessor resource() {
- return null;
+ return rsrcProc;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index 20aea69..c6c4708 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -259,6 +259,10 @@ public class GridMetricManager extends
GridManagerAdapter<MetricExporterSpi> imp
startSpi();
+ // In case standalone kernal start.
+ if (ctx.internalSubscriptionProcessor() == null)
+ return;
+
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(
new DistributedMetastorageLifecycleListener() {
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index e250f68..ec43d9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -17,26 +17,52 @@
package org.apache.ignite.cdc;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcConsumerState;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
+import org.apache.ignite.spi.metric.ObjectMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
+import static org.apache.ignite.internal.cdc.CdcMain.BINARY_META_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
+import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -61,16 +87,71 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
}
/** */
- public void addAndWaitForConsumption(
+ protected CdcMain createCdc(CdcConsumer cnsmr, IgniteConfiguration cfg) {
+ return createCdc(cnsmr, cfg, null);
+ }
+
+ /** */
+ protected CdcMain createCdc(
+ CdcConsumer cnsmr,
+ IgniteConfiguration cfg,
+ CountDownLatch latch,
+ GridAbsPredicate... conditions
+ ) {
+ CdcConfiguration cdcCfg = new CdcConfiguration();
+
+ cdcCfg.setConsumer(cnsmr);
+ cdcCfg.setKeepBinary(keepBinary());
+ cdcCfg.setMetricExporterSpi(metricExporters());
+
+ return new CdcMain(cfg, null, cdcCfg) {
+ @Override protected CdcConsumerState createState(Path stateDir) {
+ return new CdcConsumerState(stateDir) {
+ @Override public void save(WALPointer ptr) throws
IOException {
+ super.save(ptr);
+
+ if (!F.isEmpty(conditions)) {
+ for (GridAbsPredicate p : conditions) {
+ if (!p.apply())
+ return;
+ }
+
+ latch.countDown();
+ }
+ }
+ };
+ }
+ };
+ }
+
+ /** */
+ protected void addAndWaitForConsumption(
UserCdcConsumer cnsmr,
- CdcMain cdc,
+ IgniteConfiguration cfg,
IgniteCache<Integer, CdcSelfTest.User> cache,
IgniteCache<Integer, CdcSelfTest.User> txCache,
CI3<IgniteCache<Integer, CdcSelfTest.User>, Integer, Integer> addData,
int from,
int to,
- long timeout
- ) throws IgniteCheckedException {
+ boolean waitForCommit
+ ) throws Exception {
+ GridAbsPredicate cachePredicate = sizePredicate(to - from,
cache.getName(), UPDATE, cnsmr);
+ GridAbsPredicate txPredicate = txCache == null
+ ? null
+ : sizePredicate(to - from, txCache.getName(), UPDATE, cnsmr);
+
+ CdcMain cdc;
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ if (waitForCommit) {
+ cdc = txCache == null
+ ? createCdc(cnsmr, cfg, latch, cachePredicate)
+ : createCdc(cnsmr, cfg, latch, cachePredicate, txPredicate);
+ }
+ else
+ cdc = createCdc(cnsmr, cfg);
+
IgniteInternalFuture<?> fut = runAsync(cdc);
addData.apply(cache, from, to);
@@ -78,10 +159,16 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
if (txCache != null)
addData.apply(txCache, from, to);
- assertTrue(waitForSize(to - from, cache.getName(), UPDATE, timeout,
cnsmr));
+ if (waitForCommit)
+ latch.await(getTestTimeout(), MILLISECONDS);
+ else {
+ assertTrue(waitForCondition(cachePredicate, getTestTimeout()));
- if (txCache != null)
- assertTrue(waitForSize(to - from, txCache.getName(), UPDATE,
timeout, cnsmr));
+ if (txCache != null)
+ assertTrue(waitForCondition(txPredicate, getTestTimeout()));
+ }
+
+ checkMetrics(cdc, txCache == null ? to : to * 2);
fut.cancel();
@@ -96,29 +183,84 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
}
/** */
- public boolean waitForSize(
+ public void waitForSize(
int expSz,
String cacheName,
CdcSelfTest.ChangeEventType evtType,
- long timeout,
TestCdcConsumer<?>... cnsmrs
) throws IgniteInterruptedCheckedException {
- return waitForCondition(
- () -> {
- int sum = Arrays.stream(cnsmrs).mapToInt(c ->
F.size(c.data(evtType, cacheId(cacheName)))).sum();
- return sum == expSz;
- },
- timeout);
+ assertTrue(waitForCondition(sizePredicate(expSz, cacheName, evtType,
cnsmrs), getTestTimeout()));
}
/** */
- public CdcConfiguration cdcConfig(CdcConsumer cnsmr) {
- CdcConfiguration cdcCfg = new CdcConfiguration();
+ protected GridAbsPredicate sizePredicate(
+ int expSz,
+ String cacheName,
+ ChangeEventType evtType,
+ TestCdcConsumer<?>... cnsmrs
+ ) {
+ return () -> {
+ int sum = Arrays.stream(cnsmrs).mapToInt(c ->
F.size(c.data(evtType, cacheId(cacheName)))).sum();
+ return sum == expSz;
+ };
+ }
- cdcCfg.setConsumer(cnsmr);
- cdcCfg.setKeepBinary(false);
+ /** */
+ protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception {
+ if (metricExporters() != null) {
+ IgniteConfiguration cfg = getFieldValue(cdc, "igniteCfg");
+
+ DynamicMBean jmxCdcReg =
metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+ Function<String, ?> jmxVal = m -> {
+ try {
+ return jmxCdcReg.getAttribute(m);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ };
+
+ checkMetrics(expCnt, (Function<String, Long>)jmxVal,
(Function<String, String>)jmxVal);
+ }
+
+ MetricRegistry mreg = getFieldValue(cdc, "mreg");
+
+ assertNotNull(mreg);
+
+ checkMetrics(
+ expCnt,
+ m -> mreg.<LongMetric>findMetric(m).value(),
+ m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+ );
+ }
+
+ /** */
+ private void checkMetrics(long expCnt, Function<String, Long> longMetric,
Function<String, String> strMetric) {
+ long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+ long curSegIdx = longMetric.apply(CUR_SEG_IDX);
- return cdcCfg;
+ assertTrue(committedSegIdx <= curSegIdx);
+
+ assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+ assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+ assertTrue(longMetric.apply(LAST_EVT_TIME) > 0);
+
+ for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR,
CDC_DIR})
+ assertTrue(new File(strMetric.apply(m)).exists());
+
+ assertEquals(expCnt, (long)longMetric.apply(EVTS_CNT));
+ }
+
+ /** */
+ protected boolean keepBinary() {
+ return false;
+ }
+
+ /** */
+ protected MetricExporterSpi[] metricExporters() {
+ return null;
}
/** */
@@ -130,7 +272,7 @@ public abstract class AbstractCdcTest extends
GridCommonAbstractTest {
private volatile boolean stopped;
/** {@inheritDoc} */
- @Override public void start() {
+ @Override public void start(MetricRegistry mreg) {
stopped = false;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
index 30fd1e9..050f4bc 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.plugin.AbstractCachePluginProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
@@ -118,11 +119,9 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
}
};
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
-
IgniteCache<Integer, User> cache =
ign.getOrCreateCache(FOR_OTHER_CLUSTER_ID);
- addAndWaitForConsumption(cnsmr, cdc, cache, null,
this::addConflictData, 0, KEYS_CNT, getTestTimeout());
+ addAndWaitForConsumption(cnsmr, cfg, cache, null,
this::addConflictData, 0, KEYS_CNT, true);
}
/** */
@@ -153,7 +152,7 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
return true;
}
- @Override public void start() {
+ @Override public void start(MetricRegistry mreg) {
// No-op.
}
@@ -162,7 +161,7 @@ public class CdcCacheVersionTest extends AbstractCdcTest {
}
};
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+ CdcMain cdc = createCdc(cnsmr, cfg);
IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 9f1e55f..11ab8f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -17,16 +17,17 @@
package org.apache.ignite.cdc;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -38,11 +39,17 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
@@ -68,16 +75,23 @@ public class CdcSelfTest extends AbstractCdcTest {
public WALMode walMode;
/** */
- @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1}")
+ @Parameterized.Parameter(2)
+ public Supplier<MetricExporterSpi> metricExporter;
+
+ /** */
+ @Parameterized.Parameters(name = "specificConsistentId={0}, walMode={1},
metricExporter={2}")
public static Collection<?> parameters() {
- return Arrays.asList(new Object[][] {
- {true, WALMode.FSYNC},
- {false, WALMode.FSYNC},
- {true, WALMode.LOG_ONLY},
- {false, WALMode.LOG_ONLY},
- {true, WALMode.BACKGROUND},
- {false, WALMode.BACKGROUND}
- });
+ List<Object[]> params = new ArrayList<>();
+
+ for (WALMode mode : EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY,
WALMode.BACKGROUND))
+ for (boolean specificConsistentId : new boolean[] {false, true}) {
+ Supplier<MetricExporterSpi> jmx = JmxMetricExporterSpi::new;
+
+ params.add(new Object[] {specificConsistentId, mode, null});
+ params.add(new Object[] {specificConsistentId, mode, jmx});
+ }
+
+ return params;
}
/** Consistent id. */
@@ -107,7 +121,7 @@ public class CdcSelfTest extends AbstractCdcTest {
@Test
public void testReadAllKeys() throws Exception {
// Read all records from iterator.
- readAll(new UserCdcConsumer());
+ readAll(new UserCdcConsumer(), true);
// Read one record per call.
readAll(new UserCdcConsumer() {
@@ -116,7 +130,7 @@ public class CdcSelfTest extends AbstractCdcTest {
return false;
}
- });
+ }, false);
// Read one record per call and commit.
readAll(new UserCdcConsumer() {
@@ -125,38 +139,40 @@ public class CdcSelfTest extends AbstractCdcTest {
return true;
}
- });
+ }, true);
}
/** */
- private void readAll(UserCdcConsumer cnsmr) throws Exception {
+ private void readAll(UserCdcConsumer cnsmr, boolean offsetCommit) throws
Exception {
IgniteConfiguration cfg = getConfiguration("ignite-0");
Ignite ign = startGrid(cfg);
ign.cluster().state(ACTIVE);
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
-
IgniteCache<Integer, User> cache =
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, User> txCache =
ign.getOrCreateCache(TX_CACHE_NAME);
addAndWaitForConsumption(
cnsmr,
- cdc,
+ cfg,
cache,
txCache,
CdcSelfTest::addData,
0,
KEYS_CNT + 3,
- getTestTimeout()
+ offsetCommit
);
removeData(cache, 0, KEYS_CNT);
- IgniteInternalFuture<?> rmvFut = runAsync(cdc);
+ CdcMain cdcMain = createCdc(cnsmr, cfg);
+
+ IgniteInternalFuture<?> rmvFut = runAsync(cdcMain);
- assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE,
getTestTimeout(), cnsmr));
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+ checkMetrics(cdcMain, offsetCommit ? KEYS_CNT : ((KEYS_CNT + 3) * 2 +
KEYS_CNT));
rmvFut.cancel();
@@ -184,7 +200,7 @@ public class CdcSelfTest extends AbstractCdcTest {
cnsmrStarted.countDown();
try {
- startProcEvts.await(getTestTimeout(),
TimeUnit.MILLISECONDS);
+ startProcEvts.await(getTestTimeout(), MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -194,7 +210,7 @@ public class CdcSelfTest extends AbstractCdcTest {
}
};
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+ CdcMain cdc = createCdc(cnsmr, cfg);
runAsync(cdc);
@@ -205,14 +221,15 @@ public class CdcSelfTest extends AbstractCdcTest {
// Make sure all streamed data will become available for consumption.
Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT);
- cnsmrStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+ cnsmrStarted.await(getTestTimeout(), MILLISECONDS);
// Initiate graceful shutdown.
cdc.stop();
startProcEvts.countDown();
- assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE,
getTestTimeout(), cnsmr));
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
assertTrue(waitForCondition(cnsmr::stopped, getTestTimeout()));
List<Integer> keys = cnsmr.data(UPDATE, cacheId(DEFAULT_CACHE_NAME));
@@ -225,6 +242,7 @@ public class CdcSelfTest extends AbstractCdcTest {
/** */
@Test
+ @WithSystemProperty(key = IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID,
value = "true")
public void testMultiNodeConsumption() throws Exception {
IgniteEx ign1 = startGrid(0);
@@ -237,6 +255,17 @@ public class CdcSelfTest extends AbstractCdcTest {
IgniteCache<Integer, User> cache =
ign1.getOrCreateCache(DEFAULT_CACHE_NAME);
+ // Calculate the expected count of keys for each node.
+ int[] keysCnt = new int[2];
+
+ for (int i = 0; i < KEYS_CNT * 2; i++) {
+ Ignite primary = primaryNode(i, DEFAULT_CACHE_NAME);
+
+ assertTrue(primary == ign1 || primary == ign2);
+
+ keysCnt[primary == ign1 ? 0 : 1]++;
+ }
+
// Adds data concurrently with CDC start.
IgniteInternalFuture<?> addDataFut = runAsync(() -> addData(cache, 0,
KEYS_CNT));
@@ -246,19 +275,32 @@ public class CdcSelfTest extends AbstractCdcTest {
IgniteConfiguration cfg1 = ign1.configuration();
IgniteConfiguration cfg2 = ign2.configuration();
- CdcMain cdc1 = new CdcMain(cfg1, null, cdcConfig(cnsmr1));
- CdcMain cdc2 = new CdcMain(cfg2, null, cdcConfig(cnsmr2));
+ // Always run CDC with a consistent id to ensure each instance reads data for
its specific node.
+ if (!specificConsistentId) {
+
cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
+
cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
+ }
+
+ CountDownLatch latch = new CountDownLatch(2);
+
+ GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0],
DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
+ GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1],
DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
+
+ CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
+ CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
IgniteInternalFuture<?> fut1 = runAsync(cdc1);
IgniteInternalFuture<?> fut2 = runAsync(cdc2);
addDataFut.get(getTestTimeout());
- addDataFut = runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2));
+ runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT *
2)).get(getTestTimeout());
- addDataFut.get(getTestTimeout());
+ // Wait until the predicate becomes true and the state is saved on the disk
for both CDC instances.
+ assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
- assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE,
getTestTimeout(), cnsmr1, cnsmr2));
+ checkMetrics(cdc1, keysCnt[0]);
+ checkMetrics(cdc2, keysCnt[1]);
assertFalse(cnsmr1.stopped());
assertFalse(cnsmr2.stopped());
@@ -271,10 +313,16 @@ public class CdcSelfTest extends AbstractCdcTest {
removeData(cache, 0, KEYS_CNT * 2);
+ cdc1 = createCdc(cnsmr1, cfg1);
+ cdc2 = createCdc(cnsmr2, cfg2);
+
IgniteInternalFuture<?> rmvFut1 = runAsync(cdc1);
IgniteInternalFuture<?> rmvFut2 = runAsync(cdc2);
- assertTrue(waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE,
getTestTimeout(), cnsmr1, cnsmr2));
+ waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE, cnsmr1, cnsmr2);
+
+ checkMetrics(cdc1, keysCnt[0]);
+ checkMetrics(cdc2, keysCnt[1]);
rmvFut1.cancel();
rmvFut2.cancel();
@@ -291,8 +339,8 @@ public class CdcSelfTest extends AbstractCdcTest {
UserCdcConsumer cnsmr1 = new UserCdcConsumer();
UserCdcConsumer cnsmr2 = new UserCdcConsumer();
- IgniteInternalFuture<?> fut1 = runAsync(new
CdcMain(ign.configuration(), null, cdcConfig(cnsmr1)));
- IgniteInternalFuture<?> fut2 = runAsync(new
CdcMain(ign.configuration(), null, cdcConfig(cnsmr2)));
+ IgniteInternalFuture<?> fut1 = runAsync(createCdc(cnsmr1,
ign.configuration()));
+ IgniteInternalFuture<?> fut2 = runAsync(createCdc(cnsmr2,
ign.configuration()));
assertTrue(waitForCondition(() -> fut1.isDone() || fut2.isDone(),
getTestTimeout()));
@@ -330,11 +378,13 @@ public class CdcSelfTest extends AbstractCdcTest {
}
};
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+ CdcMain cdc = createCdc(cnsmr, cfg);
IgniteInternalFuture<?> fut = runAsync(cdc);
- assertTrue(waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE,
getTestTimeout(), cnsmr));
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+ checkMetrics(cdc, KEYS_CNT);
fut.cancel();
@@ -374,11 +424,13 @@ public class CdcSelfTest extends AbstractCdcTest {
}
};
- CdcMain cdc = new CdcMain(cfg, null, cdcConfig(cnsmr));
+ CdcMain cdc = createCdc(cnsmr, cfg);
IgniteInternalFuture<?> fut = runAsync(cdc);
- waitForSize(half, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(), cnsmr);
+ waitForSize(half, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+ checkMetrics(cdc, half);
waitForCondition(halfCommitted::get, getTestTimeout());
@@ -390,16 +442,28 @@ public class CdcSelfTest extends AbstractCdcTest {
consumeHalf.set(false);
+ cdc = createCdc(cnsmr, cfg);
+
fut = runAsync(cdc);
- waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, getTestTimeout(),
cnsmr);
- waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, getTestTimeout(),
cnsmr);
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+ checkMetrics(cdc, KEYS_CNT * 2 - half);
fut.cancel();
assertTrue(cnsmr.stopped());
}
+ /** {@inheritDoc} */
+ @Override public MetricExporterSpi[] metricExporters() {
+ if (metricExporter == null)
+ return null;
+
+ return new MetricExporterSpi[] {metricExporter.get()};
+ }
+
/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int
to) {
for (int i = from; i < to; i++)
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
index 1e24d3b..a6ce2a0 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
@@ -18,18 +18,20 @@
package org.apache.ignite.internal.cdc;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.AbstractCdcTest;
-import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.junit.Test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
@@ -76,11 +78,12 @@ public class SqlCdcTest extends AbstractCdcTest {
BinaryCdcConsumer cnsmr = new BinaryCdcConsumer();
- CdcConfiguration cdcCfg = new CdcConfiguration();
+ CountDownLatch latch = new CountDownLatch(1);
- cdcCfg.setConsumer(cnsmr);
+ GridAbsPredicate userPredicate = sizePredicate(KEYS_CNT, USER, UPDATE,
cnsmr);
+ GridAbsPredicate cityPredicate = sizePredicate(KEYS_CNT, CITY, UPDATE,
cnsmr);
- CdcMain cdc = new CdcMain(cfg, null, cdcCfg);
+ CdcMain cdc = createCdc(cnsmr, cfg, latch, userPredicate,
cityPredicate);
IgniteInternalFuture<?> fut = runAsync(cdc);
@@ -110,8 +113,10 @@ public class SqlCdcTest extends AbstractCdcTest {
Integer.toString(127000 + i));
}
- assertTrue(waitForSize(KEYS_CNT, USER, UPDATE, getTestTimeout(),
cnsmr));
- assertTrue(waitForSize(KEYS_CNT, CITY, UPDATE, getTestTimeout(),
cnsmr));
+ // Wait until both predicates become true and the state is saved on the
disk.
+ assertTrue(latch.await(getTestTimeout(), MILLISECONDS));;
+
+ checkMetrics(cdc, KEYS_CNT * 2);
fut.cancel();
@@ -123,9 +128,13 @@ public class SqlCdcTest extends AbstractCdcTest {
for (int i = 0; i < KEYS_CNT; i++)
executeSql(ign, "DELETE FROM USER WHERE id = ?", i);
+ cdc = createCdc(cnsmr, cfg);
+
IgniteInternalFuture<?> rmvFut = runAsync(cdc);
- assertTrue(waitForSize(KEYS_CNT, USER, DELETE, getTestTimeout(),
cnsmr));
+ waitForSize(KEYS_CNT, USER, DELETE, cnsmr);
+
+ checkMetrics(cdc, KEYS_CNT);
rmvFut.cancel();
@@ -176,4 +185,9 @@ public class SqlCdcTest extends AbstractCdcTest {
private List<List<?>> executeSql(IgniteEx node, String sqlText, Object...
args) {
return node.context().query().querySqlFields(new
SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}
+
+ /** {@inheritDoc} */
+ @Override protected boolean keepBinary() {
+ return true;
+ }
}
diff --git
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
index 1ed6244..53e10d9 100644
---
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
+++
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.cdc.WalRecordsConsumer;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.SpringApplicationContextResource;
import org.apache.ignite.resources.SpringResource;
@@ -121,7 +122,7 @@ public class CdcConfigurationTest extends
GridCommonAbstractTest {
public CountDownLatch startLatch = new CountDownLatch(1);
/** {@inheritDoc} */
- @Override public void start() {
+ @Override public void start(MetricRegistry mreg) {
springString2 = ctx.getBean("springString2", String.class);
startLatch.countDown();