This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by
this push:
new 213d0951c14 IGNITE-19639 Add realtime CDC records (#10840)
213d0951c14 is described below
commit 213d0951c14344629223b16e6e114067fbfddf78
Author: Maksim Timonin <[email protected]>
AuthorDate: Thu Jul 13 14:12:30 2023 +0300
IGNITE-19639 Add realtime CDC records (#10840)
---
.../wal/record/RealtimeCdcRecord.java} | 21 ++---
.../wal/record/RealtimeCdcStopRecord.java} | 21 ++---
.../internal/pagemem/wal/record/WALRecord.java | 8 +-
.../cache/persistence/cdc/CdcBuffer.java | 16 +++-
.../cache/persistence/cdc/CdcBufferConsumer.java | 3 +-
.../cache/persistence/cdc/CdcProcessor.java | 9 +--
.../cache/persistence/cdc/CdcWorker.java | 55 ++++++++++---
.../wal/serializer/RecordDataV1Serializer.java | 16 ++++
.../persistence/cdc/RealtimeCdcBufferTest.java | 90 +++++++++++++++++++---
9 files changed, 178 insertions(+), 61 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
similarity index 69%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
index 67cd422fb0f..5a9e68067ab 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.cdc;
+package org.apache.ignite.internal.pagemem.wal.record;
-import java.nio.ByteBuffer;
-
-/** Mock for Realtime CDC buffer consumer. */
-public interface CdcBufferConsumer {
- /**
- * Consumes raw WAL data.
- *
- * @param data Raw data to consume.
- */
- public void consume(ByteBuffer data);
-
- /** */
- public void close();
+/** */
+public class RealtimeCdcRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.REALTIME_CDC_RECORD;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
similarity index 69%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
index 67cd422fb0f..5b89bcd564a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.cdc;
+package org.apache.ignite.internal.pagemem.wal.record;
-import java.nio.ByteBuffer;
-
-/** Mock for Realtime CDC buffer consumer. */
-public interface CdcBufferConsumer {
- /**
- * Consumes raw WAL data.
- *
- * @param data Raw data to consume.
- */
- public void consume(ByteBuffer data);
-
- /** */
- public void close();
+/** */
+public class RealtimeCdcStopRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.REALTIME_STOP_CDC_RECORD;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 3457251e0f4..02f4125215e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -291,7 +291,13 @@ public abstract class WALRecord {
INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
/** CDC data record. */
- CDC_DATA_RECORD(78, CUSTOM);
+ CDC_DATA_RECORD(78, CUSTOM),
+
+ /** Realtime CDC record. */
+ REALTIME_CDC_RECORD(79, CUSTOM),
+
+ /** Realtime CDC stop record. */
+ REALTIME_STOP_CDC_RECORD(80, CUSTOM);
/** Index for serialization. Should be consistent throughout all
versions. */
private final int idx;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
index 1b105bac5db..3f7b3ef8722 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
@@ -109,9 +109,18 @@ public class CdcBuffer {
return size.get();
}
- /** Cleans the buffer if overflowed. Performs by the consumer thread. */
- public void cleanIfOverflowed() {
- if (!overflowed || consumerNode == null)
+ /**
+ * @return {@code True} if buffer is overflowed.
+ */
+ public boolean overflowed() {
+ return overflowed;
+ }
+
+ /**
+ * Cleans the buffer. Must be performed by the consumer thread.
+ */
+ public void clean() {
+ if (consumerNode == null)
return;
ByteBuffer data;
@@ -123,6 +132,7 @@ public class CdcBuffer {
consumerNode = null;
producerNode = null;
+ overflowed = false;
size.set(0);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
index 67cd422fb0f..5e96fa2c98f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
@@ -25,8 +25,9 @@ public interface CdcBufferConsumer {
* Consumes raw WAL data.
*
* @param data Raw data to consume.
+ * @return {@code True} if current offset in WAL should be committed.
*/
- public void consume(ByteBuffer data);
+ public boolean consume(ByteBuffer data);
/** */
public void close();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
index 92adcf2acaf..9e5cb0dc4d8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.processors.cache.persistence.cdc;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -42,10 +41,8 @@ public class CdcProcessor {
public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) {
this.log = log;
- DataStorageConfiguration dsCfg =
cctx.gridConfig().getDataStorageConfiguration();
-
- cdcBuf = new CdcBuffer(dsCfg.getMaxCdcBufferSize());
- worker = new CdcWorker(cctx, log, cdcBuf, dsCfg.getCdcConsumer());
+ cdcBuf = new
CdcBuffer(cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize());
+ worker = new CdcWorker(cctx, log, cdcBuf);
}
/**
@@ -62,8 +59,6 @@ public class CdcProcessor {
enabled = false;
log.warning("CDC buffer has overflowed. Stop realtime mode of
CDC.");
-
- worker.cancel();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
index 27655413c29..5fad6492023 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
@@ -19,12 +19,19 @@ package
org.apache.ignite.internal.processors.cache.persistence.cdc;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcStopRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+
/** */
public class CdcWorker extends GridWorker {
/** Default throttling timeout in millis for polling CDC buffer. */
@@ -41,19 +48,19 @@ public class CdcWorker extends GridWorker {
private final CdcBufferConsumer consumer;
/** */
- public CdcWorker(
- GridCacheSharedContext<?, ?> cctx,
- IgniteLogger log,
- CdcBuffer cdcBuf,
- CdcBufferConsumer consumer
- ) {
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** */
+ public CdcWorker(GridCacheSharedContext<?, ?> cctx, IgniteLogger log,
CdcBuffer cdcBuf) {
super(cctx.igniteInstanceName(),
"cdc-worker%" + cctx.igniteInstanceName(),
log,
cctx.kernalContext().workersRegistry());
+ this.cctx = cctx;
this.cdcBuf = cdcBuf;
- this.consumer = consumer;
+
+ consumer =
cctx.gridConfig().getDataStorageConfiguration().getCdcConsumer();
}
/** */
@@ -61,6 +68,14 @@ public class CdcWorker extends GridWorker {
while (!isCancelled()) {
updateHeartbeat();
+ if (cdcBuf.overflowed()) {
+ log(new RealtimeCdcStopRecord());
+
+ cancel();
+
+ return;
+ }
+
ByteBuffer data = cdcBuf.poll();
if (data == null) {
@@ -72,16 +87,34 @@ public class CdcWorker extends GridWorker {
if (log.isDebugEnabled())
log.debug("Poll a data bucket from CDC buffer [len=" +
(data.limit() - data.position()) + ']');
- // TODO: Consumer must not block this system thread.
- consumer.consume(data);
+ // TODO: Consumer must not block this system thread. Or should this
thread not be a system thread?
+ if (consumer.consume(data))
+ log(new RealtimeCdcRecord());
}
+ }
- consumer.close();
+ /** */
+ // TODO: rethink after IGNITE-19637. NULL might be returned during node start
up; in that case the overflow happened during memory restore.
+ // What to do in such a case?
+ private void log(WALRecord rec) {
+ try {
+ if (cctx.wal().log(rec) == null) {
+ long maxCdcBufSize =
cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize();
+
+ log.error("Realtime CDC misses writing WAL record. CDC buffer
size might be too low" +
+ " [rec=" + rec + ", maxCdcBufSize=" + maxCdcBufSize + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ cctx.kernalContext().failure().process(new
FailureContext(CRITICAL_ERROR, e));
+ }
}
/** {@inheritDoc} */
@Override protected void cleanup() {
- cdcBuf.cleanIfOverflowed();
+ consumer.close();
+
+ cdcBuf.clean();
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 3907bf2c98f..858b89239da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -51,6 +51,8 @@ import
org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import
org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcStopRecord;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -541,6 +543,8 @@ public class RecordDataV1Serializer implements
RecordDataSerializer {
return 4 + 8 + 1;
case SWITCH_SEGMENT_RECORD:
+ case REALTIME_CDC_RECORD:
+ case REALTIME_STOP_CDC_RECORD:
return 0;
case TX_RECORD:
@@ -1323,6 +1327,16 @@ public class RecordDataV1Serializer implements
RecordDataSerializer {
break;
+ case REALTIME_CDC_RECORD:
+ res = new RealtimeCdcRecord();
+
+ break;
+
+ case REALTIME_STOP_CDC_RECORD:
+ res = new RealtimeCdcStopRecord();
+
+ break;
+
default:
throw new UnsupportedOperationException("Type: " + type);
}
@@ -1912,6 +1926,8 @@ public class RecordDataV1Serializer implements
RecordDataSerializer {
break;
case SWITCH_SEGMENT_RECORD:
+ case REALTIME_CDC_RECORD:
+ case REALTIME_STOP_CDC_RECORD:
break;
case MASTER_KEY_CHANGE_RECORD_V2:
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
index 9b9ca8e6fc0..c42a31f9711 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
@@ -23,7 +23,9 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -33,6 +35,7 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.util.typedef.F;
@@ -40,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -68,6 +72,9 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
/** */
private int maxCdcBufSize;
+ /** */
+ private AtomicInteger commitCnt;
+
/** */
@Parameterized.Parameter()
public WALMode walMode;
@@ -86,12 +93,14 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(instanceName);
+ consumer = new ByteBufferCdcConsumer(maxCdcBufSize, commitCnt);
+
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setMaxCdcBufferSize(maxCdcBufSize)
.setCdcConsumer(consumer)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setCdcEnabled(cdcEnabled)
- .setPersistenceEnabled(true)
+ .setPersistenceEnabled(true) // TODO: test for in-memory mode.
)
.setWalMode(walMode)
);
@@ -117,15 +126,15 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
cdcEnabled = true;
- consumer = new ByteBufferCdcConsumer(10 * (int)U.MB);
-
stopLatch = null;
+ commitCnt = null;
+ consumer = null;
}
/** */
@Test
public void testCdcBufferOverflow() throws Exception {
- maxCdcBufSize = (int)U.KB;
+ maxCdcBufSize = 100 * (int)U.KB;
checkCdcBufferOverflow(10 * (int)U.KB, 100, true);
}
@@ -138,6 +147,44 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
checkCdcBufferOverflow(10 * (int)U.KB, 100, false);
}
+ /** */
+ @Test
+ public void testCdcRecords() throws Exception {
+ maxCdcBufSize = 100 * (int)U.MB;
+ commitCnt = new AtomicInteger();
+
+ IgniteEx crd = startGrid(0);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, Integer> cache = crd.cache(DEFAULT_CACHE_NAME);
+
+ // Wait until the cluster is fully activated.
+ cache.put(0, 0);
+
+ int expCommitCnt = 5;
+
+ commitCnt.set(expCommitCnt);
+
+ while (commitCnt.get() >= 0)
+ cache.put(0, 0);
+
+ forceCheckpoint(crd);
+
+ stopGrid(0);
+
+ try (WALIterator walIt = walIter(walSegments())) {
+ int cdcRecCnt = 0;
+
+ while (walIt.hasNext()) {
+ if (walIt.next().getValue().type() ==
WALRecord.RecordType.REALTIME_CDC_RECORD)
+ cdcRecCnt++;
+ }
+
+ assertEquals(expCommitCnt, cdcRecCnt);
+ }
+ }
+
/** */
@Test
public void testCdcBufferContent() throws Exception {
@@ -152,9 +199,7 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
U.awaitQuiet(stopLatch);
- File walSegments = U.resolveWorkDirectory(
- U.defaultWorkDirectory(),
- DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
+ File walSegments = walSegments();
WALIterator it = walIter(walSegments);
@@ -219,6 +264,24 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
stopGrid(0);
assertTrue(lsnr.check());
+
+ try (WALIterator walIt = walIter(walSegments())) {
+ int stopRecCnt = 0;
+
+ while (walIt.hasNext()) {
+ if (walIt.next().getValue().type() ==
WALRecord.RecordType.REALTIME_STOP_CDC_RECORD)
+ stopRecCnt++;
+ }
+
+ assertEquals(shouldOverflow ? 1 : 0, stopRecCnt);
+ }
+ }
+
+ /** */
+ private File walSegments() throws IgniteCheckedException {
+ return U.resolveWorkDirectory(
+ U.defaultWorkDirectory(),
+ DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
}
/** Get iterator over WAL. */
@@ -237,7 +300,12 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
private final ByteBuffer buf;
/** */
- ByteBufferCdcConsumer(int maxCdcBufSize) {
+ private final AtomicInteger commitCnt;
+
+ /** */
+ ByteBufferCdcConsumer(int maxCdcBufSize, @Nullable AtomicInteger
commitCnt) {
+ this.commitCnt = commitCnt;
+
buf = ByteBuffer.allocate(maxCdcBufSize);
Arrays.fill(buf.array(), (byte)0);
@@ -245,9 +313,11 @@ public class RealtimeCdcBufferTest extends
GridCommonAbstractTest {
buf.position(0);
}
- /** */
- @Override public void consume(ByteBuffer data) {
+ /** {@inheritDoc} */
+ @Override public boolean consume(ByteBuffer data) {
buf.put(data);
+
+ return commitCnt != null && commitCnt.decrementAndGet() >= 0;
}
/** */