Repository: cassandra
Updated Branches:
  refs/heads/trunk 7709c2a7f -> 90033b921


Encrypted hints

patch by jasobrown; reviewed by Blake Eggleston for CASSANDRA-11040


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90033b92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90033b92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90033b92

Branch: refs/heads/trunk
Commit: 90033b921be41cbebab99bdcd50a4753a07b2e1b
Parents: 7709c2a
Author: Jason Brown <[email protected]>
Authored: Mon Jan 4 06:50:25 2016 -0800
Committer: Jason Brown <[email protected]>
Committed: Wed Feb 10 14:21:01 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  14 +-
 .../hints/CompressedChecksummedDataInput.java   |  46 ++-----
 .../CompressedChecksummedDataInputBuilder.java  |  36 ++++++
 .../cassandra/hints/CompressedHintsWriter.java  |   8 ++
 .../hints/EncryptedChecksummedDataInput.java    | 129 +++++++++++++++++++
 .../cassandra/hints/EncryptedHintsWriter.java   |  65 ++++++++++
 .../apache/cassandra/hints/HintsDescriptor.java | 113 +++++++++++++++-
 .../org/apache/cassandra/hints/HintsReader.java |   2 +
 .../org/apache/cassandra/hints/HintsWriter.java |  14 +-
 .../cassandra/security/EncryptionUtils.java     |  36 ++++++
 .../apache/cassandra/hints/AlteredHints.java    | 129 +++++++++++++++++++
 .../cassandra/hints/HintsCompressionTest.java   | 119 +++--------------
 .../cassandra/hints/HintsEncryptionTest.java    |  81 ++++++++++++
 14 files changed, 640 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04ce8d7..551d147 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Encrypted hints (CASSANDRA-11040)
  * SASI index options validation (CASSANDRA-11136)
  * Optimize disk seek using min/max column name meta data when the LIMIT 
clause is used
    (CASSANDRA-8180)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a9749f2..d6bcace 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -942,31 +942,29 @@ enable_scripted_user_defined_functions: false
 windows_timer_interval: 1
 
 
-# Enables encrypting data at-rest (on disk). Currently, AES/CBC/PKCS5Padding 
is the only supported
-# encyption algorithm. Different key providers can be plugged in, but the 
default reads from
+# Enables encrypting data at-rest (on disk). Different key providers can be 
plugged in, but the default reads from
 # a JCE-style keystore. A single keystore can hold multiple keys, but the one 
referenced by
 # the "key_alias" is the only key that will be used for encrypt opertaions; 
previously used keys
 # can still (and should!) be in the keystore and will be used on decrypt 
operations
 # (to handle the case of key rotation).
 #
-# In order to make use of transparent data encryption, you must download and 
install the
-# Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy 
Files
-# for your version of the JDK.
+# It is strongly recommended to download and install Java Cryptography 
Extension (JCE)
+# Unlimited Strength Jurisdiction Policy Files for your version of the JDK.
 # (current link: 
http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
 #
 # Currently, only the following file types are supported for transparent data 
encryption, although
-# more are coming in future cassandra releases: commitlog
+# more are coming in future cassandra releases: commitlog, hints
 transparent_data_encryption_options:
     enabled: false
     chunk_length_kb: 64
     cipher: AES/CBC/PKCS5Padding
     key_alias: testing:1
-    # CBC requires iv length to be 16 bytes
+    # CBC IV length for AES needs to be 16 bytes (which is also the default 
size)
     # iv_length: 16
     key_provider: 
       - class_name: org.apache.cassandra.security.JKSKeyProvider
         parameters: 
-          - keystore: test/conf/cassandra.keystore
+          - keystore: conf/.keystore
             keystore_password: cassandra
             store_type: JCEKS
             key_password: cassandra

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index 1009b57..bbf1fdb 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.hints;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
@@ -33,7 +35,7 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
     private volatile ByteBuffer compressedBuffer = null;
     private final ByteBuffer metadataBuffer = 
ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
 
-    public CompressedChecksummedDataInput(Builder builder)
+    public 
CompressedChecksummedDataInput(CompressedChecksummedDataInputBuilder builder)
     {
         super(builder);
         assert regions == null;  //mmapped regions are not supported
@@ -113,46 +115,20 @@ public final class CompressedChecksummedDataInput extends 
ChecksummedDataInput
         throw new UnsupportedOperationException();
     }
 
-    public static final class Builder extends ChecksummedDataInput.Builder
-    {
-        private long position;
-        private ICompressor compressor;
-
-        public Builder(ChannelProxy channel)
-        {
-            super(channel);
-            bufferType = null;
-        }
-
-        public CompressedChecksummedDataInput build()
-        {
-            assert position >= 0;
-            assert compressor != null;
-            return new CompressedChecksummedDataInput(this);
-        }
-
-        public Builder withCompressor(ICompressor compressor)
-        {
-            this.compressor = compressor;
-            bufferType = compressor.preferredBufferType();
-            return this;
-        }
-
-        public Builder withPosition(long position)
-        {
-            this.position = position;
-            return this;
-        }
-    }
-
-    public static final CompressedChecksummedDataInput 
upgradeInput(ChecksummedDataInput input, ICompressor compressor)
+    public static ChecksummedDataInput upgradeInput(ChecksummedDataInput 
input, ICompressor compressor)
     {
         long position = input.getPosition();
         input.close();
 
-        Builder builder = new Builder(new ChannelProxy(input.getPath()));
+        CompressedChecksummedDataInputBuilder builder = new 
CompressedChecksummedDataInputBuilder(new ChannelProxy(input.getPath()));
         builder.withPosition(position);
         builder.withCompressor(compressor);
         return builder.build();
     }
+
+    @VisibleForTesting
+    ICompressor getCompressor()
+    {
+        return compressor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
 
b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
new file mode 100644
index 0000000..3452df8
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInputBuilder.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.hints;
+
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+
+public class CompressedChecksummedDataInputBuilder extends 
ChecksummedDataInput.Builder
+{
+    long position;
+    ICompressor compressor;
+
+    public CompressedChecksummedDataInputBuilder(ChannelProxy channel)
+    {
+        super(channel);
+        bufferType = null;
+    }
+
+    public ChecksummedDataInput build()
+    {
+        assert position >= 0;
+        assert compressor != null;
+        return new CompressedChecksummedDataInput(this);
+    }
+
+    public CompressedChecksummedDataInputBuilder withCompressor(ICompressor 
compressor)
+    {
+        this.compressor = compressor;
+        bufferType = compressor.preferredBufferType();
+        return this;
+    }
+
+    public CompressedChecksummedDataInputBuilder withPosition(long position)
+    {
+        this.position = position;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java 
b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
index 491dceb..8792e32 100644
--- a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.compress.ICompressor;
 
 public class CompressedHintsWriter extends HintsWriter
@@ -64,4 +66,10 @@ public class CompressedHintsWriter extends HintsWriter
         compressionBuffer.limit(compressedSize + METADATA_SIZE);
         super.writeBuffer(compressionBuffer);
     }
+
+    @VisibleForTesting
+    ICompressor getCompressor()
+    {
+        return compressor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java 
b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
new file mode 100644
index 0000000..12b6bf2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+
+public class EncryptedChecksummedDataInput extends ChecksummedDataInput
+{
+    private static final ThreadLocal<ByteBuffer> reusableBuffers = new 
ThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    private final Cipher cipher;
+    private final ICompressor compressor;
+
+    private final EncryptionUtils.ChannelProxyReadChannel readChannel;
+
+    protected EncryptedChecksummedDataInput(Builder builder)
+    {
+        super(builder);
+        cipher = builder.cipher;
+        compressor = builder.compressor;
+        readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, 
builder.position);
+        assert cipher != null;
+        assert compressor != null;
+    }
+
+    /**
+     * Since an entire block of compressed data is read off of disk, not just 
a hint at a time,
+     * we don't report EOF until the decompressed data has also been read 
completely
+     */
+    public boolean isEOF()
+    {
+        return readChannel.getCurrentPosition() == channel.size() && 
buffer.remaining() == 0;
+    }
+
+    protected void reBufferStandard()
+    {
+        try
+        {
+            ByteBuffer byteBuffer = reusableBuffers.get();
+            ByteBuffer decrypted = EncryptionUtils.decrypt(readChannel, 
byteBuffer, true, cipher);
+            buffer = EncryptionUtils.uncompress(decrypted, buffer, true, 
compressor);
+
+            if (decrypted.capacity() > byteBuffer.capacity())
+                reusableBuffers.set(decrypted);
+        }
+        catch (IOException ioe)
+        {
+            throw new FSReadError(ioe, getPath());
+        }
+    }
+
+    public static class Builder extends CompressedChecksummedDataInputBuilder
+    {
+        Cipher cipher;
+
+        public Builder(ChannelProxy channel)
+        {
+            super(channel);
+        }
+
+        public Builder withCipher(Cipher cipher)
+        {
+            this.cipher = cipher;
+            return this;
+        }
+
+        public ChecksummedDataInput build()
+        {
+            assert position >= 0;
+            assert compressor != null;
+            assert cipher != null;
+            return new EncryptedChecksummedDataInput(this);
+        }
+    }
+
+    public static ChecksummedDataInput upgradeInput(ChecksummedDataInput 
input, Cipher cipher, ICompressor compressor)
+    {
+        long position = input.getPosition();
+        input.close();
+
+        Builder builder = new Builder(new ChannelProxy(input.getPath()));
+        builder.withPosition(position);
+        builder.withCompressor(compressor);
+        builder.withCipher(cipher);
+        return builder.build();
+    }
+
+    @VisibleForTesting
+    Cipher getCipher()
+    {
+        return cipher;
+    }
+
+    @VisibleForTesting
+    ICompressor getCompressor()
+    {
+        return compressor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java 
b/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java
new file mode 100644
index 0000000..4786d9c
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/EncryptedHintsWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.io.compress.ICompressor;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+
+public class EncryptedHintsWriter extends HintsWriter
+{
+    private final Cipher cipher;
+    private final ICompressor compressor;
+    private volatile ByteBuffer byteBuffer;
+
+    protected EncryptedHintsWriter(File directory, HintsDescriptor descriptor, 
File file, FileChannel channel, int fd, CRC32 globalCRC)
+    {
+        super(directory, descriptor, file, channel, fd, globalCRC);
+        cipher = descriptor.getCipher();
+        compressor = descriptor.createCompressor();
+    }
+
+    protected void writeBuffer(ByteBuffer input) throws IOException
+    {
+        byteBuffer = EncryptionUtils.compress(input, byteBuffer, true, 
compressor);
+        ByteBuffer output = EncryptionUtils.encryptAndWrite(byteBuffer, 
channel, true, cipher);
+        updateChecksum(globalCRC, output);
+    }
+
+    @VisibleForTesting
+    Cipher getCipher()
+    {
+        return cipher;
+    }
+
+    @VisibleForTesting
+    ICompressor getCompressor()
+    {
+        return compressor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java 
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index f5296b3..8a3ee8b 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -22,15 +22,20 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.zip.CRC32;
+import javax.crypto.Cipher;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
@@ -38,6 +43,8 @@ import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.Hex;
 import org.json.simple.JSONValue;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -50,10 +57,13 @@ import static 
org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
  */
 final class HintsDescriptor
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(HintsDescriptor.class);
+
     static final int VERSION_30 = 1;
     static final int CURRENT_VERSION = VERSION_30;
 
     static final String COMPRESSION = "compression";
+    static final String ENCRYPTION = "encryption";
 
     static final Pattern pattern =
         
Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
@@ -62,17 +72,35 @@ final class HintsDescriptor
     final int version;
     final long timestamp;
 
-    // implemented for future compression support - see CASSANDRA-9428
     final ImmutableMap<String, Object> parameters;
     final ParameterizedClass compressionConfig;
 
+    private final Cipher cipher;
+    private final ICompressor compressor;
+
     HintsDescriptor(UUID hostId, int version, long timestamp, 
ImmutableMap<String, Object> parameters)
     {
         this.hostId = hostId;
         this.version = version;
         this.timestamp = timestamp;
-        this.parameters = parameters;
         compressionConfig = createCompressionConfig(parameters);
+
+        EncryptionData encryption = createEncryption(parameters);
+        if (encryption == null)
+        {
+            cipher = null;
+            compressor = null;
+        }
+        else
+        {
+            if (compressionConfig != null)
+                throw new IllegalStateException("a hints file cannot be 
configured for both compression and encryption");
+            cipher = encryption.cipher;
+            compressor = encryption.compressor;
+            parameters = encryption.params;
+        }
+
+        this.parameters = parameters;
     }
 
     HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> 
parameters)
@@ -100,6 +128,71 @@ final class HintsDescriptor
         }
     }
 
+    /**
+     * Create, if necessary, the required encryption components (for either 
decrypt or encrypt operations).
+     * Note that in the case of encryption (that is, when writing out a new 
hints file), we need to write
+     * the cipher's IV out to the header so it can be used when decrypting. 
Thus, we need to add an additional
+     * entry to the {@code params} map.
+     *
+     * @param params the base parameters into the descriptor.
+     * @return null if not using encryption; else, the initialized {@link 
Cipher} and a possibly updated version
+     * of the {@code params} map.
+     */
+    @SuppressWarnings("unchecked")
+    static EncryptionData createEncryption(ImmutableMap<String, Object> params)
+    {
+        if (params.containsKey(ENCRYPTION))
+        {
+            Map<?, ?> encryptionConfig = (Map<?, ?>) params.get(ENCRYPTION);
+            EncryptionContext encryptionContext = 
EncryptionContext.createFromMap(encryptionConfig, 
DatabaseDescriptor.getEncryptionContext());
+
+            try
+            {
+                Cipher cipher;
+                if 
(encryptionConfig.containsKey(EncryptionContext.ENCRYPTION_IV))
+                {
+                    cipher = encryptionContext.getDecryptor();
+                }
+                else
+                {
+                    cipher = encryptionContext.getEncryptor();
+                    ImmutableMap<String, Object> encParams = 
ImmutableMap.<String, Object>builder()
+                                                                 
.putAll(encryptionContext.toHeaderParameters())
+                                                                 
.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()))
+                                                                 .build();
+
+                    Map<String, Object> map = new HashMap<>(params);
+                    map.put(ENCRYPTION, encParams);
+                    params = ImmutableMap.<String, 
Object>builder().putAll(map).build();
+                }
+                return new EncryptionData(cipher, 
encryptionContext.getCompressor(), params);
+            }
+            catch (IOException ioe)
+            {
+                logger.warn("failed to create encyption context for hints 
file. ignoring encryption for hints.", ioe);
+                return null;
+            }
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    private static final class EncryptionData
+    {
+        final Cipher cipher;
+        final ICompressor compressor;
+        final ImmutableMap<String, Object> params;
+
+        private EncryptionData(Cipher cipher, ICompressor compressor, 
ImmutableMap<String, Object> params)
+        {
+            this.cipher = cipher;
+            this.compressor = compressor;
+            this.params = params;
+        }
+    }
+
     String fileName()
     {
         return String.format("%s-%s-%s.hints", hostId, timestamp, version);
@@ -148,9 +241,23 @@ final class HintsDescriptor
         return compressionConfig != null;
     }
 
+    public boolean isEncrypted()
+    {
+        return cipher != null;
+    }
+
     public ICompressor createCompressor()
     {
-        return isCompressed() ? 
CompressionParams.createCompressor(compressionConfig) : null;
+        if (isCompressed())
+            return CompressionParams.createCompressor(compressionConfig);
+        if (isEncrypted())
+            return compressor;
+        return null;
+    }
+
+    public Cipher getCipher()
+    {
+        return isEncrypted() ? cipher : null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java 
b/src/java/org/apache/cassandra/hints/HintsReader.java
index fe2b57a..0571af4 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -83,6 +83,8 @@ class HintsReader implements AutoCloseable, 
Iterable<HintsReader.Page>
                 // The compressed input is instantiated with the uncompressed 
input's position
                 reader = CompressedChecksummedDataInput.upgradeInput(reader, 
descriptor.createCompressor());
             }
+            else if (descriptor.isEncrypted())
+                reader = EncryptedChecksummedDataInput.upgradeInput(reader, 
descriptor.getCipher(), descriptor.createCompressor());
             return new HintsReader(descriptor, file, reader, rateLimiter);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java 
b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 8836258..b4da379 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.Throwables;
@@ -49,9 +48,9 @@ class HintsWriter implements AutoCloseable
     private final File directory;
     private final HintsDescriptor descriptor;
     private final File file;
-    private final FileChannel channel;
+    protected final FileChannel channel;
     private final int fd;
-    private final CRC32 globalCRC;
+    protected final CRC32 globalCRC;
 
     private volatile long lastSyncPosition = 0L;
 
@@ -89,14 +88,11 @@ class HintsWriter implements AutoCloseable
             throw e;
         }
 
+        if (descriptor.isEncrypted())
+            return new EncryptedHintsWriter(directory, descriptor, file, 
channel, fd, crc);
         if (descriptor.isCompressed())
-        {
             return new CompressedHintsWriter(directory, descriptor, file, 
channel, fd, crc);
-        }
-        else
-        {
-            return new HintsWriter(directory, descriptor, file, channel, fd, 
crc);
-        }
+        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
     }
 
     HintsDescriptor descriptor()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java 
b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index f95977e..7e72b3e 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.db.commitlog.EncryptedSegment;
 import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -274,4 +275,39 @@ public class EncryptionUtils
             // nop
         }
     }
+
+    public static class ChannelProxyReadChannel implements ReadableByteChannel
+    {
+        private final ChannelProxy channelProxy;
+        private volatile long currentPosition;
+
+        public ChannelProxyReadChannel(ChannelProxy channelProxy, long 
currentPosition)
+        {
+            this.channelProxy = channelProxy;
+            this.currentPosition = currentPosition;
+        }
+
+        public int read(ByteBuffer dst) throws IOException
+        {
+            int bytesRead = channelProxy.read(dst, currentPosition);
+            dst.flip();
+            currentPosition += bytesRead;
+            return bytesRead;
+        }
+
+        public long getCurrentPosition()
+        {
+            return currentPosition;
+        }
+
+        public boolean isOpen()
+        {
+            return channelProxy.isCleanedUp();
+        }
+
+        public void close()
+        {
+            // nop
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/AlteredHints.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java 
b/test/unit/org/apache/cassandra/hints/AlteredHints.java
new file mode 100644
index 0000000..23dc32a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * Base class for testing compressed and encrypted hints.
+ *
+ * Subclasses provide the descriptor parameters and the sanity checks for the
+ * writer and reader; {@link #multiFlushAndDeserializeTest()} writes hints
+ * through a {@link HintsWriter}, reads them back through a {@link HintsReader}
+ * and compares the two sequences.
+ */
+public abstract class AlteredHints
+{
+    protected static final String KEYSPACE = "hints_compression_test";
+    private static final String TABLE = "table";
+
+    /**
+     * Builds a single-row mutation for the test table in which the partition
+     * key, the clustering and the "val" column are all {@code bytes(index)}.
+     */
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    /**
+     * Builds a hint for {@code idx}. The value {@code baseTimestamp + idx}
+     * (milliseconds) is passed to {@code Hint.create} and, converted to
+     * microseconds, used as the mutation timestamp.
+     */
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    /**
+     * Prepares the server and creates the test keyspace and table once per class.
+     */
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    /** @return the parameters handed to the {@link HintsDescriptor} under test */
+    abstract ImmutableMap<String, Object> params();
+
+    /** @return true if {@code writer} is what the subclass expects for its params */
+    abstract boolean looksLegit(HintsWriter writer);
+
+    /** @return true if the reader's input is what the subclass expects for its params */
+    abstract boolean looksLegit(ChecksummedDataInput checksummedDataInput);
+
+    /**
+     * Appends hints in one session until at least three write buffers' worth
+     * of bytes have been written, then reads the file back and asserts that
+     * the same hints come out in the same order.
+     */
+    public void multiFlushAndDeserializeTest() throws Exception
+    {
+        int hintNum = 0;
+        int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
+        // append-only, then iterated once: ArrayList is the natural fit
+        List<Hint> hints = new ArrayList<>();
+
+        UUID hostId = UUIDGen.getTimeUUID();
+        long ts = System.currentTimeMillis();
+
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params());
+        File dir = Files.createTempDir();
+        try
+        {
+            try (HintsWriter writer = HintsWriter.create(dir, descriptor))
+            {
+                Assert.assertTrue(looksLegit(writer));
+
+                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+                try (HintsWriter.Session session = writer.newSession(writeBuffer))
+                {
+                    while (session.getBytesWritten() < bufferSize * 3)
+                    {
+                        // createHint adds the index again, so the effective
+                        // timestamp is ts + 2 * hintNum
+                        Hint hint = createHint(hintNum, ts+hintNum);
+                        session.append(hint);
+                        hints.add(hint);
+                        hintNum++;
+                    }
+                }
+            }
+
+            try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
+            {
+                Assert.assertTrue(looksLegit(reader.getInput()));
+                List<Hint> deserialized = new ArrayList<>(hintNum);
+
+                for (HintsReader.Page page: reader)
+                {
+                    Iterator<Hint> iterator = page.hintsIterator();
+                    while (iterator.hasNext())
+                    {
+                        deserialized.add(iterator.next());
+                    }
+                }
+
+                Assert.assertEquals(hints.size(), deserialized.size());
+                // sizes are equal at this point, so the two lists can be walked in step
+                Iterator<Hint> actual = deserialized.iterator();
+                for (Hint expected: hints)
+                {
+                    HintsTestUtil.assertHintsEqual(expected, actual.next());
+                }
+            }
+        }
+        finally
+        {
+            // do not leave the temp directory and the files written into it behind
+            deleteQuietly(dir);
+        }
+    }
+
+    /**
+     * Best-effort removal of {@code dir} and the files directly inside it.
+     * Failures are ignored so that cleanup never masks a test failure.
+     */
+    private static void deleteQuietly(File dir)
+    {
+        File[] children = dir.listFiles();
+        if (children != null)
+        {
+            for (File child: children)
+                child.delete();
+        }
+        dir.delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java 
b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
index d6a08ca..f82db49 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -18,65 +18,20 @@
 
 package org.apache.cassandra.hints;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
-import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
-public class HintsCompressionTest
+public class HintsCompressionTest extends AlteredHints
 {
-    private static final String KEYSPACE = "hints_compression_test";
-    private static final String TABLE = "table";
-
-
-    private static Mutation createMutation(int index, long timestamp)
-    {
-        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
-        return new RowUpdateBuilder(table, timestamp, bytes(index))
-               .clustering(bytes(index))
-               .add("val", bytes(index))
-               .build();
-    }
+    private Class<? extends ICompressor> compressorClass;
 
-    private static Hint createHint(int idx, long baseTimestamp)
-    {
-        long timestamp = baseTimestamp + idx;
-        return Hint.create(createMutation(idx, 
TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
-    }
-
-    @BeforeClass
-    public static void defineSchema()
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), 
SchemaLoader.standardCFMD(KEYSPACE, TABLE));
-    }
-
-    private ImmutableMap<String, Object> params(Class<? extends ICompressor> 
compressorClass)
+    ImmutableMap<String, Object> params()
     {
         ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, 
Object>builder()
                                                                      
.put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName())
@@ -86,72 +41,40 @@ public class HintsCompressionTest
                            .build();
     }
 
-    public void multiFlushAndDeserializeTest(Class<? extends ICompressor> 
compressorClass) throws Exception
+    boolean looksLegit(HintsWriter writer)
     {
-        int hintNum = 0;
-        int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
-        List<Hint> hints = new LinkedList<>();
-
-        UUID hostId = UUIDGen.getTimeUUID();
-        long ts = System.currentTimeMillis();
-
-        HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, 
params(compressorClass));
-        File dir = Files.createTempDir();
-        try (HintsWriter writer = HintsWriter.create(dir, descriptor))
-        {
-            assert writer instanceof CompressedHintsWriter;
-
-            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
-            try (HintsWriter.Session session = writer.newSession(writeBuffer))
-            {
-                while (session.getBytesWritten() < bufferSize * 3)
-                {
-                    Hint hint = createHint(hintNum, ts+hintNum);
-                    session.append(hint);
-                    hints.add(hint);
-                    hintNum++;
-                }
-            }
-        }
-
-        try (HintsReader reader = HintsReader.open(new File(dir, 
descriptor.fileName())))
-        {
-            List<Hint> deserialized = new ArrayList<>(hintNum);
-
-            for (HintsReader.Page page: reader)
-            {
-                Iterator<Hint> iterator = page.hintsIterator();
-                while (iterator.hasNext())
-                {
-                    deserialized.add(iterator.next());
-                }
-            }
+        if (!(writer instanceof CompressedHintsWriter))
+            return false;
+        CompressedHintsWriter compressedHintsWriter = 
(CompressedHintsWriter)writer;
+        return 
compressedHintsWriter.getCompressor().getClass().isAssignableFrom(compressorClass);
+    }
 
-            Assert.assertEquals(hints.size(), deserialized.size());
-            hintNum = 0;
-            for (Hint expected: hints)
-            {
-                HintsTestUtil.assertHintsEqual(expected, 
deserialized.get(hintNum));
-                hintNum++;
-            }
-        }
+    boolean looksLegit(ChecksummedDataInput checksummedDataInput)
+    {
+        if (!(checksummedDataInput instanceof CompressedChecksummedDataInput))
+            return false;
+        CompressedChecksummedDataInput compressedChecksummedDataInput = 
(CompressedChecksummedDataInput)checksummedDataInput;
+        return 
compressedChecksummedDataInput.getCompressor().getClass().isAssignableFrom(compressorClass);
     }
 
     @Test
     public void lz4Compressor() throws Exception
     {
-        multiFlushAndDeserializeTest(LZ4Compressor.class);
+        compressorClass = LZ4Compressor.class;
+        multiFlushAndDeserializeTest();
     }
 
     @Test
     public void snappyCompressor() throws Exception
     {
-        multiFlushAndDeserializeTest(SnappyCompressor.class);
+        compressorClass = SnappyCompressor.class;
+        multiFlushAndDeserializeTest();
     }
 
     @Test
     public void deflateCompressor() throws Exception
     {
-        multiFlushAndDeserializeTest(DeflateCompressor.class);
+        compressorClass = DeflateCompressor.class;
+        multiFlushAndDeserializeTest();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90033b92/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java 
b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
new file mode 100644
index 0000000..83b8481
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.Arrays;
+
+import javax.crypto.Cipher;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+
+public class HintsEncryptionTest extends AlteredHints
+{
+    EncryptionContext encryptionContext;
+    // set by looksLegit(HintsWriter) and later compared with the reader's cipher
+    Cipher cipher;
+
+    /**
+     * Creates a fresh encryption context and installs it in the
+     * {@link DatabaseDescriptor} before every test.
+     */
+    @Before
+    public void setup()
+    {
+        encryptionContext = EncryptionContextGenerator.createContext(true);
+        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+    }
+
+    @Test
+    public void encyptedHints() throws Exception
+    {
+        multiFlushAndDeserializeTest();
+    }
+
+    /**
+     * Accepts only an {@link EncryptedHintsWriter} whose compressor class
+     * passes the isAssignableFrom check against the context's compressor
+     * class. The writer's cipher is remembered as a side effect.
+     */
+    boolean looksLegit(HintsWriter writer)
+    {
+        if (writer instanceof EncryptedHintsWriter)
+        {
+            EncryptedHintsWriter encryptedWriter = (EncryptedHintsWriter) writer;
+            cipher = encryptedWriter.getCipher();
+
+            Class<?> writerCompressor = encryptedWriter.getCompressor().getClass();
+            Class<?> contextCompressor = encryptionContext.getCompressor().getClass();
+            return writerCompressor.isAssignableFrom(contextCompressor);
+        }
+        return false;
+    }
+
+    /**
+     * Accepts only an {@link EncryptedChecksummedDataInput} whose cipher IV
+     * equals the IV of the remembered writer cipher and whose compressor
+     * class passes the same check as for the writer.
+     */
+    boolean looksLegit(ChecksummedDataInput checksummedDataInput)
+    {
+        if (!(checksummedDataInput instanceof EncryptedChecksummedDataInput))
+            return false;
+
+        EncryptedChecksummedDataInput encryptedInput = (EncryptedChecksummedDataInput) checksummedDataInput;
+
+        // the compressor is only inspected once the IVs are known to match
+        if (!Arrays.equals(cipher.getIV(), encryptedInput.getCipher().getIV()))
+            return false;
+
+        Class<?> inputCompressor = encryptedInput.getCompressor().getClass();
+        Class<?> contextCompressor = encryptionContext.getCompressor().getClass();
+        return inputCompressor.isAssignableFrom(contextCompressor);
+    }
+
+    /**
+     * Wraps the context's header parameters in a map stored under the
+     * {@link HintsDescriptor#ENCRYPTION} key.
+     */
+    ImmutableMap<String, Object> params()
+    {
+        return ImmutableMap.<String, Object>builder()
+               .put(HintsDescriptor.ENCRYPTION,
+                    ImmutableMap.<String, Object>builder()
+                                .putAll(encryptionContext.toHeaderParameters())
+                                .build())
+               .build();
+    }
+}

Reply via email to