This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d9460a0 Add non-blocking mode for CDC writes
d9460a0 is described below
commit d9460a04daee5fa97639abf2b6e28ff9b29cf636
Author: Yifan Cai <[email protected]>
AuthorDate: Fri Dec 3 12:18:31 2021 -0800
Add non-blocking mode for CDC writes
patch by Yifan Cai; reviewed by Josh McKenzie for CASSANDRA-17001
---
CHANGES.txt | 1 +
NEWS.txt | 4 +
src/java/org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 17 ++
.../apache/cassandra/db/commitlog/CommitLog.java | 24 ++
.../db/commitlog/CommitLogDescriptor.java | 18 +-
.../cassandra/db/commitlog/CommitLogMBean.java | 4 +
.../cassandra/db/commitlog/CommitLogSegment.java | 2 +-
.../db/commitlog/CommitLogSegmentManagerCDC.java | 121 +++++++---
.../cassandra/utils/DirectorySizeCalculator.java | 10 +-
.../db/commitlog/CommitLogDescriptorTest.java | 17 ++
.../commitlog/CommitLogSegmentManagerCDCTest.java | 262 ++++++++++++---------
12 files changed, 327 insertions(+), 156 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 14a1623..91ff5a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add non-blocking mode for CDC writes (CASSANDRA-17001)
* Add guardrails framework (CASSANDRA-17147)
* Harden resource management on SSTable components to prevent future leaks
(CASSANDRA-17174)
* Make nodes more resilient to local unrelated files during startup
(CASSANDRA-17082)
diff --git a/NEWS.txt b/NEWS.txt
index 9707cce..f365a68 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - CDC data flushing now can be configured to be non-blocking with the
configuration cdc_block_writes. Setting to true,
+ any writes to the CDC-enabled tables will be blocked when reaching
the limit for CDC data on disk, which is the
+ existing and the default behavior. Setting to false, the writes to the
CDC-enabled tables will be accepted and
+ the oldest CDC data on disk will be deleted to ensure the size
constraint.
- New native functions to convert unix time values into C* native types:
toDate(bigint), toTimestamp(bigint),
mintimeuuid(bigint) and maxtimeuuid(bigint)
- Support for multiple permission in a single GRANT/REVOKE/LIST statement
has been added. It allows to
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index cdf83b8..9fb57797 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,6 +281,9 @@ public class Config
// Change-data-capture logs
public boolean cdc_enabled = false;
+ // When true, new CDC mutations are rejected/blocked when reaching max CDC
storage.
+ // When false, new CDC mutations are always accepted, but the oldest CDC
commit log segment is removed when the CDC storage is full.
+ public volatile boolean cdc_block_writes = true;
public String cdc_raw_directory;
public int cdc_total_space_in_mb = 0;
public int cdc_free_space_check_interval_ms = 250;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8ed8612..d3abeb0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2095,6 +2095,13 @@ public class DatabaseDescriptor
return (int)
ByteUnit.MEBI_BYTES.toBytes(conf.commitlog_segment_size_in_mb);
}
+ /**
+ * Update commitlog_segment_size_in_mb in the tests.
+ * {@link CommitLogSegmentManagerCDC} uses the CommitLogSegmentSize to
estimate the file size on allocation.
+ * It is important to keep the value unchanged for the estimation to be
correct.
+ * @param sizeMegabytes
+ */
+ @VisibleForTesting /* Only for testing */
public static void setCommitLogSegmentSize(int sizeMegabytes)
{
conf.commitlog_segment_size_in_mb = sizeMegabytes;
@@ -3157,6 +3164,16 @@ public class DatabaseDescriptor
conf.cdc_enabled = cdc_enabled;
}
+ public static boolean getCDCBlockWrites()
+ {
+ return conf.cdc_block_writes;
+ }
+
+ public static void setCDCBlockWrites(boolean val)
+ {
+ conf.cdc_block_writes = val;
+ }
+
public static String getCDCLogLocation()
{
return conf.cdc_raw_directory;
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a4be769..c605234 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -28,6 +28,7 @@ import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.util.File;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -420,6 +421,29 @@ public class CommitLog implements CommitLogMBean
return segmentRatios;
}
+ @Override
+ public boolean getCDCBlockWrites()
+ {
+ return DatabaseDescriptor.getCDCBlockWrites();
+ }
+
+ @Override
+ public void setCDCBlockWrites(boolean val)
+ {
+ Preconditions.checkState(DatabaseDescriptor.isCDCEnabled(),
+ "Unable to set block_writes (%s): CDC is not
enabled.", val);
+ Preconditions.checkState(segmentManager instanceof
CommitLogSegmentManagerCDC,
+ "CDC is enabled but we have the wrong
CommitLogSegmentManager type: %s. " +
+ "Please report this as bug.",
segmentManager.getClass().getName());
+ boolean oldVal = DatabaseDescriptor.getCDCBlockWrites();
+ CommitLogSegment currentSegment = segmentManager.allocatingFrom();
+ // Update the current segment CDC state to PERMITTED if block_writes
is disabled now, and it was in FORBIDDEN state
+ if (!val && currentSegment.getCDCState() ==
CommitLogSegment.CDCState.FORBIDDEN)
+ currentSegment.setCDCState(CommitLogSegment.CDCState.PERMITTED);
+ DatabaseDescriptor.setCDCBlockWrites(val);
+ logger.info("Updated CDC block_writes from {} to {}", oldVal, val);
+ }
+
/**
* Shuts down the threads used by the commit log, blocking until
completion.
* TODO this should accept a timeout, and throw TimeoutException
diff --git
a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 9e95658..82207ee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -35,6 +35,7 @@ import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
@@ -51,6 +52,7 @@ public class CommitLogDescriptor
private static final String SEPARATOR = "-";
private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
private static final String FILENAME_EXTENSION = ".log";
+ private static final String INDEX_FILENAME_SUFFIX = "_cdc.idx";
// match both legacy and new version of commitlogs Ex: CommitLog-12345.log
and CommitLog-4-12345.log.
private static final Pattern COMMIT_LOG_FILE_PATTERN =
Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" +
FILENAME_EXTENSION);
@@ -220,7 +222,21 @@ public class CommitLogDescriptor
public String cdcIndexFileName()
{
- return FILENAME_PREFIX + version + SEPARATOR + id + "_cdc.idx";
+ return FILENAME_PREFIX + version + SEPARATOR + id +
INDEX_FILENAME_SUFFIX;
+ }
+
+ /**
+ * Infer the corresponding cdc index file using its cdc commitlog file
+ * @param cdcCommitLogSegment
+ * @return cdc index file or null if the cdc index file cannot be inferred.
+ */
+ public static File inferCdcIndexFile(File cdcCommitLogSegment)
+ {
+ if (!isValid(cdcCommitLogSegment.name()))
+ return null;
+ String cdcFileName = cdcCommitLogSegment.name();
+ String indexFileName = cdcFileName.substring(0, cdcFileName.length() -
FILENAME_EXTENSION.length()) + INDEX_FILENAME_SUFFIX;
+ return new File(DatabaseDescriptor.getCDCLogLocation(), indexFileName);
}
/**
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 3b20bbc..7e8deca 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -84,4 +84,8 @@ public interface CommitLogMBean
* @return A map between active log segments and the compression ratio
achieved for each.
*/
public Map<String, Double> getActiveSegmentCompressionRatios();
+
+ public boolean getCDCBlockWrites();
+
+ public void setCDCBlockWrites(boolean val);
}
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 45678f5..06218f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -69,7 +69,7 @@ public abstract class CommitLogSegment
FORBIDDEN,
CONTAINS
}
- Object cdcStateLock = new Object();
+ final Object cdcStateLock = new Object();
private final static AtomicInteger nextId = new AtomicInteger(1);
private static long replayLimitId;
diff --git
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index 6f6a1c2..4a2ddf2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -19,13 +19,16 @@
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
-import java.nio.file.FileVisitResult;
import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
@@ -67,20 +70,52 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
cdcSizeTracker.processDiscardedSegment(segment);
if (delete)
- FileUtils.deleteWithConfirm(segment.logFile);
+ segment.logFile.delete();
if (segment.getCDCState() != CDCState.CONTAINS)
{
// Always delete hard-link from cdc folder if this segment didn't
contain CDC data. Note: File may not exist
// if processing discard during startup.
File cdcLink = segment.getCDCFile();
- if (cdcLink.exists())
- FileUtils.deleteWithConfirm(cdcLink);
-
File cdcIndexFile = segment.getCDCIndexFile();
- if (cdcIndexFile.exists())
- FileUtils.deleteWithConfirm(cdcIndexFile);
+ deleteCDCFiles(cdcLink, cdcIndexFile);
+ }
+ }
+
+ /**
+ * Delete the oldest hard-linked CDC commit log segment to free up space.
+ * @return total deleted file size in bytes
+ */
+ public long deleteOldestLinkedCDCCommitLogSegment()
+ {
+ File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
+ Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does
not exist.");
+ File[] files = cdcDir.tryList(f ->
CommitLogDescriptor.isValid(f.name()));
+ Preconditions.checkState(files != null && files.length > 0,
+ "There should be at least 1 CDC commit log
segment.");
+ List<File> sorted = Arrays.stream(files)
+
.sorted(Comparator.comparingLong(File::lastModified))
+ .collect(Collectors.toList());
+ File oldestCdcFile = sorted.get(0);
+ File cdcIndexFile =
CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
+ return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+ }
+
+ private long deleteCDCFiles(File cdcLink, File cdcIndexFile)
+ {
+ long total = 0;
+ if (cdcLink != null && cdcLink.exists())
+ {
+ total += cdcLink.length();
+ cdcLink.delete();
+ }
+
+ if (cdcIndexFile != null && cdcIndexFile.exists())
+ {
+ total += cdcIndexFile.length();
+ cdcIndexFile.delete();
}
+ return total;
}
/**
@@ -170,7 +205,7 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
if (cdcFile.exists() && !cdcIndexFile.exists())
{
logger.trace("(Unopened) CDC segment {} is no longer needed and
will be deleted now", cdcFile);
- FileUtils.deleteWithConfirm(cdcFile);
+ cdcFile.delete();
}
}
@@ -193,15 +228,15 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
{
private final RateLimiter rateLimiter = RateLimiter.create(1000.0 /
DatabaseDescriptor.getCDCDiskCheckInterval());
private ExecutorService cdcSizeCalculationExecutor;
- private CommitLogSegmentManagerCDC segmentManager;
-
- // Used instead of size during walk to remove chance of over-allocation
- private volatile long sizeInProgress = 0;
+ private final CommitLogSegmentManagerCDC segmentManager;
+ // track the total size between two directory size calculations
+ private final AtomicLong sizeInProgress;
CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
{
super(path);
this.segmentManager = segmentManager;
+ this.sizeInProgress = new AtomicLong(0);
}
/**
@@ -209,7 +244,7 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
*/
public void start()
{
- size = 0;
+ sizeInProgress.getAndSet(0);
cdcSizeCalculationExecutor =
executorFactory().configureSequential("CDCSizeCalculationExecutor")
.withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
.withQueueLimit(0)
@@ -221,7 +256,7 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
* Synchronous size recalculation on each segment creation/deletion
call could lead to very long delays in new
* segment allocation, thus long delays in thread signaling to wake
waiting allocation / writer threads.
*
- * This can be reached either from the segment management thread in
ABstractCommitLogSegmentManager or from the
+ * This can be reached either from the segment management thread in
AbstractCommitLogSegmentManager or from the
* size recalculation executor, so we synchronize on this object to
reduce the race overlap window available for
* size to get off.
*
@@ -232,11 +267,26 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
// See synchronization in CommitLogSegment.setCDCState
synchronized(segment.cdcStateLock)
{
- segment.setCDCState(defaultSegmentSize() +
totalCDCSizeOnDisk() > allowableCDCBytes()
+ int segmentSize = defaultSegmentSize();
+ long allowance = allowableCDCBytes();
+ boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+ segment.setCDCState(blocking && segmentSize +
sizeInProgress.get() > allowance
? CDCState.FORBIDDEN
: CDCState.PERMITTED);
+
+ // Remove the oldest cdc segment file when exceeding the CDC
storage allowance
+ while (!blocking && segmentSize + sizeInProgress.get() >
allowance)
+ {
+ long releasedSize =
segmentManager.deleteOldestLinkedCDCCommitLogSegment();
+ sizeInProgress.getAndAdd(-releasedSize);
+ logger.debug("Freed up {} bytes after deleting the oldest
CDC commit log segment in non-blocking mode. " +
+ "Total on-disk CDC size: {}; allowed CDC
size: {}",
+ releasedSize, sizeInProgress.get() +
segmentSize, allowance);
+ }
+
+ // Aggressively count in the (estimated) size of new segments.
if (segment.getCDCState() == CDCState.PERMITTED)
- size += defaultSegmentSize();
+ sizeInProgress.getAndAdd(segmentSize);
}
// Take this opportunity to kick off a recalc to pick up any
consumer file deletion.
@@ -250,9 +300,13 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
{
// Add to flushed size before decrementing unflushed so we
don't have a window of false generosity
if (segment.getCDCState() == CDCState.CONTAINS)
- size += segment.onDiskSize();
+ sizeInProgress.getAndAdd(segment.onDiskSize());
+
+ // Subtract the (estimated) size of the segment from
processNewSegment.
+ // For the segment that CONTAINS, we update with adding the
actual onDiskSize and removing the estimated size.
+ // For the segment that remains in PERMITTED, the file is to
be deleted and the estimate should be returned.
if (segment.getCDCState() != CDCState.FORBIDDEN)
- size -= defaultSegmentSize();
+ sizeInProgress.getAndAdd(-defaultSegmentSize());
}
// Take this opportunity to kick off a recalc to pick up any
consumer file deletion.
@@ -268,7 +322,7 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
{
try
{
- cdcSizeCalculationExecutor.submit(() ->
recalculateOverflowSize());
+
cdcSizeCalculationExecutor.submit(this::recalculateOverflowSize);
}
catch (RejectedExecutionException e)
{
@@ -287,6 +341,8 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
private int defaultSegmentSize()
{
+ // CommitLogSegmentSize is only loaded from yaml.
+ // There is a setter, but it is used only for testing.
return DatabaseDescriptor.getCommitLogSegmentSize();
}
@@ -294,25 +350,17 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
{
try
{
+ resetSize();
// The Arrays.stream approach is considerably slower on
Windows than linux
- sizeInProgress = 0;
Files.walkFileTree(path.toPath(), this);
- size = sizeInProgress;
+ sizeInProgress.getAndSet(getAllocatedSize());
}
catch (IOException ie)
{
- CommitLog.instance.handleCommitError("Failed CDC Size
Calculation", ie);
+ CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
}
}
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException
- {
- sizeInProgress += attrs.size();
- return FileVisitResult.CONTINUE;
- }
-
-
public void shutdown()
{
if (cdcSizeCalculationExecutor != null &&
!cdcSizeCalculationExecutor.isShutdown())
@@ -323,12 +371,7 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
private void addSize(long toAdd)
{
- size += toAdd;
- }
-
- private long totalCDCSizeOnDisk()
- {
- return size;
+ sizeInProgress.getAndAdd(toAdd);
}
}
@@ -347,6 +390,6 @@ public class CommitLogSegmentManagerCDC extends
AbstractCommitLogSegmentManager
}
catch (InterruptedException e) {}
- return cdcSizeTracker.totalCDCSizeOnDisk();
+ return cdcSizeTracker.getAllocatedSize();
}
}
diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
index 97fc22e..f0cfdea 100644
--- a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
+++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.util.File;
public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
{
- protected volatile long size = 0;
+ private volatile long size = 0;
protected final File path;
public DirectorySizeCalculator(File path)
@@ -63,4 +63,12 @@ public class DirectorySizeCalculator extends
SimpleFileVisitor<Path>
{
return size;
}
+
+ /**
+ * Reset the size to 0 in case the size calculator is used multiple
times
+ */
+ protected void resetSize()
+ {
+ size = 0;
+ }
}
diff --git
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 53c6769..87b5fb0 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@ -29,10 +29,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.config.TransparentDataEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileSegmentInputStream;
import org.apache.cassandra.net.MessagingService;
@@ -309,4 +311,19 @@ public class CommitLogDescriptorTest
CommitLogDescriptor desc2 = new
CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression,
enabledEncryption);
Assert.assertEquals(desc1, desc2);
}
+
+ @Test
+ public void testInferCDCIndexFile()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ String fileNameSuffix = "CommitLog-2-1340512736956320000";
+ File validCdcLink = new File(fileNameSuffix + ".log");
+ File inferredIndexFile =
CommitLogDescriptor.inferCdcIndexFile(validCdcLink);
+ Assert.assertNotNull(inferredIndexFile);
+ Assert.assertEquals(fileNameSuffix + "_cdc.idx",
inferredIndexFile.name());
+
+ File invalidCdcLink = new File(fileNameSuffix + ".invalidlog");
+ inferredIndexFile =
CommitLogDescriptor.inferCdcIndexFile(invalidCdcLink);
+ Assert.assertNull(inferredIndexFile);
+ }
}
diff --git
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index cbfdadb..a6e5ab1 100644
---
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -65,31 +65,8 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
@Test
public void testCDCWriteFailure() throws Throwable
{
- createTable("CREATE TABLE %s (idx int, data text, primary key(idx))
WITH cdc=true;");
- CommitLogSegmentManagerCDC cdcMgr =
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
- TableMetadata cfm = currentTableMetadata();
-
- // Confirm that logic to check for whether or not we can allocate new
CDC segments works
- Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
- try
- {
- DatabaseDescriptor.setCDCSpaceInMB(32);
- // Spin until we hit CDC capacity and make sure we get a
CDCWriteException
- try
- {
- // Should trigger on anything < 20:1 compression ratio during
compressed test
- for (int i = 0; i < 100; i++)
- {
- new RowUpdateBuilder(cfm, 0, i)
- .add("data",
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
- .build().apply();
- }
- Assert.fail("Expected CDCWriteException from full CDC but did
not receive it.");
- }
- catch (CDCWriteException e)
- {
- // expected, do nothing
- }
+ testWithCDCSpaceInMb(32, () -> {
+ createTableAndBulkWrite();
expectCurrentCDCState(CDCState.FORBIDDEN);
// Confirm we can create a non-cdc table and write to it even
while at cdc capacity
@@ -97,6 +74,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
execute("INSERT INTO %s (idx, data) VALUES (1, '1');");
// Confirm that, on flush+recyle, we see files show up in cdc_raw
+ CommitLogSegmentManagerCDC cdcMgr =
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
CommitLog.instance.forceRecycleAllSegments();
cdcMgr.awaitManagementTasksCompletion();
@@ -109,57 +87,55 @@ public class CommitLogSegmentManagerCDCTest extends
CQLTester
// Update size tracker to reflect deleted files. Should flip flag
on current allocatingFrom to allow.
cdcMgr.updateCDCTotalSize();
expectCurrentCDCState(CDCState.PERMITTED);
- }
- finally
- {
- DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
- }
+ });
}
@Test
public void testSegmentFlaggingOnCreation() throws Throwable
{
- CommitLogSegmentManagerCDC cdcMgr =
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
- String ct = createTable("CREATE TABLE %s (idx int, data text, primary
key(idx)) WITH cdc=true;");
-
- int origSize = DatabaseDescriptor.getCDCSpaceInMB();
- try
- {
- DatabaseDescriptor.setCDCSpaceInMB(16);
- TableMetadata ccfm =
Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata();
- // Spin until we hit CDC capacity and make sure we get a
CDCWriteException
- try
- {
- for (int i = 0; i < 1000; i++)
- {
- new RowUpdateBuilder(ccfm, 0, i)
- .add("data",
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
- .build().apply();
- }
- Assert.fail("Expected CDCWriteException from full CDC but did
not receive it.");
- }
- catch (CDCWriteException e) { }
-
- expectCurrentCDCState(CDCState.FORBIDDEN);
- CommitLog.instance.forceRecycleAllSegments();
+ testSegmentFlaggingOnCreation0();
+ }
- cdcMgr.awaitManagementTasksCompletion();
- // Delete all files in cdc_raw
- for (File f : new
File(DatabaseDescriptor.getCDCLogLocation()).tryList())
- f.tryDelete();
- cdcMgr.updateCDCTotalSize();
- // Confirm cdc update process changes flag on active segment
- expectCurrentCDCState(CDCState.PERMITTED);
+ @Test
+ public void testSegmentFlaggingWithNonblockingOnCreation() throws Throwable
+ {
+ testWithNonblockingMode(this::testSegmentFlaggingOnCreation0);
+ }
- // Clear out archived CDC files
- for (File f : new
File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
- FileUtils.deleteWithConfirm(f);
- }
- }
- finally
- {
- DatabaseDescriptor.setCDCSpaceInMB(origSize);
+ @Test
+ public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable
+ {
+ final int commitlogSize = DatabaseDescriptor.getCommitLogSegmentSize()
/ 1024 / 1024;
+ final int cdcSizeLimit = commitlogSize + 1;
+ // Clear out all CDC files
+ for (File f : new
File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
+ FileUtils.deleteWithConfirm(f);
}
+ testWithNonblockingMode(() -> testWithCDCSpaceInMb(cdcSizeLimit, () ->
{
+ CommitLogSegmentManagerCDC cdcMgr =
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+ Assert.assertEquals(0, cdcMgr.updateCDCTotalSize());
+
+ createTableAndBulkWrite();
+
+ // Only the current commit log will be kept.
+ // The older ones are deleted immediately on creating a new
segment due to exceeding size limit.
+ long actualSize = cdcMgr.updateCDCTotalSize();
+ Assert.assertTrue(actualSize <= cdcSizeLimit * 1024 * 1024);
+ Assert.assertTrue(actualSize >=
DatabaseDescriptor.getCommitLogSegmentSize());
+ }));
+ }
+
+ @Test // switch from blocking to nonblocking, then back to blocking
+ public void testSwitchingCDCWriteModes() throws Throwable
+ {
+ String tableName = createTableAndBulkWrite();
+ expectCurrentCDCState(CDCState.FORBIDDEN);
+ testWithNonblockingMode(() -> {
+ bulkWrite(tableName);
+ expectCurrentCDCState(CDCState.CONTAINS);
+ });
+ bulkWrite(tableName);
+ expectCurrentCDCState(CDCState.FORBIDDEN);
}
@Test
@@ -187,30 +163,12 @@ public class CommitLogSegmentManagerCDCTest extends
CQLTester
}
@Test
- public void testCompletedFlag() throws IOException
+ public void testCompletedFlag() throws Throwable
{
- createTable("CREATE TABLE %s (idx int, data text, primary key(idx))
WITH cdc=true;");
+ String tableName = createTable("CREATE TABLE %s (idx int, data text,
primary key(idx)) WITH cdc=true;");
CommitLogSegment initialSegment =
CommitLog.instance.segmentManager.allocatingFrom();
- Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
- DatabaseDescriptor.setCDCSpaceInMB(8);
- try
- {
- for (int i = 0; i < 1000; i++)
- {
- new RowUpdateBuilder(currentTableMetadata(), 0, 1)
- .add("data",
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
- .build().apply();
- }
- }
- catch (CDCWriteException ce)
- {
- // pass. Expected since we'll have a file or two linked on restart
of CommitLog due to replay
- }
- finally
- {
- DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
- }
+ testWithCDCSpaceInMb(8, () -> bulkWrite(tableName));
CommitLog.instance.forceRecycleAllSegments();
@@ -280,32 +238,10 @@ public class CommitLogSegmentManagerCDCTest extends
CQLTester
}
@Test
- public void testReplayLogic() throws IOException
+ public void testReplayLogic() throws Throwable
{
- // Assert.assertEquals(0, new
File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
- String table_name = createTable("CREATE TABLE %s (idx int, data text,
primary key(idx)) WITH cdc=true;");
- Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
-
- DatabaseDescriptor.setCDCSpaceInMB(8);
- TableMetadata ccfm =
Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
- try
- {
- for (int i = 0; i < 1000; i++)
- {
- new RowUpdateBuilder(ccfm, 0, i)
- .add("data",
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
- .build().apply();
- }
- Assert.fail("Expected CDCWriteException from full CDC but did not
receive it.");
- }
- catch (CDCWriteException e)
- {
- // pass
- }
- finally
- {
- DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
- }
+ // Assert.assertEquals(0, new
File(DatabaseDescriptor.getCDCLogLocation()).tryList().length);
+ testWithCDCSpaceInMb(8, this::createTableAndBulkWrite);
CommitLog.instance.sync(true);
CommitLog.instance.stopUnsafe(false);
@@ -449,4 +385,102 @@ public class CommitLogSegmentManagerCDCTest extends
CQLTester
expectedState, currentState));
}
}
+
+ private void testWithNonblockingMode(Testable test) throws Throwable
+ {
+ boolean original = DatabaseDescriptor.getCDCBlockWrites();
+ CommitLog.instance.setCDCBlockWrites(false);
+ try
+ {
+ test.run();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ CommitLog.instance.setCDCBlockWrites(original);
+ }
+ }
+
+ private void testWithCDCSpaceInMb(int size, Testable test) throws Throwable
+ {
+ int origSize = DatabaseDescriptor.getCDCSpaceInMB();
+ DatabaseDescriptor.setCDCSpaceInMB(size);
+ try
+ {
+ test.run();
+ }
+ finally
+ {
+ DatabaseDescriptor.setCDCSpaceInMB(origSize);
+ }
+ }
+
+ private String createTableAndBulkWrite() throws Throwable
+ {
+ String tableName = createTable("CREATE TABLE %s (idx int, data text,
primary key(idx)) WITH cdc=true;");
+ bulkWrite(tableName);
+ return tableName;
+ }
+
+ private void bulkWrite(String tableName) throws Throwable
+ {
+ TableMetadata ccfm =
Keyspace.open(keyspace()).getColumnFamilyStore(tableName).metadata();
+ boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
+ // Spin to make sure we hit CDC capacity
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ new RowUpdateBuilder(ccfm, 0, i)
+ .add("data",
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().applyFuture().get();
+ }
+ if (blockWrites)
+ Assert.fail("Expected CDCWriteException from full CDC but did
not receive it.");
+ }
+ catch (CDCWriteException e)
+ {
+ if (!blockWrites)
+ Assert.fail("Excepted no CDCWriteException when not blocking
writes but received it.");
+ }
+ }
+
+ private void testSegmentFlaggingOnCreation0() throws Throwable
+ {
+ testWithCDCSpaceInMb(16, () -> {
+ boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
+
+ createTableAndBulkWrite();
+
+ CommitLogSegmentManagerCDC cdcMgr =
(CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+ expectCurrentCDCState(blockWrites? CDCState.FORBIDDEN :
CDCState.CONTAINS);
+
+ // When block writes, releasing CDC commit logs should update the
CDC state to PERMITTED
+ if (blockWrites)
+ {
+ CommitLog.instance.forceRecycleAllSegments();
+
+ cdcMgr.awaitManagementTasksCompletion();
+ // Delete all files in cdc_raw
+ for (File f : new
File(DatabaseDescriptor.getCDCLogLocation()).tryList())
+ f.delete();
+ cdcMgr.updateCDCTotalSize();
+ // Confirm cdc update process changes flag on active segment
+ expectCurrentCDCState(CDCState.PERMITTED);
+ }
+
+ // Clear out archived CDC files
+ for (File f : new
File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
+ FileUtils.deleteWithConfirm(f);
+ }
+ });
+ }
+
+ private interface Testable
+ {
+ void run() throws Throwable;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]