This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9b7c1e  Allow CommitLogSegmentReader to optionally skip sync marker CRC checks
f9b7c1e is described below

commit f9b7c1e6984f5b81aae1e3a2191d4e9599db15ae
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jan 11 10:55:44 2021 +0100

    Allow CommitLogSegmentReader to optionally skip sync marker CRC checks
    
    patch by Caleb Rackliffe; reviewed by Josh McKenzie for CASSANDRA-16842
    
    Co-authored-by: Jordan West <[email protected]>
    Co-authored-by: Caleb Rackliffe <[email protected]>
    Co-authored-by: Marcus Eriksson <[email protected]>
---
 CHANGES.txt                                        |   1 +
 .../db/commitlog/CommitLogSegmentReader.java       |  29 +++++
 .../cassandra/db/commitlog/CommitLogTest.java      | 128 +++++++++++++++++----
 3 files changed, 137 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a9c8ebd..be3ea40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
  * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
  * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
  * Implement nodetool getauditlog command (CASSANDRA-16725)
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
index e23a915..33e70c1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
@@ -26,6 +26,10 @@ import javax.crypto.Cipher;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.AbstractIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
 import 
org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
 import org.apache.cassandra.db.commitlog.CommitLogReadHandler.*;
 import org.apache.cassandra.io.FSReadError;
@@ -46,6 +50,11 @@ import static 
org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
  */
 public class CommitLogSegmentReader implements 
Iterable<CommitLogSegmentReader.SyncSegment>
 {
+    public static final String ALLOW_IGNORE_SYNC_CRC = Config.PROPERTY_PREFIX 
+ "commitlog.allow_ignore_sync_crc";
+    private static volatile boolean allowSkipSyncMarkerCrc = 
Boolean.getBoolean(ALLOW_IGNORE_SYNC_CRC);
+
+    private static final Logger logger = 
LoggerFactory.getLogger(CommitLogSegmentReader.class);
+    
     private final CommitLogReadHandler handler;
     private final CommitLogDescriptor descriptor;
     private final RandomAccessReader reader;
@@ -75,6 +84,11 @@ public class CommitLogSegmentReader implements 
Iterable<CommitLogSegmentReader.S
         else
             segmenter = new NoOpSegmenter(reader);
     }
+    
+    public static void setAllowSkipSyncMarkerCrc(boolean allow)
+    {
+        allowSkipSyncMarkerCrc = allow;
+    }
 
     public Iterator<SyncSegment> iterator()
     {
@@ -151,8 +165,23 @@ public class CommitLogSegmentReader implements 
Iterable<CommitLogSegmentReader.S
         updateChecksumInt(crc, (int) reader.getPosition());
         final int end = reader.readInt();
         long filecrc = reader.readInt() & 0xffffffffL;
+
         if (crc.getValue() != filecrc)
         {
+            // The next marker position and CRC value are not written atomically, so it is possible for the latter to
+            // still be zero after the former has been finalized, even though the mutations that follow it are valid.
+            // When there is no compression or encryption enabled, we can ignore a sync marker CRC mismatch and defer
+            // to the per-mutation CRCs, which may be preferable to preventing startup altogether.
+            if (allowSkipSyncMarkerCrc
+                && descriptor.compression == null && 
!descriptor.getEncryptionContext().isEnabled()
+                && filecrc == 0 && end != 0)
+            {
+                logger.warn("Skipping sync marker CRC check at position {} 
(end={}, calculated crc={}) of commit log {}." +
+                            "Using per-mutation CRC checks to ensure 
correctness...",
+                            offset, end, crc.getValue(), reader.getPath());
+                return end;
+            }
+
             if (end != 0 || filecrc != 0)
             {
                 String msg = String.format("Encountered bad header at position 
%d of commit log %s, with invalid CRC. " +
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index da3b83e..ee56efe 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -29,10 +29,16 @@ import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import javax.crypto.Cipher;
+
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -41,6 +47,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -61,6 +68,7 @@ import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.security.CipherFactory;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.EncryptionContextGenerator;
 import org.apache.cassandra.service.StorageService;
@@ -79,6 +87,7 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.MapType;
@@ -105,15 +114,25 @@ public abstract class CommitLogTest
     }
 
     @Parameters()
-    public static Collection<Object[]> generateData()
+    public static Collection<Object[]> generateData() throws Exception
+    {
+        return Arrays.asList(new Object[][]
+        {
+            { null, EncryptionContextGenerator.createDisabledContext()}, // No 
compression, no encryption
+            { null, newEncryptionContext() }, // Encryption
+            { new ParameterizedClass(LZ4Compressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext() },
+            { new ParameterizedClass(SnappyCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            { new ParameterizedClass(DeflateCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
+            { new ParameterizedClass(ZstdCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}
+        });
+    }
+
+    private static EncryptionContext newEncryptionContext() throws Exception
     {
-        return Arrays.asList(new Object[][]{
-            {null, EncryptionContextGenerator.createDisabledContext()}, // No 
compression, no encryption
-            {null, EncryptionContextGenerator.createContext(true)}, // 
Encryption
-            {new ParameterizedClass(LZ4Compressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(SnappyCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(DeflateCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
-            {new ParameterizedClass(ZstdCompressor.class.getName(), 
Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+        EncryptionContext context = 
EncryptionContextGenerator.createContext(true);
+        CipherFactory cipherFactory = new 
CipherFactory(context.getTransparentDataEncryptionOptions());
+        Cipher cipher = 
cipherFactory.getEncryptor(context.getTransparentDataEncryptionOptions().cipher,
 context.getTransparentDataEncryptionOptions().key_alias);
+        return EncryptionContextGenerator.createContext(cipher.getIV(), true);
     }
 
     public static void beforeClass() throws ConfigurationException
@@ -167,6 +186,7 @@ public abstract class CommitLogTest
     @After
     public void afterTest()
     {
+        CommitLogSegmentReader.setAllowSkipSyncMarkerCrc(false);
         testKiller.reset();
     }
 
@@ -174,10 +194,8 @@ public abstract class CommitLogTest
     public void testRecoveryWithEmptyLog() throws Exception
     {
         runExpecting(() -> {
-            CommitLog.instance.recoverFiles(new File[]{
-            tmpFile(CommitLogDescriptor.current_version),
-            tmpFile(CommitLogDescriptor.current_version)
-            });
+            
CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version),
+                                            
tmpFile(CommitLogDescriptor.current_version));
             return null;
         }, CommitLogReplayException.class);
     }
@@ -196,6 +214,8 @@ public abstract class CommitLogTest
     @Test
     public void testHeaderOnlyFileFiltering() throws Exception
     {
+        
Assume.assumeTrue(!DatabaseDescriptor.getEncryptionContext().isEnabled());
+        
         File directory = Files.createTempDir();
 
         CommitLogDescriptor desc1 = new 
CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, 
DatabaseDescriptor.getEncryptionContext());
@@ -470,7 +490,7 @@ public abstract class CommitLogTest
         }
         catch (MutationExceededMaxSizeException e)
         {
-            Assert.assertEquals(cnt + 1, 
CommitLog.instance.metrics.oversizedMutations.getCount());
+            assertEquals(cnt + 1, 
CommitLog.instance.metrics.oversizedMutations.getCount());
             throw e;
         }
         throw new AssertionError("mutation larger than limit was accepted");
@@ -655,20 +675,18 @@ public abstract class CommitLogTest
             caught = t;
         }
         if (expected != null && caught == null)
-            Assert.fail("Expected exception " + expected + " but call 
completed successfully.");
+            fail("Expected exception " + expected + " but call completed 
successfully.");
 
         assertEquals("JVM kill state doesn't match expectation.", expected != 
null, testKiller.wasKilled());
     }
 
     protected void testRecovery(final byte[] logData, Class<?> expected) 
throws Exception
     {
-        ParameterizedClass commitLogCompression = 
DatabaseDescriptor.getCommitLogCompression();
-        EncryptionContext encryptionContext = 
DatabaseDescriptor.getEncryptionContext();
         runExpecting(() -> testRecovery(logData, 
CommitLogDescriptor.current_version), expected);
     }
 
     @Test
-    public void testTruncateWithoutSnapshot() throws ExecutionException, 
InterruptedException, IOException
+    public void testTruncateWithoutSnapshot()
     {
         boolean originalState = DatabaseDescriptor.isAutoSnapshot();
         try
@@ -765,6 +783,74 @@ public abstract class CommitLogTest
     }
 
     @Test
+    public void replayWithBadSyncMarkerCRC() throws IOException
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        Mutation rm2 = new RowUpdateBuilder(cfs.metadata(), 0, 
"k2").clustering("bytes")
+                                                                    
.add("val", bytes("this is a string"))
+                                                                    .build();
+        CommitLog.instance.add(rm2);
+        CommitLog.instance.sync(true);
+
+        List<String> activeSegments = 
CommitLog.instance.getActiveSegmentNames();
+        assertFalse(activeSegments.isEmpty());
+
+        File directory = new 
File(CommitLog.instance.segmentManager.storageDirectory);
+        File firstActiveFile = 
Objects.requireNonNull(directory.listFiles((file, name) -> 
activeSegments.contains(name)))[0];
+        zeroFirstSyncMarkerCRC(firstActiveFile);
+
+        CommitLogSegmentReader.setAllowSkipSyncMarkerCrc(true);
+
+        if (DatabaseDescriptor.getCommitLogCompression() != null || 
DatabaseDescriptor.getEncryptionContext().isEnabled())
+        {
+            // If compression or encryption are enabled, expect an error, and 
do not attempt to replay using only mutation CRCs.
+            runExpecting(() ->
+                         {
+                             CommitLog.instance.recoverFiles(firstActiveFile);
+                             return null;
+                         },
+                         CommitLogReplayException.class);
+        }
+        else
+        {
+            SimpleCountingReplayer replayer = new 
SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, 
cfs.metadata());
+            replayer.replayPath(firstActiveFile, false);
+            assertEquals(1, replayer.cells);
+        }
+    }
+
+    private void zeroFirstSyncMarkerCRC(File file) throws IOException
+    {
+        // Get the position of the first sync marker...
+        int firstSyncMarkerPosition = -1;
+
+        try (RandomAccessReader reader = RandomAccessReader.open(file))
+        {
+            CommitLogDescriptor.readHeader(reader, 
DatabaseDescriptor.getEncryptionContext());
+            firstSyncMarkerPosition = (int) reader.getFilePointer();
+        }
+
+        // ...buffer the file into memory...
+        ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
+
+        try (RandomAccessReader reader = RandomAccessReader.open(file))
+        {
+            reader.readFully(buffer);
+        }
+
+        // ...overwrite the sync marker CRC with zero...
+        buffer.position(firstSyncMarkerPosition + 4);
+        buffer.putInt(0);
+
+        // ...and write the file back out.
+        try (OutputStream out = new FileOutputStream(file))
+        {
+            out.write(buffer.array());
+        }
+    }
+
+    @Test
     public void replayWithDiscard() throws IOException
     {
         int cellCount = 0;
@@ -886,7 +972,7 @@ public abstract class CommitLogTest
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + 
STANDARD1);
         // Currently we don't attempt to re-flush a memtable that failed, thus 
make sure data is replayed by commitlog.
         // If retries work subsequent flushes should clear up error and this 
should change to expect 0.
-        Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+        assertEquals(1, CommitLog.instance.resetUnsafe(false));
         System.clearProperty("cassandra.replayList");
     }
 
@@ -921,7 +1007,7 @@ public abstract class CommitLogTest
         // In the absence of error, this should be 0 because 
forceBlockingFlush/forceRecycleAllSegments would have
         // persisted all data in the commit log. Because we know there was an 
error, there must be something left to
         // replay.
-        Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+        assertEquals(1, CommitLog.instance.resetUnsafe(false));
         System.clearProperty("cassandra.replayList");
     }
 
@@ -1000,7 +1086,7 @@ public abstract class CommitLogTest
             
System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
         }
 
-        Assert.assertEquals(replayed, 1);
+        assertEquals(replayed, 1);
     }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to