This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9b7c1e Allow CommitLogSegmentReader to optionally skip sync marker CRC checks
f9b7c1e is described below
commit f9b7c1e6984f5b81aae1e3a2191d4e9599db15ae
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jan 11 10:55:44 2021 +0100
Allow CommitLogSegmentReader to optionally skip sync marker CRC checks
patch by Caleb Rackliffe; reviewed by Josh McKenzie for CASSANDRA-16842
Co-authored-by: Jordan West <[email protected]>
Co-authored-by: Caleb Rackliffe <[email protected]>
Co-authored-by: Marcus Eriksson <[email protected]>
---
CHANGES.txt | 1 +
.../db/commitlog/CommitLogSegmentReader.java | 29 +++++
.../cassandra/db/commitlog/CommitLogTest.java | 128 +++++++++++++++++----
3 files changed, 137 insertions(+), 21 deletions(-)
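As context for the diff that follows: the patch surfaces the escape hatch as a JVM system property plus a public static setter on CommitLogSegmentReader. A minimal sketch (not part of the patch) of the two ways to opt in, assuming the usual "cassandra." value for Config.PROPERTY_PREFIX; note the property is only consulted once, when the reader class is initialized:

    // Sketch only. Option 1: set the flag at JVM startup, before
    // CommitLogSegmentReader is loaded:
    //     -Dcassandra.commitlog.allow_ignore_sync_crc=true
    // Option 2: flip it programmatically (e.g. from a test) via the setter added below.
    import org.apache.cassandra.db.commitlog.CommitLogSegmentReader;

    public class AllowIgnoreSyncCrcExample
    {
        public static void main(String[] args)
        {
            CommitLogSegmentReader.setAllowSkipSyncMarkerCrc(true); // applies to subsequent segment reads
        }
    }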
diff --git a/CHANGES.txt b/CHANGES.txt
index a9c8ebd..be3ea40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
* allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
* Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
* Implement nodetool getauditlog command (CASSANDRA-16725)
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
index e23a915..33e70c1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
@@ -26,6 +26,10 @@ import javax.crypto.Cipher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.*;
import org.apache.cassandra.io.FSReadError;
@@ -46,6 +50,11 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
*/
public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.SyncSegment>
{
+ public static final String ALLOW_IGNORE_SYNC_CRC = Config.PROPERTY_PREFIX + "commitlog.allow_ignore_sync_crc";
+ private static volatile boolean allowSkipSyncMarkerCrc = Boolean.getBoolean(ALLOW_IGNORE_SYNC_CRC);
+
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentReader.class);
+
private final CommitLogReadHandler handler;
private final CommitLogDescriptor descriptor;
private final RandomAccessReader reader;
@@ -75,6 +84,11 @@ public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.S
else
segmenter = new NoOpSegmenter(reader);
}
+
+ public static void setAllowSkipSyncMarkerCrc(boolean allow)
+ {
+ allowSkipSyncMarkerCrc = allow;
+ }
public Iterator<SyncSegment> iterator()
{
@@ -151,8 +165,23 @@ public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.S
updateChecksumInt(crc, (int) reader.getPosition());
final int end = reader.readInt();
long filecrc = reader.readInt() & 0xffffffffL;
+
if (crc.getValue() != filecrc)
{
+ // The next marker position and CRC value are not written atomically, so it is possible for the latter to
+ // still be zero after the former has been finalized, even though the mutations that follow it are valid.
+ // When there is no compression or encryption enabled, we can ignore a sync marker CRC mismatch and defer
+ // to the per-mutation CRCs, which may be preferable to preventing startup altogether.
+ if (allowSkipSyncMarkerCrc
+ && descriptor.compression == null && !descriptor.getEncryptionContext().isEnabled()
+ && filecrc == 0 && end != 0)
+ {
+ logger.warn("Skipping sync marker CRC check at position {} (end={}, calculated crc={}) of commit log {}." +
+ "Using per-mutation CRC checks to ensure correctness...",
+ offset, end, crc.getValue(), reader.getPath());
+ return end;
+ }
+
if (end != 0 || filecrc != 0)
{
String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
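To make the new guard above easier to reason about in isolation, here is a small, self-contained sketch of the decision it encodes. The class and parameter names are illustrative, not from the patch; only the condition mirrors it.

    // Illustrative only: the CRC mismatch is skipped solely when the operator opted in,
    // the segment is neither compressed nor encrypted, the on-disk CRC is still the
    // unwritten value 0, and the next marker position ("end") has already been finalized.
    public final class SyncMarkerCrcPolicy
    {
        private SyncMarkerCrcPolicy() {}

        public static boolean canSkipCrcMismatch(boolean allowSkipSyncMarkerCrc,
                                                 boolean compressed,
                                                 boolean encrypted,
                                                 long fileCrc,
                                                 int nextMarkerPosition)
        {
            return allowSkipSyncMarkerCrc
                   && !compressed && !encrypted
                   && fileCrc == 0 && nextMarkerPosition != 0;
        }

        public static void main(String[] args)
        {
            // Half-written marker: position finalized, CRC still zero -> skippable when allowed.
            System.out.println(canSkipCrcMismatch(true, false, false, 0L, 1024));          // true
            // Any non-zero CRC that fails verification is never skipped by this path.
            System.out.println(canSkipCrcMismatch(true, false, false, 0xDEADBEEFL, 1024)); // false
        }
    }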
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index da3b83e..ee56efe 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -29,10 +29,16 @@ import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import javax.crypto.Cipher;
+
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -41,6 +47,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.io.compress.ZstdCompressor;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -61,6 +68,7 @@ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.security.CipherFactory;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.EncryptionContextGenerator;
import org.apache.cassandra.service.StorageService;
@@ -79,6 +87,7 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.MapType;
@@ -105,15 +114,25 @@ public abstract class CommitLogTest
}
@Parameters()
- public static Collection<Object[]> generateData()
+ public static Collection<Object[]> generateData() throws Exception
+ {
+ return Arrays.asList(new Object[][]
+ {
+ { null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
+ { null, newEncryptionContext() }, // Encryption
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext() },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+ { new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}
+ });
+ }
+
+ private static EncryptionContext newEncryptionContext() throws Exception
{
- return Arrays.asList(new Object[][]{
- {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
- {null, EncryptionContextGenerator.createContext(true)}, // Encryption
- {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
- {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
- {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
- {new ParameterizedClass(ZstdCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ EncryptionContext context = EncryptionContextGenerator.createContext(true);
+ CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions());
+ Cipher cipher = cipherFactory.getEncryptor(context.getTransparentDataEncryptionOptions().cipher, context.getTransparentDataEncryptionOptions().key_alias);
+ return EncryptionContextGenerator.createContext(cipher.getIV(), true);
}
public static void beforeClass() throws ConfigurationException
@@ -167,6 +186,7 @@ public abstract class CommitLogTest
@After
public void afterTest()
{
+ CommitLogSegmentReader.setAllowSkipSyncMarkerCrc(false);
testKiller.reset();
}
@@ -174,10 +194,8 @@ public abstract class CommitLogTest
public void testRecoveryWithEmptyLog() throws Exception
{
runExpecting(() -> {
- CommitLog.instance.recoverFiles(new File[]{
- tmpFile(CommitLogDescriptor.current_version),
- tmpFile(CommitLogDescriptor.current_version)
- });
+ CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version),
+ tmpFile(CommitLogDescriptor.current_version));
return null;
}, CommitLogReplayException.class);
}
@@ -196,6 +214,8 @@ public abstract class CommitLogTest
@Test
public void testHeaderOnlyFileFiltering() throws Exception
{
+ Assume.assumeTrue(!DatabaseDescriptor.getEncryptionContext().isEnabled());
+
File directory = Files.createTempDir();
CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext());
@@ -470,7 +490,7 @@ public abstract class CommitLogTest
}
catch (MutationExceededMaxSizeException e)
{
- Assert.assertEquals(cnt + 1, CommitLog.instance.metrics.oversizedMutations.getCount());
+ assertEquals(cnt + 1, CommitLog.instance.metrics.oversizedMutations.getCount());
throw e;
}
throw new AssertionError("mutation larger than limit was accepted");
@@ -655,20 +675,18 @@ public abstract class CommitLogTest
caught = t;
}
if (expected != null && caught == null)
- Assert.fail("Expected exception " + expected + " but call completed successfully.");
+ fail("Expected exception " + expected + " but call completed successfully.");
assertEquals("JVM kill state doesn't match expectation.", expected != null, testKiller.wasKilled());
}
protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
{
- ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
runExpecting(() -> testRecovery(logData, CommitLogDescriptor.current_version), expected);
}
@Test
- public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
+ public void testTruncateWithoutSnapshot()
{
boolean originalState = DatabaseDescriptor.isAutoSnapshot();
try
@@ -765,6 +783,74 @@ public abstract class CommitLogTest
}
@Test
+ public void replayWithBadSyncMarkerCRC() throws IOException
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ Mutation rm2 = new RowUpdateBuilder(cfs.metadata(), 0, "k2").clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ CommitLog.instance.add(rm2);
+ CommitLog.instance.sync(true);
+
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ assertFalse(activeSegments.isEmpty());
+
+ File directory = new File(CommitLog.instance.segmentManager.storageDirectory);
+ File firstActiveFile = Objects.requireNonNull(directory.listFiles((file, name) -> activeSegments.contains(name)))[0];
+ zeroFirstSyncMarkerCRC(firstActiveFile);
+
+ CommitLogSegmentReader.setAllowSkipSyncMarkerCrc(true);
+
+ if (DatabaseDescriptor.getCommitLogCompression() != null || DatabaseDescriptor.getEncryptionContext().isEnabled())
+ {
+ // If compression or encryption are enabled, expect an error, and do not attempt to replay using only mutation CRCs.
+ runExpecting(() ->
+ {
+ CommitLog.instance.recoverFiles(firstActiveFile);
+ return null;
+ },
+ CommitLogReplayException.class);
+ }
+ else
+ {
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
+ replayer.replayPath(firstActiveFile, false);
+ assertEquals(1, replayer.cells);
+ }
+ }
+
+ private void zeroFirstSyncMarkerCRC(File file) throws IOException
+ {
+ // Get the position of the first sync marker...
+ int firstSyncMarkerPosition = -1;
+
+ try (RandomAccessReader reader = RandomAccessReader.open(file))
+ {
+ CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+ firstSyncMarkerPosition = (int) reader.getFilePointer();
+ }
+
+ // ...buffer the file into memory...
+ ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
+
+ try (RandomAccessReader reader = RandomAccessReader.open(file))
+ {
+ reader.readFully(buffer);
+ }
+
+ // ...overwrite the sync marker CRC with zero...
+ buffer.position(firstSyncMarkerPosition + 4);
+ buffer.putInt(0);
+
+ // ...and write the file back out.
+ try (OutputStream out = new FileOutputStream(file))
+ {
+ out.write(buffer.array());
+ }
+ }
+
+ @Test
public void replayWithDiscard() throws IOException
{
int cellCount = 0;
@@ -886,7 +972,7 @@ public abstract class CommitLogTest
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
- Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+ assertEquals(1, CommitLog.instance.resetUnsafe(false));
System.clearProperty("cassandra.replayList");
}
@@ -921,7 +1007,7 @@ public abstract class CommitLogTest
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
// replay.
- Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+ assertEquals(1, CommitLog.instance.resetUnsafe(false));
System.clearProperty("cassandra.replayList");
}
@@ -1000,7 +1086,7 @@ public abstract class CommitLogTest
System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
}
- Assert.assertEquals(replayed, 1);
+ assertEquals(replayed, 1);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]