This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by 
this push:
     new 213d0951c14 IGNITE-19639 Add realtime CDC records (#10840)
213d0951c14 is described below

commit 213d0951c14344629223b16e6e114067fbfddf78
Author: Maksim Timonin <[email protected]>
AuthorDate: Thu Jul 13 14:12:30 2023 +0300

    IGNITE-19639 Add realtime CDC records (#10840)
---
 .../wal/record/RealtimeCdcRecord.java}             | 21 ++---
 .../wal/record/RealtimeCdcStopRecord.java}         | 21 ++---
 .../internal/pagemem/wal/record/WALRecord.java     |  8 +-
 .../cache/persistence/cdc/CdcBuffer.java           | 16 +++-
 .../cache/persistence/cdc/CdcBufferConsumer.java   |  3 +-
 .../cache/persistence/cdc/CdcProcessor.java        |  9 +--
 .../cache/persistence/cdc/CdcWorker.java           | 55 ++++++++++---
 .../wal/serializer/RecordDataV1Serializer.java     | 16 ++++
 .../persistence/cdc/RealtimeCdcBufferTest.java     | 90 +++++++++++++++++++---
 9 files changed, 178 insertions(+), 61 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
similarity index 69%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
index 67cd422fb0f..5a9e68067ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcRecord.java
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.cache.persistence.cdc;
+package org.apache.ignite.internal.pagemem.wal.record;
 
-import java.nio.ByteBuffer;
-
-/** Mock for Realtime CDC buffer consumer. */
-public interface CdcBufferConsumer {
-    /**
-     * Consumes raw WAL data.
-     *
-     * @param data Raw data to consume.
-     */
-    public void consume(ByteBuffer data);
-
-    /** */
-    public void close();
+/** WAL record written when the realtime CDC consumer requests committing the current WAL offset. */
+public class RealtimeCdcRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.REALTIME_CDC_RECORD;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
similarity index 69%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
index 67cd422fb0f..5b89bcd564a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RealtimeCdcStopRecord.java
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.cache.persistence.cdc;
+package org.apache.ignite.internal.pagemem.wal.record;
 
-import java.nio.ByteBuffer;
-
-/** Mock for Realtime CDC buffer consumer. */
-public interface CdcBufferConsumer {
-    /**
-     * Consumes raw WAL data.
-     *
-     * @param data Raw data to consume.
-     */
-    public void consume(ByteBuffer data);
-
-    /** */
-    public void close();
+/** WAL record written by the CDC worker when the CDC buffer overflows and realtime CDC stops. */
+public class RealtimeCdcStopRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.REALTIME_STOP_CDC_RECORD;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 3457251e0f4..02f4125215e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -291,7 +291,13 @@ public abstract class WALRecord {
         INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
 
         /** CDC data record. */
-        CDC_DATA_RECORD(78, CUSTOM);
+        CDC_DATA_RECORD(78, CUSTOM),
+
+        /** Realtime CDC record. */
+        REALTIME_CDC_RECORD(79, CUSTOM),
+
+        /** Realtime CDC stop record. */
+        REALTIME_STOP_CDC_RECORD(80, CUSTOM);
 
         /** Index for serialization. Should be consistent throughout all 
versions. */
         private final int idx;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
index 1b105bac5db..3f7b3ef8722 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
@@ -109,9 +109,18 @@ public class CdcBuffer {
         return size.get();
     }
 
-    /** Cleans the buffer if overflowed. Performs by the consumer thread. */
-    public void cleanIfOverflowed() {
-        if (!overflowed || consumerNode == null)
+    /**
+     * @return {@code True} if buffer is overflowed.
+     */
+    public boolean overflowed() {
+        return overflowed;
+    }
+
+    /**
+     * Cleans the buffer. Must be performed by the consumer thread.
+     */
+    public void clean() {
+        if (consumerNode == null)
             return;
 
         ByteBuffer data;
@@ -123,6 +132,7 @@ public class CdcBuffer {
 
         consumerNode = null;
         producerNode = null;
+        overflowed = false;
 
         size.set(0);
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
index 67cd422fb0f..5e96fa2c98f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
@@ -25,8 +25,9 @@ public interface CdcBufferConsumer {
      * Consumes raw WAL data.
      *
      * @param data Raw data to consume.
+     * @return {@code True} if current offset in WAL should be committed.
      */
-    public void consume(ByteBuffer data);
+    public boolean consume(ByteBuffer data);
 
     /** */
     public void close();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
index 92adcf2acaf..9e5cb0dc4d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.cache.persistence.cdc;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -42,10 +41,8 @@ public class CdcProcessor {
     public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) {
         this.log = log;
 
-        DataStorageConfiguration dsCfg = 
cctx.gridConfig().getDataStorageConfiguration();
-
-        cdcBuf = new CdcBuffer(dsCfg.getMaxCdcBufferSize());
-        worker = new CdcWorker(cctx, log, cdcBuf, dsCfg.getCdcConsumer());
+        cdcBuf = new 
CdcBuffer(cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize());
+        worker = new CdcWorker(cctx, log, cdcBuf);
     }
 
     /**
@@ -62,8 +59,6 @@ public class CdcProcessor {
             enabled = false;
 
             log.warning("CDC buffer has overflowed. Stop realtime mode of 
CDC.");
-
-            worker.cancel();
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
index 27655413c29..5fad6492023 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
@@ -19,12 +19,19 @@ package 
org.apache.ignite.internal.processors.cache.persistence.cdc;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcStopRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
 
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+
 /** */
 public class CdcWorker extends GridWorker {
     /** Default throttling timeout in millis for polling CDC buffer. */
@@ -41,19 +48,19 @@ public class CdcWorker extends GridWorker {
     private final CdcBufferConsumer consumer;
 
     /** */
-    public CdcWorker(
-        GridCacheSharedContext<?, ?> cctx,
-        IgniteLogger log,
-        CdcBuffer cdcBuf,
-        CdcBufferConsumer consumer
-    ) {
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** */
+    public CdcWorker(GridCacheSharedContext<?, ?> cctx, IgniteLogger log, 
CdcBuffer cdcBuf) {
         super(cctx.igniteInstanceName(),
             "cdc-worker%" + cctx.igniteInstanceName(),
             log,
             cctx.kernalContext().workersRegistry());
 
+        this.cctx = cctx;
         this.cdcBuf = cdcBuf;
-        this.consumer = consumer;
+
+        consumer = 
cctx.gridConfig().getDataStorageConfiguration().getCdcConsumer();
     }
 
     /** */
@@ -61,6 +68,14 @@ public class CdcWorker extends GridWorker {
         while (!isCancelled()) {
             updateHeartbeat();
 
+            if (cdcBuf.overflowed()) {
+                log(new RealtimeCdcStopRecord());
+
+                cancel();
+
+                return;
+            }
+
             ByteBuffer data = cdcBuf.poll();
 
             if (data == null) {
@@ -72,16 +87,34 @@ public class CdcWorker extends GridWorker {
             if (log.isDebugEnabled())
                 log.debug("Poll a data bucket from CDC buffer [len=" + 
(data.limit() - data.position()) + ']');
 
-            // TODO: Consumer must not block this system thread.
-            consumer.consume(data);
+            // TODO: Consumer must not block this system thread. Or this 
thread should not be a system thread?
+            if (consumer.consume(data))
+                log(new RealtimeCdcRecord());
         }
+    }
 
-        consumer.close();
+    /** */
+    // TODO: rethink after IGNITE-19637. NULL might be returned during node start 
up if the overflow happened during memory restore.
+    //       What to do in such case?
+    private void log(WALRecord rec) {
+        try {
+            if (cctx.wal().log(rec) == null) {
+                long maxCdcBufSize = 
cctx.gridConfig().getDataStorageConfiguration().getMaxCdcBufferSize();
+
+                log.error("Realtime CDC misses writing WAL record. CDC buffer 
size might be too low" +
+                    " [rec=" + rec + ", maxCdcBufSize=" + maxCdcBufSize + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, e));
+        }
     }
 
     /** {@inheritDoc} */
     @Override protected void cleanup() {
-        cdcBuf.cleanIfOverflowed();
+        consumer.close();
+
+        cdcBuf.clean();
     }
 
     /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 3907bf2c98f..858b89239da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -51,6 +51,8 @@ import 
org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import 
org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RealtimeCdcStopRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -541,6 +543,8 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
                 return 4 + 8 + 1;
 
             case SWITCH_SEGMENT_RECORD:
+            case REALTIME_CDC_RECORD:
+            case REALTIME_STOP_CDC_RECORD:
                 return 0;
 
             case TX_RECORD:
@@ -1323,6 +1327,16 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
 
                 break;
 
+            case REALTIME_CDC_RECORD:
+                res = new RealtimeCdcRecord();
+
+                break;
+
+            case REALTIME_STOP_CDC_RECORD:
+                res = new RealtimeCdcStopRecord();
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + type);
         }
@@ -1912,6 +1926,8 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
                 break;
 
             case SWITCH_SEGMENT_RECORD:
+            case REALTIME_CDC_RECORD:
+            case REALTIME_STOP_CDC_RECORD:
                 break;
 
             case MASTER_KEY_CHANGE_RECORD_V2:
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
index 9b9ca8e6fc0..c42a31f9711 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
@@ -23,7 +23,9 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -33,6 +35,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.util.typedef.F;
@@ -40,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,6 +72,9 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
     /** */
     private int maxCdcBufSize;
 
+    /** */
+    private AtomicInteger commitCnt;
+
     /** */
     @Parameterized.Parameter()
     public WALMode walMode;
@@ -86,12 +93,14 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(instanceName);
 
+        consumer = new ByteBufferCdcConsumer(maxCdcBufSize, commitCnt);
+
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setMaxCdcBufferSize(maxCdcBufSize)
             .setCdcConsumer(consumer)
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setCdcEnabled(cdcEnabled)
-                .setPersistenceEnabled(true)
+                .setPersistenceEnabled(true)  // TODO: test for in-memory mode.
             )
             .setWalMode(walMode)
         );
@@ -117,15 +126,15 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
 
         cdcEnabled = true;
 
-        consumer = new ByteBufferCdcConsumer(10 * (int)U.MB);
-
         stopLatch = null;
+        commitCnt = null;
+        consumer = null;
     }
 
     /** */
     @Test
     public void testCdcBufferOverflow() throws Exception {
-        maxCdcBufSize = (int)U.KB;
+        maxCdcBufSize = 100 * (int)U.KB;
 
         checkCdcBufferOverflow(10 * (int)U.KB, 100, true);
     }
@@ -138,6 +147,44 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
         checkCdcBufferOverflow(10 * (int)U.KB, 100, false);
     }
 
+    /** */
+    @Test
+    public void testCdcRecords() throws Exception {
+        maxCdcBufSize = 100 * (int)U.MB;
+        commitCnt = new AtomicInteger();
+
+        IgniteEx crd = startGrid(0);
+
+        crd.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = crd.cache(DEFAULT_CACHE_NAME);
+
+        // Wait until the cluster is fully activated.
+        cache.put(0, 0);
+
+        int expCommitCnt = 5;
+
+        commitCnt.set(expCommitCnt);
+
+        while (commitCnt.get() >= 0)
+            cache.put(0, 0);
+
+        forceCheckpoint(crd);
+
+        stopGrid(0);
+
+        try (WALIterator walIt = walIter(walSegments())) {
+            int cdcRecCnt = 0;
+
+            while (walIt.hasNext()) {
+                if (walIt.next().getValue().type() == 
WALRecord.RecordType.REALTIME_CDC_RECORD)
+                    cdcRecCnt++;
+            }
+
+            assertEquals(expCommitCnt, cdcRecCnt);
+        }
+    }
+
     /** */
     @Test
     public void testCdcBufferContent() throws Exception {
@@ -152,9 +199,7 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
 
         U.awaitQuiet(stopLatch);
 
-        File walSegments = U.resolveWorkDirectory(
-            U.defaultWorkDirectory(),
-            DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
+        File walSegments = walSegments();
 
         WALIterator it = walIter(walSegments);
 
@@ -219,6 +264,24 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
         stopGrid(0);
 
         assertTrue(lsnr.check());
+
+        try (WALIterator walIt = walIter(walSegments())) {
+            int stopRecCnt = 0;
+
+            while (walIt.hasNext()) {
+                if (walIt.next().getValue().type() == 
WALRecord.RecordType.REALTIME_STOP_CDC_RECORD)
+                    stopRecCnt++;
+            }
+
+            assertEquals(shouldOverflow ? 1 : 0, stopRecCnt);
+        }
+    }
+
+    /** */
+    private File walSegments() throws IgniteCheckedException {
+        return U.resolveWorkDirectory(
+            U.defaultWorkDirectory(),
+            DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
     }
 
     /** Get iterator over WAL. */
@@ -237,7 +300,12 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
         private final ByteBuffer buf;
 
         /** */
-        ByteBufferCdcConsumer(int maxCdcBufSize) {
+        private final AtomicInteger commitCnt;
+
+        /** */
+        ByteBufferCdcConsumer(int maxCdcBufSize, @Nullable AtomicInteger 
commitCnt) {
+            this.commitCnt = commitCnt;
+
             buf = ByteBuffer.allocate(maxCdcBufSize);
 
             Arrays.fill(buf.array(), (byte)0);
@@ -245,9 +313,11 @@ public class RealtimeCdcBufferTest extends 
GridCommonAbstractTest {
             buf.position(0);
         }
 
-        /** */
-        @Override public void consume(ByteBuffer data) {
+        /** {@inheritDoc} */
+        @Override public boolean consume(ByteBuffer data) {
             buf.put(data);
+
+            return commitCnt != null && commitCnt.decrementAndGet() >= 0;
         }
 
         /** */

Reply via email to