Repository: cassandra Updated Branches: refs/heads/trunk 7226ac9e6 -> 7374e9b5a
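The diffs below add test coverage and supporting utilities for encrypted commit logs: a ByteBufferUtil.ensureCapacity helper for reusing read buffers, descriptor headers that carry a cipher IV, encrypted-segment replay tests, and a 3.4 encrypted upgrade fixture. As a reading aid, here is a minimal sketch of the encrypt/decrypt round trip the new tests exercise. It uses only APIs visible in this patch (EncryptionUtils, CipherFactory, RandomAccessReader); EncryptionContextGenerator is a test-only helper, the class name and payload are invented for illustration, and the third boolean argument is read as the same allowBufferResize flag documented on ensureCapacity — treat this as a sketch, not the patch's own code.

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import javax.crypto.Cipher;

import org.apache.cassandra.config.TransparentDataEncryptionOptions;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.security.CipherFactory;
import org.apache.cassandra.security.EncryptionContextGenerator;
import org.apache.cassandra.security.EncryptionUtils;

public class EncryptionRoundTripSketch
{
    public static void main(String[] args) throws Exception
    {
        // test-only options; a real node derives these from cassandra.yaml
        TransparentDataEncryptionOptions tdeOptions = EncryptionContextGenerator.createEncryptionOptions();
        CipherFactory cipherFactory = new CipherFactory(tdeOptions);
        Cipher encryptor = cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);

        // write an encrypted block to a scratch file, as EncryptionUtilsTest.encrypt() does
        File f = File.createTempFile("cl-enc-sketch-", ".tmp");
        f.deleteOnExit();
        try (FileChannel channel = new RandomAccessFile(f, "rw").getChannel())
        {
            EncryptionUtils.encryptAndWrite(ByteBuffer.wrap("some payload".getBytes()), channel, true, encryptor);
        }

        // the IV travels with the data (in the commit log it rides in the descriptor header),
        // so the read path rebuilds the decryptor from it
        Cipher decryptor = cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, encryptor.getIV());
        try (RandomAccessReader reader = RandomAccessReader.open(f))
        {
            ByteBuffer decrypted = EncryptionUtils.decrypt(reader, ByteBuffer.allocate(0), true, decryptor);
            System.out.println(decrypted.remaining() + " plaintext bytes recovered");
        }
    }
}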
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 6bcec96..4712dff 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -35,8 +35,8 @@ import java.util.UUID; import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; /** @@ -626,4 +626,47 @@ public class ByteBufferUtil return readBytes(bb, length); } + /** + * Ensure {@code buf} is large enough for {@code outputLength}. If not, it is cleaned up and a new buffer is allocated; + * otherwise, the buffer has its position/limit set appropriately. + * + * @param buf buffer to test the size of; may be null, in which case a new buffer is allocated. + * @param outputLength the minimum target size of the buffer + * @param allowBufferResize true if resizing (reallocating) the buffer is allowed + * @return {@code buf} if it was large enough, else a newly allocated buffer. + */ + public static ByteBuffer ensureCapacity(ByteBuffer buf, int outputLength, boolean allowBufferResize) + { + BufferType bufferType = buf != null ? BufferType.typeOf(buf) : BufferType.ON_HEAP; + return ensureCapacity(buf, outputLength, allowBufferResize, bufferType); + } + + /** + * Ensure {@code buf} is large enough for {@code outputLength}. If not, it is cleaned up and a new buffer is allocated; + * otherwise, the buffer has its position/limit set appropriately. + * + * @param buf buffer to test the size of; may be null, in which case a new buffer is allocated. + * @param outputLength the minimum target size of the buffer + * @param allowBufferResize true if resizing (reallocating) the buffer is allowed + * @param bufferType on- or off-heap byte buffer + * @return {@code buf} if it was large enough, else a newly allocated buffer. 
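+ * <p> + * For example, a caller reusing a scratch buffer across reads might do (illustrative only; names are hypothetical): + * {@code buffer = ByteBufferUtil.ensureCapacity(buffer, uncompressedLength, true, BufferType.OFF_HEAP);}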
+ */ + public static ByteBuffer ensureCapacity(ByteBuffer buf, int outputLength, boolean allowBufferResize, BufferType bufferType) + { + if (outputLength < 0) + throw new IllegalArgumentException("invalid size for output buffer: " + outputLength); + if (buf == null || buf.capacity() < outputLength) + { + if (!allowBufferResize) + throw new IllegalStateException(String.format("output buffer is not large enough for data: current capacity %d, required %d", buf == null ? 0 : buf.capacity(), outputLength)); + FileUtils.clean(buf); + buf = bufferType.allocate(outputLength); + } + else + { + buf.position(0).limit(outputLength); + } + return buf; + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log b/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log new file mode 100644 index 0000000..3be1fcf Binary files /dev/null and b/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/data/legacy-commitlog/3.4-encrypted/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/3.4-encrypted/hash.txt b/test/data/legacy-commitlog/3.4-encrypted/hash.txt new file mode 100644 index 0000000..d4cca55 --- /dev/null +++ b/test/data/legacy-commitlog/3.4-encrypted/hash.txt @@ -0,0 +1,5 @@ +#CommitLog upgrade test, version 3.4-SNAPSHOT +#Fri Jan 15 20:35:53 PST 2016 +cells=8777 +hash=-542543236 +cfid=9debf690-bc0a-11e5-9ac3-9fafc76bc377 http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index be3abb4..e6f9499 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -37,10 +37,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import junit.framework.Assert; - import com.google.common.util.concurrent.RateLimiter; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -59,6 +58,9 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; + public class CommitLogStressTest { @@ -186,8 +188,8 @@ public class CommitLogStressTest @Test public void testDiscardedRun() throws Exception { - discardedRun = true; randomSize = true; + discardedRun = true; testAllLogConfigs(); } @@ -198,34 +200,43 @@ DatabaseDescriptor.setCommitLogSyncBatchWindow(1); DatabaseDescriptor.setCommitLogSyncPeriod(30); DatabaseDescriptor.setCommitLogSegmentSize(32); - for (ParameterizedClass compressor : new ParameterizedClass[] { - null, - new ParameterizedClass("LZ4Compressor", null), - new 
ParameterizedClass("SnappyCompressor", null), - new ParameterizedClass("DeflateCompressor", null) }) + + // test plain vanilla commit logs (the choice of 98% of users) + testLog(null, EncryptionContextGenerator.createDisabledContext()); + + // test the compression types + testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext()); + testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext()); + testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext()); + + // test the encrypted commit log + testLog(null, EncryptionContextGenerator.createContext(true)); + } + + public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException + { + DatabaseDescriptor.setCommitLogCompression(compression); + DatabaseDescriptor.setEncryptionContext(encryptionContext); + for (CommitLogSync sync : CommitLogSync.values()) { - DatabaseDescriptor.setCommitLogCompression(compressor); - for (CommitLogSync sync : CommitLogSync.values()) - { - DatabaseDescriptor.setCommitLogSync(sync); - CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); - testLog(commitLog); - } + DatabaseDescriptor.setCommitLogSync(sync); + CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); + testLog(commitLog); + assert !failed; } - assert !failed; } - public void testLog(CommitLog commitLog) throws IOException, InterruptedException - { - System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n", - mb(DatabaseDescriptor.getCommitLogSegmentSize()), - commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", - commitLog.executor.getClass().getSimpleName(), - randomSize ? " random size" : "", - discardedRun ? " with discarded run" : ""); + public void testLog(CommitLog commitLog) throws IOException, InterruptedException { + System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n", + mb(DatabaseDescriptor.getCommitLogSegmentSize()), + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.encryptionContext.isEnabled(), + commitLog.executor.getClass().getSimpleName(), + randomSize ? " random size" : "", + discardedRun ? " with discarded run" : ""); commitLog.allocator.enableReserveSegmentCreation(); - - final List<CommitlogExecutor> threads = new ArrayList<>(); + + final List<CommitlogThread> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); discardedPos = ReplayPosition.NONE; @@ -237,7 +248,7 @@ public class CommitLogStressTest scheduled.shutdown(); scheduled.awaitTermination(2, TimeUnit.SECONDS); - for (CommitlogExecutor t : threads) + for (CommitlogThread t: threads) { t.join(); if (t.rp.compareTo(discardedPos) > 0) @@ -248,6 +259,7 @@ public class CommitLogStressTest commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos); threads.clear(); + System.out.format("Discarded at %s\n", discardedPos); verifySizes(commitLog); @@ -261,7 +273,7 @@ public class CommitLogStressTest int hash = 0; int cells = 0; - for (CommitlogExecutor t : threads) + for (CommitlogThread t: threads) { t.join(); hash += t.hash; @@ -271,7 +283,7 @@ public class CommitLogStressTest commitLog.shutdownBlocking(); - System.out.print("Stopped. Replaying... 
"); + System.out.println("Stopped. Replaying... "); System.out.flush(); Replayer repl = new Replayer(commitLog); File[] files = new File(location).listFiles(); @@ -282,14 +294,17 @@ public class CommitLogStressTest Assert.fail("Failed to delete " + f); if (hash == repl.hash && cells == repl.cells) - System.out.println("Test success."); + System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n", + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.encryptionContext.isEnabled(), + repl.discarded, repl.skipped); else { - System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", - repl.cells, - cells, - repl.hash, - hash); + System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n", + commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.encryptionContext.isEnabled(), + repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped, + repl.hash, hash); failed = true; } } @@ -326,12 +341,11 @@ public class CommitLogStressTest Assert.assertTrue(ratios.isEmpty()); } - public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads) + public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogThread> threads) { stop = false; - for (int ii = 0; ii < NUM_THREADS; ii++) - { - final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii)); + for (int ii = 0; ii < NUM_THREADS; ii++) { + final CommitlogThread t = new CommitlogThread(commitLog, new Random(ii)); threads.add(t); t.start(); } @@ -349,10 +363,10 @@ public class CommitLogStressTest long freeMemory = runtime.freeMemory(); long temp = 0; long sz = 0; - for (CommitlogExecutor cle : threads) + for (CommitlogThread clt : threads) { - temp += cle.counter.get(); - sz += cle.dataSize; + temp += clt.counter.get(); + sz += clt.dataSize; } double time = (System.currentTimeMillis() - start) / 1000.0; double avg = (temp / time); @@ -397,8 +411,7 @@ public class CommitLogStressTest return slice; } - public class CommitlogExecutor extends Thread - { + public class CommitlogThread extends Thread { final AtomicLong counter = new AtomicLong(); int hash = 0; int cells = 0; @@ -408,7 +421,7 @@ public class CommitLogStressTest volatile ReplayPosition rp; - public CommitlogExecutor(CommitLog commitLog, Random rand) + public CommitlogThread(CommitLog commitLog, Random rand) { this.commitLog = commitLog; this.random = rand; @@ -448,8 +461,10 @@ public class CommitLogStressTest super(log, discardedPos, null, ReplayFilter.create()); } - int hash = 0; - int cells = 0; + int hash; + int cells; + int discarded; + int skipped; @Override void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) @@ -457,11 +472,15 @@ public class CommitLogStressTest if (desc.id < discardedPos.segment) { System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation); + discarded++; return; } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) + { // Skip over this mutation. 
+ skipped++; return; + } DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size); Mutation mutation; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java new file mode 100644 index 0000000..ab9cb6f --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.config.TransparentDataEncryptionOptions; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileSegmentInputStream; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; + +public class CommitLogDescriptorTest +{ + private static final byte[] iv = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; + + ParameterizedClass compression; + TransparentDataEncryptionOptions enabledTdeOptions; + + // Context with enabledTdeOptions enabled + EncryptionContext enabledEncryption; + + // Context with enabledTdeOptions disabled, with the assumption that enabledTdeOptions was never previously enabled + EncryptionContext neverEnabledEncryption; + + // Context with enabledTdeOptions disabled, with the assumption that enabledTdeOptions was previously enabled, but now disabled + // due to operator changing the yaml. 
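+ // (segments already written with an encryption header must stay readable; see + // writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml below)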
+ EncryptionContext previouslyEnabledEncryption; + + @Before + public void setup() + { + Map<String,String> params = new HashMap<>(); + compression = new ParameterizedClass(LZ4Compressor.class.getName(), params); + + enabledTdeOptions = EncryptionContextGenerator.createEncryptionOptions(); + enabledEncryption = new EncryptionContext(enabledTdeOptions, iv, false); + + neverEnabledEncryption = EncryptionContextGenerator.createDisabledContext(); + TransparentDataEncryptionOptions disabledTdeOptions = new TransparentDataEncryptionOptions(false, enabledTdeOptions.cipher, enabledTdeOptions.key_alias, enabledTdeOptions.key_provider); + previouslyEnabledEncryption = new EncryptionContext(disabledTdeOptions); + } + + @Test + public void testVersions() + { + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); + + Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + + Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null, neverEnabledEncryption).getMessagingVersion()); + String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; + Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + } + + // migrated from CommitLogTest + private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + { + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + long length = buf.position(); + // Put some extra data in the stream. 
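+ // so we can verify below, via input.getFilePointer(), that readHeader() stops exactly at the end of the header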
+ buf.putDouble(0.1); + buf.flip(); + FileDataInput input = new FileSegmentInputStream(buf, "input", 0); + CommitLogDescriptor read = CommitLogDescriptor.readHeader(input, neverEnabledEncryption); + Assert.assertEquals("Descriptor length", length, input.getFilePointer()); + Assert.assertEquals("Descriptors", desc, read); + } + + // migrated from CommitLogTest + @Test + public void testDescriptorPersistence() throws IOException + { + testDescriptorPersistence(new CommitLogDescriptor(11, null, neverEnabledEncryption)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null, neverEnabledEncryption)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null, neverEnabledEncryption)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null), neverEnabledEncryption)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, + new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null") + ), neverEnabledEncryption)); + } + + // migrated from CommitLogTest + @Test + public void testDescriptorInvalidParametersSize() throws IOException + { + Map<String, String> params = new HashMap<>(); + for (int i=0; i<65535; ++i) + params.put("key"+i, Integer.toString(i, 16)); + try { + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, + 21, + new ParameterizedClass("LZ4Compressor", params), + neverEnabledEncryption); + ByteBuffer buf = ByteBuffer.allocate(1024000); + CommitLogDescriptor.writeHeader(buf, desc); + Assert.fail("Parameter object too long should fail on writing descriptor."); + } catch (ConfigurationException e) + { + // correct path + } + } + + @Test + public void constructParametersString_NoCompressionOrEncryption() + { + String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap()); + Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + + json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap()); + Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + } + + @Test + public void constructParametersString_WithCompressionAndEncryption() + { + String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap()); + Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY)); + Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER)); + } + + @Test + public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertFalse(result.getEncryptionContext().isEnabled()); + } + + @Test + public void writeAndReadHeader_OnlyCompression() throws IOException + { + 
CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertEquals(compression, result.compression); + Assert.assertFalse(result.getEncryptionContext().isEnabled()); + } + + @Test + public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + /** + * Check that even though enabledTdeOptions is disabled in the yaml, we can still read the commit log header as encrypted. + */ + @Test + public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption); + Assert.assertNotNull(result); + Assert.assertNull(result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + /** + * Shouldn't happen in the real world (should only have either compression or enabledTdeOptions), but the header + * functionality should be correct + */ + @Test + public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException + { + CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + ByteBuffer buffer = ByteBuffer.allocate(16 * 1024); + CommitLogDescriptor.writeHeader(buffer, descriptor); + buffer.flip(); + FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0); + CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption); + Assert.assertNotNull(result); + Assert.assertEquals(compression, result.compression); + Assert.assertTrue(result.getEncryptionContext().isEnabled()); + Assert.assertEquals(enabledEncryption, result.getEncryptionContext()); + Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV()); + } + + @Test + public void equals_NoCompressionOrEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null); + 
Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + @Test + public void equals_OnlyCompression() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + @Test + public void equals_OnlyEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption); + Assert.assertEquals(desc1, desc2); + + desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc1); + desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption); + Assert.assertEquals(desc1, desc2); + } + + /** + * Shouldn't have both enabled in real life, but ensure they are correct, nonetheless + */ + @Test + public void equals_BothCompressionAndEncryption() + { + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + Assert.assertEquals(desc1, desc1); + + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); + Assert.assertEquals(desc1, desc2); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 555cdda..91a25e1 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java 
@@ -18,26 +18,26 @@ */ package org.apache.cassandra.db.commitlog; -import static junit.framework.Assert.assertTrue; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; -import static org.junit.Assert.assertEquals; - import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; import java.util.zip.Checksum; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -46,28 +46,41 @@ import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; -import org.junit.*; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.config.Config.CommitFailurePolicy; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogDescriptor; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.vint.VIntCoding; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class CommitLogTest { private static final String KEYSPACE1 = "CommitLogTest"; @@ -75,6 +88,8 @@ public class CommitLogTest private static final String STANDARD1 = "Standard1"; private static final String STANDARD2 = "Standard2"; + String logDirectory; + @BeforeClass public static void defineSchema() throws ConfigurationException { @@ -90,11 +105,18 @@ public class 
CommitLogTest CompactionManager.instance.disableAutoCompaction(); } + @Before + public void setup() + { + logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit"; + new File(logDirectory).mkdirs(); + } + @Test public void testRecoveryWithEmptyLog() throws Exception { runExpecting(() -> { - CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) }); + CommitLog.instance.recover(tmpFile(CommitLogDescriptor.current_version)); return null; }, CommitLogReplayException.class); } @@ -102,7 +124,7 @@ public class CommitLogTest @Test public void testRecoveryWithEmptyLog20() throws Exception { - CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) }); + CommitLog.instance.recover(tmpFile(CommitLogDescriptor.VERSION_20)); } @Test @@ -128,14 +150,6 @@ public class CommitLogTest } @Test - public void testRecoveryWithShortCheckSum() throws Exception - { - byte[] data = new byte[8]; - data[3] = 10; // make sure this is not a legacy end marker. - testRecovery(data, CommitLogReplayException.class); - } - - @Test public void testRecoveryWithShortMutationSize() throws Exception { testRecoveryWithBadSizeArgument(9, 10); @@ -193,7 +207,7 @@ public class CommitLogTest // Roughly 32 MB mutation Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") .clustering("bytes") - .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4)) + .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) .build(); // Adding it 5 times @@ -210,13 +224,13 @@ public class CommitLogTest .build(); CommitLog.instance.add(m2); - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + assertEquals(2, CommitLog.instance.activeSegments()); UUID cfid2 = m2.getColumnFamilyIds().iterator().next(); CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + // Assert we still have both our segments + assertEquals(2, CommitLog.instance.activeSegments()); } @Test @@ -237,14 +251,14 @@ public class CommitLogTest CommitLog.instance.add(rm); CommitLog.instance.add(rm); - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.activeSegments()); // "Flush": this won't delete anything UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); CommitLog.instance.sync(true); CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.activeSegments()); // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") @@ -256,8 +270,7 @@ public class CommitLogTest CommitLog.instance.add(rm2); CommitLog.instance.add(rm2); - assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments(); - + assertEquals(3, CommitLog.instance.activeSegments()); // "Flush" second cf: The first segment should be deleted since we // didn't write anything on cf1 since last flush (and we flush cf2) @@ -266,7 +279,7 @@ public class CommitLogTest 
CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + assertEquals(1, CommitLog.instance.activeSegments()); } private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName) @@ -311,24 +324,17 @@ public class CommitLogTest CommitLog.instance.add(rm); } - @Test + @Test(expected = IllegalArgumentException.class) public void testExceedRecordLimit() throws Exception { CommitLog.instance.resetUnsafe(true); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - try - { - Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) - .build(); - CommitLog.instance.add(rm); - throw new AssertionError("mutation larger than limit was accepted"); - } - catch (IllegalArgumentException e) - { - // IAE is thrown on too-large mutations - } + Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) + .build(); + CommitLog.instance.add(rm); + throw new AssertionError("mutation larger than limit was accepted"); } protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception @@ -349,6 +355,45 @@ public class CommitLogTest testRecovery(out.toByteArray(), CommitLogReplayException.class); } + /** + * Create a temporary commit log file with an appropriate descriptor at the head. + * + * @return the commit log file reference and the first position after the descriptor in the file + * (so that subsequent writes happen at the correct file location). + */ + protected Pair<File, Integer> tmpFile() throws IOException + { + EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext(); + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version, + CommitLogSegment.getNextId(), + DatabaseDescriptor.getCommitLogCompression(), + encryptionContext); + + // if we're testing encryption, we need to write out a cipher IV to the descriptor headers + Map<String, String> additionalHeaders = new HashMap<>(); + if (encryptionContext.isEnabled()) + { + byte[] buf = new byte[16]; + new Random().nextBytes(buf); + additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf)); + } + + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders); + buf.flip(); + int positionAfterHeader = buf.limit() + 1; + + File logFile = new File(logDirectory, desc.fileName()); + logFile.deleteOnExit(); + + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.limit()); + } + + return Pair.create(logFile, positionAfterHeader); + } + protected File tmpFile(int version) throws IOException { File logFile = File.createTempFile("CommitLog-" + version + "-", ".log"); @@ -374,7 +419,7 @@ public class CommitLogTest File logFile = tmpFile(desc.version); CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName()); // Change id to match file. 
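+ // (recovery cross-checks the id parsed from the file name against the id in the header; the mismatch case is covered by testRecoveryWithIdMismatch below)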
- desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression); + desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext()); ByteBuffer buf = ByteBuffer.allocate(1024); CommitLogDescriptor.writeHeader(buf, desc); try (OutputStream lout = new FileOutputStream(logFile)) @@ -390,7 +435,7 @@ @Test public void testRecoveryWithIdMismatch() throws Exception { - CommitLogDescriptor desc = new CommitLogDescriptor(4, null); + CommitLogDescriptor desc = new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()); File logFile = tmpFile(desc.version); ByteBuffer buf = ByteBuffer.allocate(1024); CommitLogDescriptor.writeHeader(buf, desc); @@ -408,7 +453,7 @@ @Test public void testRecoveryWithBadCompressor() throws Exception { - CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null)); + CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null), EncryptionContextGenerator.createDisabledContext()); runExpecting(() -> { testRecovery(desc, new byte[0]); return null; @@ -444,23 +489,20 @@ protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception { runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected); - runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected); + runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected); } - @Test - public void testVersions() + protected void testRecovery(byte[] logData) throws Exception { - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); - - assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + Pair<File, Integer> pair = tmpFile(); + try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw")) + { + raf.seek(pair.right); + raf.write(logData); + raf.close(); - assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); - String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; - assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure + } } @Test @@ -513,12 +555,12 @@ ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); new RowUpdateBuilder(cfs.metadata, 0, "key1") - .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd")) - .build() - .applyUnsafe(); + .clustering("bytes").add("val", bytes("abcd")) + .build() + .applyUnsafe(); assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build()) - .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd"))); + .cells().iterator().next().value().equals(bytes("abcd"))); cfs.truncateBlocking(); @@ -530,46 +572,154 @@ public class 
CommitLogTest } } - private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + @Test + public void replay_StandardMmapped() throws IOException { - ByteBuffer buf = ByteBuffer.allocate(1024); - CommitLogDescriptor.writeHeader(buf, desc); - // Put some extra data in the stream. - buf.putDouble(0.1); - buf.flip(); + DatabaseDescriptor.setCommitLogCompression(null); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); + CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); + replaySimple(commitLog); + replayWithDiscard(commitLog); + } - DataInputBuffer input = new DataInputBuffer(buf, false); - CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); - Assert.assertEquals("Descriptors", desc, read); + @Test + public void replay_Compressed_LZ4() throws IOException + { + replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap())); } @Test - public void testDescriptorPersistence() throws IOException + public void replay_Compressed_Snappy() throws IOException { - testDescriptorPersistence(new CommitLogDescriptor(11, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null))); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19, - new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); + replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap())); } @Test - public void testDescriptorInvalidParametersSize() throws IOException + public void replay_Compressed_Deflate() throws IOException { - Map<String, String> params = new HashMap<>(); - for (int i=0; i<65535; ++i) - params.put("key"+i, Integer.toString(i, 16)); - try { - CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, - 21, - new ParameterizedClass("LZ4Compressor", params)); - ByteBuffer buf = ByteBuffer.allocate(1024000); - CommitLogDescriptor.writeHeader(buf, desc); - Assert.fail("Parameter object too long should fail on writing descriptor."); - } catch (ConfigurationException e) + replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap())); + } + + private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException + { + DatabaseDescriptor.setCommitLogCompression(parameterizedClass); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); + CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); + replaySimple(commitLog); + replayWithDiscard(commitLog); + } + + @Test + public void replay_Encrypted() throws IOException + { + DatabaseDescriptor.setCommitLogCompression(null); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true)); + CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); + + try + { + replaySimple(commitLog); + replayWithDiscard(commitLog); + } + finally + { + for (String file : commitLog.getActiveSegmentNames()) + FileUtils.delete(new File(commitLog.location, file)); + } + } + + private void 
replaySimple(CommitLog commitLog) throws IOException + { + int cellCount = 0; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + commitLog.add(rm1); + + final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2") + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + cellCount += 1; + commitLog.add(rm2); + + commitLog.sync(true); + + Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE); + List<String> activeSegments = commitLog.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name)); + replayer.recover(files); + + assertEquals(cellCount, replayer.cells); + } + + private void replayWithDiscard(CommitLog commitLog) throws IOException + { + int cellCount = 0; + int max = 1024; + int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay + ReplayPosition replayPosition = null; + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + for (int i = 0; i < max; i++) + { + final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1) + .clustering("bytes") + .add("val", bytes("this is a string")) + .build(); + ReplayPosition position = commitLog.add(rm1); + + if (i == discardPosition) + replayPosition = position; + if (i > discardPosition) + { + cellCount += 1; + } + } + + commitLog.sync(true); + + Replayer replayer = new Replayer(commitLog, replayPosition); + List<String> activeSegments = commitLog.getActiveSegmentNames(); + Assert.assertFalse(activeSegments.isEmpty()); + + File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name)); + replayer.recover(files); + + assertEquals(cellCount, replayer.cells); + } + + class Replayer extends CommitLogReplayer + { + private final ReplayPosition filterPosition; + int cells; + int skipped; + + Replayer(CommitLog commitLog, ReplayPosition filterPosition) + { + super(commitLog, filterPosition, null, ReplayFilter.create()); + this.filterPosition = filterPosition; + } + + void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException { - // correct path + if (entryLocation <= filterPosition.position) + { + // Skip over this mutation. 
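+ // entries at or before the replay position were already flushed, so they must not be counted again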
+ skipped++; + return; + } + + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); + Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) + for (Row row : partitionUpdate) + cells += Iterables.size(row.cells()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java index 8d412a1..a49c4cf 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java @@ -37,6 +37,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.rows.Cell; @@ -45,10 +46,15 @@ import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.security.EncryptionContextGenerator; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; +/** + * Note: if you are looking to create new test cases for this test, check out + * {@link CommitLogUpgradeTestMaker} + */ public class CommitLogUpgradeTest { static final String DATA_DIR = "test/data/legacy-commitlog/"; @@ -157,13 +163,20 @@ public class CommitLogUpgradeTest } } + @Test + public void test34_encrypted() throws Exception + { + testRestore(DATA_DIR + "3.4-encrypted"); + } + @BeforeClass - static public void initialize() throws FileNotFoundException, IOException, InterruptedException + public static void initialize() { SchemaLoader.loadSchema(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), metadata); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true)); } public void testRestore(String location) throws IOException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java index 80683c2..69764e6 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java @@ -100,11 +100,12 @@ public class CommitLogUpgradeTestMaker public void makeLog() throws IOException, InterruptedException { CommitLog commitLog = CommitLog.instance; - System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n", + System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n", 
mb(DatabaseDescriptor.getCommitLogSegmentSize()), commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none", + commitLog.encryptionContext.isEnabled() ? "enabled" : "none", commitLog.executor.getClass().getSimpleName(), - randomSize ? " random size" : ""); + randomSize ? "random size" : ""); final List<CommitlogExecutor> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); @@ -233,7 +234,6 @@ public class CommitLogUpgradeTestMaker { if (rl != null) rl.acquire(); - String ks = KEYSPACE; ByteBuffer key = randomBytes(16, tlr); UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData(KEYSPACE, TABLE), Util.dk(key)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java new file mode 100644 index 0000000..04e471d --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.DataInput; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Random; +import javax.crypto.Cipher; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.commitlog.SegmentReader.CompressedSegmenter; +import org.apache.cassandra.db.commitlog.SegmentReader.EncryptedSegmenter; +import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.security.CipherFactory; +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class SegmentReaderTest +{ + static final Random random = new Random(); + + @Test + public void compressedSegmenter_LZ4() throws IOException + { + compressedSegmenter(LZ4Compressor.create(null)); + } + + @Test + public void compressedSegmenter_Snappy() throws IOException + { + compressedSegmenter(SnappyCompressor.create(null)); + } + + @Test + public void compressedSegmenter_Deflate() throws IOException + { + compressedSegmenter(DeflateCompressor.create(null)); + } + + private void compressedSegmenter(ICompressor compressor) throws IOException + { + int rawSize = (1 << 15) - 137; + ByteBuffer plainTextBuffer = compressor.preferredBufferType().allocate(rawSize); + byte[] b = new byte[rawSize]; + random.nextBytes(b); + plainTextBuffer.put(b); + plainTextBuffer.flip(); + + int uncompressedHeaderSize = 4; // need to add in the plain text size to the block we write out + int length = compressor.initialCompressedBufferLength(rawSize); + ByteBuffer compBuffer = ByteBufferUtil.ensureCapacity(null, length + uncompressedHeaderSize, true, compressor.preferredBufferType()); + compBuffer.putInt(rawSize); + compressor.compress(plainTextBuffer, compBuffer); + compBuffer.flip(); + + File compressedFile = File.createTempFile("compressed-segment-", ".log"); + compressedFile.deleteOnExit(); + FileOutputStream fos = new FileOutputStream(compressedFile); + fos.getChannel().write(compBuffer); + fos.close(); + + try (RandomAccessReader reader = RandomAccessReader.open(compressedFile)) + { + CompressedSegmenter segmenter = new CompressedSegmenter(compressor, reader); + int fileLength = (int) compressedFile.length(); + SyncSegment syncSegment = segmenter.nextSegment(0, fileLength); + FileDataInput fileDataInput = syncSegment.input; + ByteBuffer fileBuffer = readBytes(fileDataInput, rawSize); + + plainTextBuffer.flip(); + Assert.assertEquals(plainTextBuffer, fileBuffer); + + // CompressedSegmenter includes the Sync header length in the syncSegment.endPosition (value) + Assert.assertEquals(rawSize, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE); + } + } + + private ByteBuffer readBytes(DataInput input, int len) throws IOException + { + byte[] buf = new byte[len]; + input.readFully(buf); + return ByteBuffer.wrap(buf); + } + + @Test + public void encryptedSegmenter() throws IOException + { + 
+        EncryptionContext context = EncryptionContextGenerator.createContext(true);
+        CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions());
+
+        int plainTextLength = (1 << 13) - 137;
+        ByteBuffer plainTextBuffer = ByteBuffer.allocate(plainTextLength);
+        random.nextBytes(plainTextBuffer.array());
+
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(plainTextBuffer, null, true, context.getCompressor());
+        Cipher cipher = cipherFactory.getEncryptor(context.getTransparentDataEncryptionOptions().cipher, context.getTransparentDataEncryptionOptions().key_alias);
+        File encryptedFile = File.createTempFile("encrypted-segment-", ".log");
+        encryptedFile.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(encryptedFile, "rw").getChannel();
+        channel.write(ByteBufferUtil.bytes(plainTextLength));
+        EncryptionUtils.encryptAndWrite(compressedBuffer, channel, true, cipher);
+        channel.close();
+
+        try (RandomAccessReader reader = RandomAccessReader.open(encryptedFile))
+        {
+            context = EncryptionContextGenerator.createContext(cipher.getIV(), true);
+            EncryptedSegmenter segmenter = new EncryptedSegmenter(reader, context);
+            SyncSegment syncSegment = segmenter.nextSegment(0, (int) reader.length());
+
+            // EncryptedSegmenter includes the sync header length in the value of syncSegment.endPosition
+            Assert.assertEquals(plainTextLength, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE);
+            ByteBuffer fileBuffer = readBytes(syncSegment.input, plainTextLength);
+            plainTextBuffer.position(0);
+            Assert.assertEquals(plainTextBuffer, fileBuffer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java b/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
index 635889b..4719356 100644
--- a/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
+++ b/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
@@ -33,7 +33,12 @@ public class EncryptionContextGenerator
 
     public static EncryptionContext createContext(boolean init)
     {
-        return new EncryptionContext(createEncryptionOptions(), init);
+        return createContext(null, init);
+    }
+
+    public static EncryptionContext createContext(byte[] iv, boolean init)
+    {
+        return new EncryptionContext(createEncryptionOptions(), iv, init);
     }
 
     public static TransparentDataEncryptionOptions createEncryptionOptions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java b/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
new file mode 100644
index 0000000..be37f45
--- /dev/null
+++ b/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Random;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.TransparentDataEncryptionOptions;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class EncryptionUtilsTest
+{
+    final Random random = new Random();
+    ICompressor compressor;
+    TransparentDataEncryptionOptions tdeOptions;
+
+    @Before
+    public void setup()
+    {
+        compressor = LZ4Compressor.create(new HashMap<>());
+        tdeOptions = EncryptionContextGenerator.createEncryptionOptions();
+    }
+
+    @Test
+    public void compress() throws IOException
+    {
+        byte[] buf = new byte[(1 << 13) - 13];
+        random.nextBytes(buf);
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(ByteBuffer.wrap(buf), ByteBuffer.allocate(0), true, compressor);
+        ByteBuffer uncompressedBuffer = EncryptionUtils.uncompress(compressedBuffer, ByteBuffer.allocate(0), true, compressor);
+        Assert.assertArrayEquals(buf, uncompressedBuffer.array());
+    }
+
+    @Test
+    public void encrypt() throws BadPaddingException, ShortBufferException, IllegalBlockSizeException, IOException
+    {
+        byte[] buf = new byte[(1 << 12) - 7];
+        random.nextBytes(buf);
+
+        // encrypt
+        CipherFactory cipherFactory = new CipherFactory(tdeOptions);
+        Cipher encryptor = cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
+
+        File f = File.createTempFile("commitlog-enc-utils-", ".tmp");
+        f.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(f, "rw").getChannel();
+        EncryptionUtils.encryptAndWrite(ByteBuffer.wrap(buf), channel, true, encryptor);
+        channel.close();
+
+        // decrypt
+        Cipher decryptor = cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, encryptor.getIV());
+        ByteBuffer decryptedBuffer = EncryptionUtils.decrypt(RandomAccessReader.open(f), ByteBuffer.allocate(0), true, decryptor);
+
+        // normally we'd just call ByteBuffer.array(), but that returns the *entire* backing array, without the position/limit offsets applied;
+        // so, just for this test, we copy out the readable region and compare at the array level with those offsets applied
+        decryptedBuffer.limit(buf.length);
+        byte[] b = new byte[buf.length];
+        System.arraycopy(decryptedBuffer.array(), 0, b, 0, buf.length);
+        Assert.assertArrayEquals(buf, b);
+    }
+
+    @Test
+    public void fullRoundTrip() throws IOException, BadPaddingException, ShortBufferException, IllegalBlockSizeException
+    {
+        // compress
+        byte[] buf = new byte[(1 << 12) - 7];
+        random.nextBytes(buf);
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(ByteBuffer.wrap(buf), ByteBuffer.allocate(0), true, compressor);
+
+        // encrypt
+        CipherFactory cipherFactory = new CipherFactory(tdeOptions);
+        Cipher encryptor = cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
+        File f = File.createTempFile("commitlog-enc-utils-", ".tmp");
+        f.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(f, "rw").getChannel();
+        EncryptionUtils.encryptAndWrite(compressedBuffer, channel, true, encryptor);
+
+        // decrypt
+        Cipher decryptor = cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, encryptor.getIV());
+        ByteBuffer decryptedBuffer = EncryptionUtils.decrypt(RandomAccessReader.open(f), ByteBuffer.allocate(0), true, decryptor);
+
+        // uncompress
+        ByteBuffer uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, ByteBuffer.allocate(0), true, compressor);
+        Assert.assertArrayEquals(buf, uncompressedBuffer.array());
+    }
+}
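
A note on the array-copy dance in the decrypt assertions above: ByteBuffer.array() hands back the whole backing array and ignores position/limit, so byte-for-byte comparisons have to copy the readable window out first. A minimal, pure-JDK sketch of that idiom (the class and helper names here are illustrative only, not part of the patch):

    import java.nio.ByteBuffer;

    public class BufferCopyIdiom
    {
        // Copy only the bytes between position and limit. duplicate() leaves the
        // source buffer's position/limit untouched for any assertions that follow.
        static byte[] readableBytes(ByteBuffer buf)
        {
            byte[] out = new byte[buf.remaining()];
            buf.duplicate().get(out);
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.put(new byte[] { 1, 2, 3 }).flip();
            System.out.println(readableBytes(bb).length); // 3: just the readable window
            System.out.println(bb.array().length);        // 16: the entire backing array
        }
    }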
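The encryptedSegmenter test is also the motivation for the new EncryptionContextGenerator.createContext(byte[], boolean) overload: the IV generated by the writing cipher has to be carried over into the context used for reading. Condensed from the test above (same calls as the test, nothing new; the write itself is elided):

    // Writing: a fresh context yields a cipher with a newly generated IV.
    EncryptionContext writeContext = EncryptionContextGenerator.createContext(true);
    CipherFactory factory = new CipherFactory(writeContext.getTransparentDataEncryptionOptions());
    Cipher encryptor = factory.getEncryptor(writeContext.getTransparentDataEncryptionOptions().cipher,
                                            writeContext.getTransparentDataEncryptionOptions().key_alias);

    // ... encryptAndWrite(...) the segment data using encryptor ...

    // Reading: the writer's IV must be fed back in, hence the new overload.
    EncryptionContext readContext = EncryptionContextGenerator.createContext(encryptor.getIV(), true);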
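Both segmenter tests also hand-build the same block framing before handing the file to a reader: a 4-byte plain-text length, then the compressed (and, in the encrypted case, encrypted) payload. A pure-JDK sketch of that framing; the class and the frame helper are hypothetical and only restate what the tests write out by hand:

    import java.nio.ByteBuffer;

    public class BlockFraming
    {
        // [int plainTextLength][payload bytes] -- the length header lets a reader
        // size the buffer the payload is decompressed/decrypted into.
        static ByteBuffer frame(ByteBuffer payload, int plainTextLength)
        {
            ByteBuffer framed = ByteBuffer.allocate(4 + payload.remaining());
            framed.putInt(plainTextLength);
            framed.put(payload);
            framed.flip();
            return framed;
        }
    }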
