This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ef68b6fff35 IGNITE-14450 Add Maximum CDC directory size configuration
parameter (#10456)
ef68b6fff35 is described below
commit ef68b6fff355e78060d6ca5b53caa230ee246f60
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Jan 30 14:19:48 2023 +0300
IGNITE-14450 Add Maximum CDC directory size configuration parameter
(#10456)
---
.../configuration/DataStorageConfiguration.java | 33 +++++++++++
.../org/apache/ignite/internal/cdc/CdcMain.java | 5 +-
.../persistence/wal/FileWriteAheadLogManager.java | 31 +++++++++-
.../java/org/apache/ignite/cdc/CdcSelfTest.java | 66 +++++++++++++++++++++-
4 files changed, 131 insertions(+), 4 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index b2b6d228588..8daa97a1c7b 100644
---
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -164,6 +164,9 @@ public class DataStorageConfiguration implements
Serializable {
/** Default change data capture directory. */
public static final String DFLT_WAL_CDC_PATH = "db/wal/cdc";
+ /** Default change data capture directory maximum size in bytes. Zero means no limit. */
+ public static final long DFLT_CDC_WAL_DIRECTORY_MAX_SIZE = 0;
+
/** Default path (relative to working directory) of binary metadata folder
*/
public static final String DFLT_BINARY_METADATA_PATH = "db/binary_meta";
@@ -245,6 +248,10 @@ public class DataStorageConfiguration implements
Serializable {
@IgniteExperimental
private String cdcWalPath = DFLT_WAL_CDC_PATH;
+ /** Change Data Capture directory size limit in bytes. Zero or negative means no limit. */
+ @IgniteExperimental
+ private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+
/**
* Metrics enabled flag.
* @deprecated Will be removed in upcoming releases.
@@ -801,6 +808,32 @@ public class DataStorageConfiguration implements
Serializable {
return this;
}
+ /**
+ * Gets the {@link #getCdcWalPath CDC directory} maximum size in bytes. Zero or negative means no limit.
+ *
+ * @return CDC directory maximum size in bytes.
+ */
+ @IgniteExperimental
+ public long getCdcWalDirectoryMaxSize() {
+ return cdcWalDirMaxSize;
+ }
+
+ /**
+ * Sets the CDC directory maximum size in bytes. Zero or negative means no limit. Creation of a segment CDC link
+ * will be skipped when it would make the total size of CDC files in the {@link #getCdcWalPath directory} exceed
+ * the limit. The CDC application will then fail with an error due to a gap in the WAL segment sequence. Note that
+ * cache changes from the skipped segments will be lost for CDC. Default is no limit.
+ *
+ * @param cdcWalDirMaxSize CDC directory maximum size in bytes.
+ * @return {@code this} for chaining.
+ */
+ @IgniteExperimental
+ public DataStorageConfiguration setCdcWalDirectoryMaxSize(long
cdcWalDirMaxSize) {
+ this.cdcWalDirMaxSize = cdcWalDirMaxSize;
+
+ return this;
+ }
+
/**
* Gets flag indicating whether persistence metrics collection is enabled.
* Default value is {@link #DFLT_METRICS_ENABLED}.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index e16e1b409f3..b7d449fa6a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -442,7 +442,10 @@ public class CdcMain implements Runnable {
.peek(p -> {
long nextSgmnt = segmentIndex(p);
- assert lastSgmnt.get() == -1 || nextSgmnt -
lastSgmnt.get() == 1;
+ if (lastSgmnt.get() != -1 && nextSgmnt -
lastSgmnt.get() != 1) {
+ throw new IgniteException("Found missed
segments. Some events are missed. " +
+ "[lastSegment=" + lastSgmnt.get() + ",
nextSegment=" + nextSgmnt + ']');
+ }
lastSgmnt.set(nextSgmnt);
})
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 0dc36ad6174..619a84ef0fe 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -2140,8 +2140,14 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
Files.move(dstTmpFile.toPath(), dstFile.toPath());
if (walCdcDir != null) {
- if (!cdcDisabled.getOrDefault(false))
-
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()),
dstFile.toPath());
+ if (!cdcDisabled.getOrDefault(false)) {
+ if (checkCdcWalDirectorySize(dstFile.length()))
+
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()),
dstFile.toPath());
+ else {
+ log.error("Creation of segment CDC link skipped.
Configured CDC directory " +
+ "maximum size exceeded.");
+ }
+ }
else {
log.warning("Creation of segment CDC link skipped. " +
"'" + CDC_DISABLED + "' distributed property is
'true'.");
@@ -2212,6 +2218,27 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
new IgniteThread(archiver).start();
}
+
+ /**
+ * Checks whether a file of the given length still fits into the CDC directory under the configured
+ * {@code cdcWalDirectoryMaxSize} limit. Logs a warning when it does not.
+ *
+ * @param len Length in bytes of the file that is about to be added to the CDC directory.
+ * @return {@code True} if no limit is configured or the current directory size plus {@code len} does not
+ * exceed the limit, otherwise {@code false}.
+ */
+ private boolean checkCdcWalDirectorySize(long len) {
+ long maxDirSize =
igCfg.getDataStorageConfiguration().getCdcWalDirectoryMaxSize();
+
+ // Zero or negative limit disables the check.
+ if (maxDirSize <= 0)
+ return true;
+
+ // Current size is the sum of lengths of the files in the CDC directory accepted by the segment filter.
+ // NOTE(review): File.listFiles() returns null if the path is not a directory or an I/O error occurs,
+ // which would cause an NPE here - confirm walCdcDir always exists at this point.
+ long dirSize =
Arrays.stream(walCdcDir.listFiles(WAL_SEGMENT_FILE_FILTER)).mapToLong(File::length).sum();
+
+ if (dirSize + len <= maxDirSize)
+ return true;
+
+ log.warning("Configured CDC WAL directory maximum size exceeded
[curDirSize=" + dirSize +
+ ", fileLength=" + len + ", cdcWalDirectoryMaxSize=" +
maxDirSize + ']');
+
+ return false;
+ }
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 369a7663aa1..6e748c2eb93 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cdc;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -46,8 +47,11 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
@@ -55,6 +59,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -68,8 +73,11 @@ import static
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeTrue;
@@ -95,6 +103,9 @@ public class CdcSelfTest extends AbstractCdcTest {
@Parameterized.Parameter(2)
public boolean persistenceEnabled;
+ /** CDC directory maximum size passed to the node configuration. A test may change it before starting a grid. */
+ private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+
/** */
@Parameterized.Parameters(name = "consistentId={0}, wal={1},
persistence={2}")
public static Collection<?> parameters() {
@@ -121,7 +132,8 @@ public class CdcSelfTest extends AbstractCdcTest {
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(true))
- .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" +
U.maskForFileName(igniteInstanceName)));
+ .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" +
U.maskForFileName(igniteInstanceName))
+ .setCdcWalDirectoryMaxSize(cdcWalDirMaxSize));
cfg.setCacheConfiguration(
new CacheConfiguration<>(TX_CACHE_NAME)
@@ -726,6 +738,58 @@ public class CdcSelfTest extends AbstractCdcTest {
assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 *
WAL_ARCHIVE_TIMEOUT));
}
+ /**
+ * Checks that segment CDC links are not created once the configured CDC directory maximum size is reached,
+ * and that the CDC application fails with the "Found missed segments" error when it meets the resulting gap.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCdcDirectoryMaxSize() throws Exception {
+ cdcWalDirMaxSize = 10 * U.MB;
+ // NOTE(review): segmentSize is only used below to size the written payload; presumably it matches the
+ // WAL segment size set in the node configuration - confirm.
+ int segmentSize = (int)(cdcWalDirMaxSize / 2);
+
+ IgniteEx ign = startGrid(0);
+
+ ign.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, User> cache =
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+ IgniteWriteAheadLogManager wal =
ign.context().cache().context().wal(true);
+ File walCdcDir = U.field(ign.context().cache().context().wal(true),
"walCdcDir");
+
+ // Logs page snapshots worth about 80% of segmentSize, puts one cache entry and then waits
+ // until the number of archived WAL segments grows.
+ RunnableX writeSgmnt = () -> {
+ int sgmnts = wal.walArchiveSegments();
+ int dataSize = (int)(segmentSize * 0.8);
+
+ for (int i = 0; i < dataSize / DFLT_PAGE_SIZE; i++)
+ wal.log(new PageSnapshot(new FullPageId(-1, -1), new
byte[DFLT_PAGE_SIZE], 1));
+
+ addData(cache, 0, 1);
+
+ // NOTE(review): the boolean result of waitForCondition is ignored, so a timeout is not reported here.
+ waitForCondition(() -> wal.walArchiveSegments() > sgmnts, 2 *
WAL_ARCHIVE_TIMEOUT);
+ };
+
+ // Write two segments to fill the CDC directory up to the configured max size.
+ writeSgmnt.run();
+ writeSgmnt.run();
+
+ // The segment link creation should be skipped.
+ writeSgmnt.run();
+
+ // Total size of the files in the CDC directory must stay within the limit.
+ assertTrue(cdcWalDirMaxSize >=
Arrays.stream(walCdcDir.listFiles()).mapToLong(File::length).sum());
+
+ UserCdcConsumer cnsmr = new UserCdcConsumer();
+
+ CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));
+
+ IgniteInternalFuture<?> fut = runAsync(cdc);
+
+ // Presumably waits until the consumer has seen the two updates from the linked segments.
+ waitForSize(2, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+
+ // CDC is expected to be still running: no gap has been observed yet.
+ assertFalse(fut.isDone());
+
+ // Write next segment after skipped.
+ writeSgmnt.run();
+
+ // The gap in the segment sequence must make the CDC application fail.
+ assertThrows(log, () -> fut.get(getTestTimeout()),
IgniteCheckedException.class,
+ "Found missed segments. Some events are missed.");
+ }
+
/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int
to) {
for (int i = from; i < to; i++)