This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ef68b6fff35  IGNITE-14450 Add Maximum CDC directory size configuration 
parameter (#10456)
ef68b6fff35 is described below

commit ef68b6fff355e78060d6ca5b53caa230ee246f60
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Jan 30 14:19:48 2023 +0300

     IGNITE-14450 Add Maximum CDC directory size configuration parameter 
(#10456)
---
 .../configuration/DataStorageConfiguration.java    | 33 +++++++++++
 .../org/apache/ignite/internal/cdc/CdcMain.java    |  5 +-
 .../persistence/wal/FileWriteAheadLogManager.java  | 31 +++++++++-
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    | 66 +++++++++++++++++++++-
 4 files changed, 131 insertions(+), 4 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index b2b6d228588..8daa97a1c7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -164,6 +164,9 @@ public class DataStorageConfiguration implements 
Serializable {
     /** Default change data capture directory. */
     public static final String DFLT_WAL_CDC_PATH = "db/wal/cdc";
 
+    /** Default change data capture directory maximum size. */
+    public static final long DFLT_CDC_WAL_DIRECTORY_MAX_SIZE = 0;
+
     /** Default path (relative to working directory) of binary metadata folder 
*/
     public static final String DFLT_BINARY_METADATA_PATH = "db/binary_meta";
 
@@ -245,6 +248,10 @@ public class DataStorageConfiguration implements 
Serializable {
     @IgniteExperimental
     private String cdcWalPath = DFLT_WAL_CDC_PATH;
 
+    /** Change Data Capture directory size limit. */
+    @IgniteExperimental
+    private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+
     /**
      * Metrics enabled flag.
      * @deprecated Will be removed in upcoming releases.
@@ -801,6 +808,32 @@ public class DataStorageConfiguration implements 
Serializable {
         return this;
     }
 
+    /**
+     * Gets the {@link #getCdcWalPath CDC directory} maximum size in bytes.
+     *
+     * @return CDC directory maximum size in bytes.
+     */
+    @IgniteExperimental
+    public long getCdcWalDirectoryMaxSize() {
+        return cdcWalDirMaxSize;
+    }
+
+    /**
+     * Sets the CDC directory maximum size in bytes. Zero or negative means no 
limit. Creation of segment CDC link
+     * will be skipped when the total size of CDC files in the {@link 
#getCdcWalPath directory} exceeds the limit.
+     * The CDC application will log an error due to a gap in the WAL file 
sequence. Note that cache changes will be lost.
+     * Default is no limit.
+     *
+     * @param cdcWalDirMaxSize CDC directory maximum size in bytes.
+     * @return {@code this} for chaining.
+     */
+    @IgniteExperimental
+    public DataStorageConfiguration setCdcWalDirectoryMaxSize(long 
cdcWalDirMaxSize) {
+        this.cdcWalDirMaxSize = cdcWalDirMaxSize;
+
+        return this;
+    }
+
     /**
      * Gets flag indicating whether persistence metrics collection is enabled.
      * Default value is {@link #DFLT_METRICS_ENABLED}.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index e16e1b409f3..b7d449fa6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -442,7 +442,10 @@ public class CdcMain implements Runnable {
                         .peek(p -> {
                             long nextSgmnt = segmentIndex(p);
 
-                            assert lastSgmnt.get() == -1 || nextSgmnt - 
lastSgmnt.get() == 1;
+                            if (lastSgmnt.get() != -1 && nextSgmnt - 
lastSgmnt.get() != 1) {
+                                throw new IgniteException("Found missed 
segments. Some events are missed. " +
+                                    "[lastSegment=" + lastSgmnt.get() + ", 
nextSegment=" + nextSgmnt + ']');
+                            }
 
                             lastSgmnt.set(nextSgmnt);
                         })
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 0dc36ad6174..619a84ef0fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2140,8 +2140,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
                 if (walCdcDir != null) {
-                    if (!cdcDisabled.getOrDefault(false))
-                        
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), 
dstFile.toPath());
+                    if (!cdcDisabled.getOrDefault(false)) {
+                        if (checkCdcWalDirectorySize(dstFile.length()))
+                            
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), 
dstFile.toPath());
+                        else {
+                            log.error("Creation of segment CDC link skipped. 
Configured CDC directory " +
+                                "maximum size exceeded.");
+                        }
+                    }
                     else {
                         log.warning("Creation of segment CDC link skipped. " +
                             "'" + CDC_DISABLED + "' distributed property is 
'true'.");
@@ -2212,6 +2218,27 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             new IgniteThread(archiver).start();
         }
+
+        /**
+         * @param len Length of the file to be added to the CDC directory.
+         * @return {@code True} if the CDC directory size check is successful, 
otherwise {@code false}.
+         */
+        private boolean checkCdcWalDirectorySize(long len) {
+            long maxDirSize = 
igCfg.getDataStorageConfiguration().getCdcWalDirectoryMaxSize();
+
+            if (maxDirSize <= 0)
+                return true;
+
+            long dirSize = 
Arrays.stream(walCdcDir.listFiles(WAL_SEGMENT_FILE_FILTER)).mapToLong(File::length).sum();
+
+            if (dirSize + len <= maxDirSize)
+                return true;
+
+            log.warning("Configured CDC WAL directory maximum size exceeded 
[curDirSize=" + dirSize +
+                ", fileLength=" + len + ", cdcWalDirectoryMaxSize=" + 
maxDirSize + ']');
+
+            return false;
+        }
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 369a7663aa1..6e748c2eb93 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc;
 import java.io.File;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -46,8 +47,11 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
@@ -55,6 +59,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
 import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.RunnableX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -68,8 +73,11 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
 import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.junit.Assume.assumeTrue;
@@ -95,6 +103,9 @@ public class CdcSelfTest extends AbstractCdcTest {
     @Parameterized.Parameter(2)
     public boolean persistenceEnabled;
 
+    /** */
+    private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+
     /** */
     @Parameterized.Parameters(name = "consistentId={0}, wal={1}, 
persistence={2}")
     public static Collection<?> parameters() {
@@ -121,7 +132,8 @@ public class CdcSelfTest extends AbstractCdcTest {
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setPersistenceEnabled(persistenceEnabled)
                 .setCdcEnabled(true))
-            .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + 
U.maskForFileName(igniteInstanceName)));
+            .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + 
U.maskForFileName(igniteInstanceName))
+            .setCdcWalDirectoryMaxSize(cdcWalDirMaxSize));
 
         cfg.setCacheConfiguration(
             new CacheConfiguration<>(TX_CACHE_NAME)
@@ -726,6 +738,58 @@ public class CdcSelfTest extends AbstractCdcTest {
         assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * 
WAL_ARCHIVE_TIMEOUT));
     }
 
+    /** */
+    @Test
+    public void testCdcDirectoryMaxSize() throws Exception {
+        cdcWalDirMaxSize = 10 * U.MB;
+        int segmentSize = (int)(cdcWalDirMaxSize / 2);
+
+        IgniteEx ign = startGrid(0);
+
+        ign.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteWriteAheadLogManager wal = 
ign.context().cache().context().wal(true);
+        File walCdcDir = U.field(ign.context().cache().context().wal(true), 
"walCdcDir");
+
+        RunnableX writeSgmnt = () -> {
+            int sgmnts = wal.walArchiveSegments();
+            int dataSize = (int)(segmentSize * 0.8);
+
+            for (int i = 0; i < dataSize / DFLT_PAGE_SIZE; i++)
+                wal.log(new PageSnapshot(new FullPageId(-1, -1), new 
byte[DFLT_PAGE_SIZE], 1));
+
+            addData(cache, 0, 1);
+
+            waitForCondition(() -> wal.walArchiveSegments() > sgmnts, 2 * 
WAL_ARCHIVE_TIMEOUT);
+        };
+
+        // Write to the WAL to exceed the configured max size.
+        writeSgmnt.run();
+        writeSgmnt.run();
+
+        // The segment link creation should be skipped.
+        writeSgmnt.run();
+
+        assertTrue(cdcWalDirMaxSize >= 
Arrays.stream(walCdcDir.listFiles()).mapToLong(File::length).sum());
+
+        UserCdcConsumer cnsmr = new UserCdcConsumer();
+
+        CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));
+
+        IgniteInternalFuture<?> fut = runAsync(cdc);
+
+        waitForSize(2, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+        assertFalse(fut.isDone());
+
+        // Write the next segment after the skipped one.
+        writeSgmnt.run();
+
+        assertThrows(log, () -> fut.get(getTestTimeout()), 
IgniteCheckedException.class,
+            "Found missed segments. Some events are missed.");
+    }
+
     /** */
     public static void addData(IgniteCache<Integer, User> cache, int from, int 
to) {
         for (int i = from; i < to; i++)

Reply via email to