Repository: cassandra Updated Branches: refs/heads/trunk 7709c2a7f -> 90033b921
Encrypted hints patch by jasobrown; reviewed by Blake Eggleston for CASSANDRA-11040 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90033b92 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90033b92 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90033b92 Branch: refs/heads/trunk Commit: 90033b921be41cbebab99bdcd50a4753a07b2e1b Parents: 7709c2a Author: Jason Brown <[email protected]> Authored: Mon Jan 4 06:50:25 2016 -0800 Committer: Jason Brown <[email protected]> Committed: Wed Feb 10 14:21:01 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 14 +- .../hints/CompressedChecksummedDataInput.java | 46 ++----- .../CompressedChecksummedDataInputBuilder.java | 36 ++++++ .../cassandra/hints/CompressedHintsWriter.java | 8 ++ .../hints/EncryptedChecksummedDataInput.java | 129 +++++++++++++++++++ .../cassandra/hints/EncryptedHintsWriter.java | 65 ++++++++++ .../apache/cassandra/hints/HintsDescriptor.java | 113 +++++++++++++++- .../org/apache/cassandra/hints/HintsReader.java | 2 + .../org/apache/cassandra/hints/HintsWriter.java | 14 +- .../cassandra/security/EncryptionUtils.java | 36 ++++++ .../apache/cassandra/hints/AlteredHints.java | 129 +++++++++++++++++++ .../cassandra/hints/HintsCompressionTest.java | 119 +++-------------- .../cassandra/hints/HintsEncryptionTest.java | 81 ++++++++++++ 14 files changed, 640 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04ce8d7..551d147 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.4 + * Encrypted hints (CASSANDRA-11040) * SASI index options validation (CASSANDRA-11136) * Optimize disk seek using min/max column name meta data when the LIMIT clause is used (CASSANDRA-8180) http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index a9749f2..d6bcace 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -942,31 +942,29 @@ enable_scripted_user_defined_functions: false windows_timer_interval: 1 -# Enables encrypting data at-rest (on disk). Currently, AES/CBC/PKCS5Padding is the only supported -# encyption algorithm. Different key providers can be plugged in, but the default reads from +# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from # a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by # the "key_alias" is the only key that will be used for encrypt opertaions; previously used keys # can still (and should!) be in the keystore and will be used on decrypt operations # (to handle the case of key rotation). # -# In order to make use of transparent data encryption, you must download and install the -# Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files -# for your version of the JDK. +# It is strongly recommended to download and install Java Cryptography Extension (JCE) +# Unlimited Strength Jurisdiction Policy Files for your version of the JDK. # (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html) # # Currently, only the following file types are supported for transparent data encryption, although -# more are coming in future cassandra releases: commitlog +# more are coming in future cassandra releases: commitlog, hints transparent_data_encryption_options: enabled: false chunk_length_kb: 64 cipher: AES/CBC/PKCS5Padding key_alias: testing:1 - # CBC requires iv length to be 16 bytes + # CBC IV length for AES needs to be 16 bytes (which is also the default size) # iv_length: 16 key_provider: - class_name: org.apache.cassandra.security.JKSKeyProvider parameters: - - keystore: test/conf/cassandra.keystore + - keystore: conf/.keystore keystore_password: cassandra store_type: JCEKS key_password: cassandra http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java index 1009b57..bbf1fdb 100644 --- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -21,6 +21,8 @@ package org.apache.cassandra.hints; import java.io.IOException; import java.nio.ByteBuffer; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; @@ -33,7 +35,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput private volatile ByteBuffer compressedBuffer = null; private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE); - public CompressedChecksummedDataInput(Builder builder) + public CompressedChecksummedDataInput(CompressedChecksummedDataInputBuilder builder) { super(builder); assert regions == null; //mmapped regions are not supported @@ -113,46 +115,20 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput throw new UnsupportedOperationException(); } - public static final class Builder extends ChecksummedDataInput.Builder - { - private long position; - private ICompressor compressor; - - public Builder(ChannelProxy channel) - { - super(channel); - bufferType = null; - } - - public CompressedChecksummedDataInput build() - { - assert position >= 0; - assert compressor != null; - return new CompressedChecksummedDataInput(this); - } - - public Builder withCompressor(ICompressor compressor) - { - this.compressor = compressor; - bufferType = compressor.preferredBufferType(); - return this; - } - - public Builder withPosition(long position) - { - this.position = position; - return this; - } - } - - public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor) + public static ChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor) { long position = input.getPosition(); input.close(); - Builder builder = new Builder(new ChannelProxy(input.getPath())); + CompressedChecksummedDataInputBuilder builder = new CompressedChecksummedDataInputBuilder(new ChannelProxy(input.getPath())); builder.withPosition(position); builder.withCompressor(compressor); return builder.build(); } + + @VisibleForTesting + ICompressor getCompressor() + { + return compressor; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java new file mode 100644 index 0000000..3452df8 --- /dev/null +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java @@ -0,0 +1,36 @@ +package org.apache.cassandra.hints; + +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ChannelProxy; + +public class CompressedChecksummedDataInputBuilder extends ChecksummedDataInput.Builder +{ + long position; + ICompressor compressor; + + public CompressedChecksummedDataInputBuilder(ChannelProxy channel) + { + super(channel); + bufferType = null; + } + + public ChecksummedDataInput build() + { + assert position >= 0; + assert compressor != null; + return new CompressedChecksummedDataInput(this); + } + + public CompressedChecksummedDataInputBuilder withCompressor(ICompressor compressor) + { + this.compressor = compressor; + bufferType = compressor.preferredBufferType(); + return this; + } + + public CompressedChecksummedDataInputBuilder withPosition(long position) + { + this.position = position; + return this; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java index 491dceb..8792e32 100644 --- a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java +++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java @@ -24,6 +24,8 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.compress.ICompressor; public class CompressedHintsWriter extends HintsWriter @@ -64,4 +66,10 @@ public class CompressedHintsWriter extends HintsWriter compressionBuffer.limit(compressedSize + METADATA_SIZE); super.writeBuffer(compressionBuffer); } + + @VisibleForTesting + ICompressor getCompressor() + { + return compressor; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java new file mode 100644 index 0000000..12b6bf2 --- /dev/null +++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.IOException; +import java.nio.ByteBuffer; +import javax.crypto.Cipher; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ChannelProxy; + +public class EncryptedChecksummedDataInput extends ChecksummedDataInput +{ + private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>() + { + protected ByteBuffer initialValue() + { + return ByteBuffer.allocate(0); + } + }; + + private final Cipher cipher; + private final ICompressor compressor; + + private final EncryptionUtils.ChannelProxyReadChannel readChannel; + + protected EncryptedChecksummedDataInput(Builder builder) + { + super(builder); + cipher = builder.cipher; + compressor = builder.compressor; + readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, builder.position); + assert cipher != null; + assert compressor != null; + } + + /** + * Since an entire block of compressed data is read off of disk, not just a hint at a time, + * we don't report EOF until the decompressed data has also been read completely + */ + public boolean isEOF() + { + return readChannel.getCurrentPosition() == channel.size() && buffer.remaining() == 0; + } + + protected void reBufferStandard() + { + try + { + ByteBuffer byteBuffer = reusableBuffers.get(); + ByteBuffer decrypted = EncryptionUtils.decrypt(readChannel, byteBuffer, true, cipher); + buffer = EncryptionUtils.uncompress(decrypted, buffer, true, compressor); + + if (decrypted.capacity() > byteBuffer.capacity()) + reusableBuffers.set(decrypted); + } + catch (IOException ioe) + { + throw new FSReadError(ioe, getPath()); + } + } + + public static class Builder extends CompressedChecksummedDataInputBuilder + { + Cipher cipher; + + public Builder(ChannelProxy channel) + { + super(channel); + } + + public Builder withCipher(Cipher cipher) + { + this.cipher = cipher; + return this; + } + + public ChecksummedDataInput build() + { + assert position >= 0; + assert compressor != null; + assert cipher != null; + return new EncryptedChecksummedDataInput(this); + } + } + + public static ChecksummedDataInput upgradeInput(ChecksummedDataInput input, Cipher cipher, ICompressor compressor) + { + long position = input.getPosition(); + input.close(); + + Builder builder = new Builder(new ChannelProxy(input.getPath())); + builder.withPosition(position); + builder.withCompressor(compressor); + builder.withCipher(cipher); + return builder.build(); + } + + @VisibleForTesting + Cipher getCipher() + { + return cipher; + } + + @VisibleForTesting + ICompressor getCompressor() + { + return compressor; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java b/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java new file mode 100644 index 0000000..4786d9c --- /dev/null +++ b/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.zip.CRC32; +import javax.crypto.Cipher; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.io.compress.ICompressor; + +import static org.apache.cassandra.utils.FBUtilities.updateChecksum; + +public class EncryptedHintsWriter extends HintsWriter +{ + private final Cipher cipher; + private final ICompressor compressor; + private volatile ByteBuffer byteBuffer; + + protected EncryptedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC) + { + super(directory, descriptor, file, channel, fd, globalCRC); + cipher = descriptor.getCipher(); + compressor = descriptor.createCompressor(); + } + + protected void writeBuffer(ByteBuffer input) throws IOException + { + byteBuffer = EncryptionUtils.compress(input, byteBuffer, true, compressor); + ByteBuffer output = EncryptionUtils.encryptAndWrite(byteBuffer, channel, true, cipher); + updateChecksum(globalCRC, output); + } + + @VisibleForTesting + Cipher getCipher() + { + return cipher; + } + + @VisibleForTesting + ICompressor getCompressor() + { + return compressor; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index f5296b3..8a3ee8b 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -22,15 +22,20 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.regex.Pattern; import java.util.zip.CRC32; +import javax.crypto.Cipher; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.FSReadError; @@ -38,6 +43,8 @@ import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.utils.Hex; import org.json.simple.JSONValue; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -50,10 +57,13 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; */ final class HintsDescriptor { + private static final Logger logger = LoggerFactory.getLogger(HintsDescriptor.class); + static final int VERSION_30 = 1; static final int CURRENT_VERSION = VERSION_30; static final String COMPRESSION = "compression"; + static final String ENCRYPTION = "encryption"; static final Pattern pattern = Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$"); @@ -62,17 +72,35 @@ final class HintsDescriptor final int version; final long timestamp; - // implemented for future compression support - see CASSANDRA-9428 final ImmutableMap<String, Object> parameters; final ParameterizedClass compressionConfig; + private final Cipher cipher; + private final ICompressor compressor; + HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters) { this.hostId = hostId; this.version = version; this.timestamp = timestamp; - this.parameters = parameters; compressionConfig = createCompressionConfig(parameters); + + EncryptionData encryption = createEncryption(parameters); + if (encryption == null) + { + cipher = null; + compressor = null; + } + else + { + if (compressionConfig != null) + throw new IllegalStateException("a hints file cannot be configured for both compression and encryption"); + cipher = encryption.cipher; + compressor = encryption.compressor; + parameters = encryption.params; + } + + this.parameters = parameters; } HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters) @@ -100,6 +128,71 @@ final class HintsDescriptor } } + /** + * Create, if necessary, the required encryption components (for either decrpyt or encrypt operations). + * Note that in the case of encyption (this is, when writing out a new hints file), we need to write + * the cipher's IV out to the header so it can be used when decrypting. Thus, we need to add an additional + * entry to the {@code params} map. + * + * @param params the base parameters into the descriptor. + * @return null if not using encryption; else, the initialized {@link Cipher} and a possibly updated version + * of the {@code params} map. + */ + @SuppressWarnings("unchecked") + static EncryptionData createEncryption(ImmutableMap<String, Object> params) + { + if (params.containsKey(ENCRYPTION)) + { + Map<?, ?> encryptionConfig = (Map<?, ?>) params.get(ENCRYPTION); + EncryptionContext encryptionContext = EncryptionContext.createFromMap(encryptionConfig, DatabaseDescriptor.getEncryptionContext()); + + try + { + Cipher cipher; + if (encryptionConfig.containsKey(EncryptionContext.ENCRYPTION_IV)) + { + cipher = encryptionContext.getDecryptor(); + } + else + { + cipher = encryptionContext.getEncryptor(); + ImmutableMap<String, Object> encParams = ImmutableMap.<String, Object>builder() + .putAll(encryptionContext.toHeaderParameters()) + .put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV())) + .build(); + + Map<String, Object> map = new HashMap<>(params); + map.put(ENCRYPTION, encParams); + params = ImmutableMap.<String, Object>builder().putAll(map).build(); + } + return new EncryptionData(cipher, encryptionContext.getCompressor(), params); + } + catch (IOException ioe) + { + logger.warn("failed to create encyption context for hints file. ignoring encryption for hints.", ioe); + return null; + } + } + else + { + return null; + } + } + + private static final class EncryptionData + { + final Cipher cipher; + final ICompressor compressor; + final ImmutableMap<String, Object> params; + + private EncryptionData(Cipher cipher, ICompressor compressor, ImmutableMap<String, Object> params) + { + this.cipher = cipher; + this.compressor = compressor; + this.params = params; + } + } + String fileName() { return String.format("%s-%s-%s.hints", hostId, timestamp, version); @@ -148,9 +241,23 @@ final class HintsDescriptor return compressionConfig != null; } + public boolean isEncrypted() + { + return cipher != null; + } + public ICompressor createCompressor() { - return isCompressed() ? CompressionParams.createCompressor(compressionConfig) : null; + if (isCompressed()) + return CompressionParams.createCompressor(compressionConfig); + if (isEncrypted()) + return compressor; + return null; + } + + public Cipher getCipher() + { + return isEncrypted() ? cipher : null; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java index fe2b57a..0571af4 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -83,6 +83,8 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> // The compressed input is instantiated with the uncompressed input's position reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor()); } + else if (descriptor.isEncrypted()) + reader = EncryptedChecksummedDataInput.upgradeInput(reader, descriptor.getCipher(), descriptor.createCompressor()); return new HintsReader(descriptor, file, reader, rateLimiter); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index 8836258..b4da379 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -33,7 +33,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.Throwables; @@ -49,9 +48,9 @@ class HintsWriter implements AutoCloseable private final File directory; private final HintsDescriptor descriptor; private final File file; - private final FileChannel channel; + protected final FileChannel channel; private final int fd; - private final CRC32 globalCRC; + protected final CRC32 globalCRC; private volatile long lastSyncPosition = 0L; @@ -89,14 +88,11 @@ class HintsWriter implements AutoCloseable throw e; } + if (descriptor.isEncrypted()) + return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc); if (descriptor.isCompressed()) - { return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc); - } - else - { - return new HintsWriter(directory, descriptor, file, channel, fd, crc); - } + return new HintsWriter(directory, descriptor, file, channel, fd, crc); } HintsDescriptor descriptor() http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/security/EncryptionUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java index f95977e..7e72b3e 100644 --- a/src/java/org/apache/cassandra/security/EncryptionUtils.java +++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java @@ -30,6 +30,7 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.db.commitlog.EncryptedSegment; import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.ByteBufferUtil; @@ -274,4 +275,39 @@ public class EncryptionUtils // nop } } + + public static class ChannelProxyReadChannel implements ReadableByteChannel + { + private final ChannelProxy channelProxy; + private volatile long currentPosition; + + public ChannelProxyReadChannel(ChannelProxy channelProxy, long currentPosition) + { + this.channelProxy = channelProxy; + this.currentPosition = currentPosition; + } + + public int read(ByteBuffer dst) throws IOException + { + int bytesRead = channelProxy.read(dst, currentPosition); + dst.flip(); + currentPosition += bytesRead; + return bytesRead; + } + + public long getCurrentPosition() + { + return currentPosition; + } + + public boolean isOpen() + { + return channelProxy.isCleanedUp(); + } + + public void close() + { + // nop + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/AlteredHints.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java b/test/unit/org/apache/cassandra/hints/AlteredHints.java new file mode 100644 index 0000000..23dc32a --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import org.junit.Assert; +import org.junit.BeforeClass; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +/** + * Base class for testing compressed and encrypted hints. + */ +public abstract class AlteredHints +{ + protected static final String KEYSPACE = "hints_compression_test"; + private static final String TABLE = "table"; + + private static Mutation createMutation(int index, long timestamp) + { + CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + return new RowUpdateBuilder(table, timestamp, bytes(index)) + .clustering(bytes(index)) + .add("val", bytes(index)) + .build(); + } + + private static Hint createHint(int idx, long baseTimestamp) + { + long timestamp = baseTimestamp + idx; + return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp); + } + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + } + + abstract ImmutableMap<String, Object> params(); + abstract boolean looksLegit(HintsWriter writer); + abstract boolean looksLegit(ChecksummedDataInput checksummedDataInput); + + public void multiFlushAndDeserializeTest() throws Exception + { + int hintNum = 0; + int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE; + List<Hint> hints = new LinkedList<>(); + + UUID hostId = UUIDGen.getTimeUUID(); + long ts = System.currentTimeMillis(); + + HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params()); + File dir = Files.createTempDir(); + try (HintsWriter writer = HintsWriter.create(dir, descriptor)) + { + Assert.assertTrue(looksLegit(writer)); + + ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize); + try (HintsWriter.Session session = writer.newSession(writeBuffer)) + { + while (session.getBytesWritten() < bufferSize * 3) + { + Hint hint = createHint(hintNum, ts+hintNum); + session.append(hint); + hints.add(hint); + hintNum++; + } + } + } + + try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName()))) + { + Assert.assertTrue(looksLegit(reader.getInput())); + List<Hint> deserialized = new ArrayList<>(hintNum); + + for (HintsReader.Page page: reader) + { + Iterator<Hint> iterator = page.hintsIterator(); + while (iterator.hasNext()) + { + deserialized.add(iterator.next()); + } + } + + Assert.assertEquals(hints.size(), deserialized.size()); + hintNum = 0; + for (Hint expected: hints) + { + HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum)); + hintNum++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java index d6a08ca..f82db49 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java @@ -18,65 +18,20 @@ package org.apache.cassandra.hints; -import java.io.File; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; -import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.io.compress.DeflateCompressor; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.compress.SnappyCompressor; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.UUIDGen; - -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; -public class HintsCompressionTest +public class HintsCompressionTest extends AlteredHints { - private static final String KEYSPACE = "hints_compression_test"; - private static final String TABLE = "table"; - - - private static Mutation createMutation(int index, long timestamp) - { - CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); - return new RowUpdateBuilder(table, timestamp, bytes(index)) - .clustering(bytes(index)) - .add("val", bytes(index)) - .build(); - } + private Class<? extends ICompressor> compressorClass; - private static Hint createHint(int idx, long baseTimestamp) - { - long timestamp = baseTimestamp + idx; - return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp); - } - - @BeforeClass - public static void defineSchema() - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); - } - - private ImmutableMap<String, Object> params(Class<? extends ICompressor> compressorClass) + ImmutableMap<String, Object> params() { ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder() .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName()) @@ -86,72 +41,40 @@ public class HintsCompressionTest .build(); } - public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception + boolean looksLegit(HintsWriter writer) { - int hintNum = 0; - int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE; - List<Hint> hints = new LinkedList<>(); - - UUID hostId = UUIDGen.getTimeUUID(); - long ts = System.currentTimeMillis(); - - HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass)); - File dir = Files.createTempDir(); - try (HintsWriter writer = HintsWriter.create(dir, descriptor)) - { - assert writer instanceof CompressedHintsWriter; - - ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize); - try (HintsWriter.Session session = writer.newSession(writeBuffer)) - { - while (session.getBytesWritten() < bufferSize * 3) - { - Hint hint = createHint(hintNum, ts+hintNum); - session.append(hint); - hints.add(hint); - hintNum++; - } - } - } - - try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName()))) - { - List<Hint> deserialized = new ArrayList<>(hintNum); - - for (HintsReader.Page page: reader) - { - Iterator<Hint> iterator = page.hintsIterator(); - while (iterator.hasNext()) - { - deserialized.add(iterator.next()); - } - } + if (!(writer instanceof CompressedHintsWriter)) + return false; + CompressedHintsWriter compressedHintsWriter = (CompressedHintsWriter)writer; + return compressedHintsWriter.getCompressor().getClass().isAssignableFrom(compressorClass); + } - Assert.assertEquals(hints.size(), deserialized.size()); - hintNum = 0; - for (Hint expected: hints) - { - HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum)); - hintNum++; - } - } + boolean looksLegit(ChecksummedDataInput checksummedDataInput) + { + if (!(checksummedDataInput instanceof CompressedChecksummedDataInput)) + return false; + CompressedChecksummedDataInput compressedChecksummedDataInput = (CompressedChecksummedDataInput)checksummedDataInput; + return compressedChecksummedDataInput.getCompressor().getClass().isAssignableFrom(compressorClass); } @Test public void lz4Compressor() throws Exception { - multiFlushAndDeserializeTest(LZ4Compressor.class); + compressorClass = LZ4Compressor.class; + multiFlushAndDeserializeTest(); } @Test public void snappyCompressor() throws Exception { - multiFlushAndDeserializeTest(SnappyCompressor.class); + compressorClass = SnappyCompressor.class; + multiFlushAndDeserializeTest(); } @Test public void deflateCompressor() throws Exception { - multiFlushAndDeserializeTest(DeflateCompressor.class); + compressorClass = DeflateCompressor.class; + multiFlushAndDeserializeTest(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java new file mode 100644 index 0000000..83b8481 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.util.Arrays; + +import javax.crypto.Cipher; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.security.EncryptionContextGenerator; + +public class HintsEncryptionTest extends AlteredHints +{ + EncryptionContext encryptionContext; + Cipher cipher; + + @Before + public void setup() + { + encryptionContext = EncryptionContextGenerator.createContext(true); + DatabaseDescriptor.setEncryptionContext(encryptionContext); + } + + @Test + public void encyptedHints() throws Exception + { + multiFlushAndDeserializeTest(); + } + + boolean looksLegit(HintsWriter writer) + { + if (!(writer instanceof EncryptedHintsWriter)) + return false; + + EncryptedHintsWriter encryptedHintsWriter = (EncryptedHintsWriter)writer; + cipher = encryptedHintsWriter.getCipher(); + + return encryptedHintsWriter.getCompressor().getClass().isAssignableFrom(encryptionContext.getCompressor().getClass()); + } + + boolean looksLegit(ChecksummedDataInput checksummedDataInput) + { + if (!(checksummedDataInput instanceof EncryptedChecksummedDataInput)) + return false; + + EncryptedChecksummedDataInput encryptedDataInput = (EncryptedChecksummedDataInput)checksummedDataInput; + + return Arrays.equals(cipher.getIV(), encryptedDataInput.getCipher().getIV()) && + encryptedDataInput.getCompressor().getClass().isAssignableFrom(encryptionContext.getCompressor().getClass()); + } + + ImmutableMap<String, Object> params() + { + ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder() + .putAll(encryptionContext.toHeaderParameters()) + .build(); + return ImmutableMap.<String, Object>builder() + .put(HintsDescriptor.ENCRYPTION, compressionParams) + .build(); + } +}
