HBASE-13720 - Mob files are not encrypting in mob compaction and Sweeper
(Jingcheng du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5428c9fd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5428c9fd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5428c9fd

Branch: refs/heads/master
Commit: 5428c9fdd3517889e80b80486010d6b6ceb698b4
Parents: 132f65e
Author: ramkrishna <[email protected]>
Authored: Mon May 25 10:05:20 2015 +0530
Committer: ramkrishna <[email protected]>
Committed: Mon May 25 10:05:20 2015 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/mob/MobUtils.java   | 122 +++++++++++++++++--
 .../PartitionedMobFileCompactor.java            |  12 +-
 .../hbase/mob/mapreduce/MemStoreWrapper.java    |   9 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   9 +-
 .../filecompactions/TestMobFileCompactor.java   | 106 +++++++++++++++-
 .../hbase/mob/mapreduce/TestMobSweeper.java     | 106 ++++++++++++----
 6 files changed, 317 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 9f08c6e..527aef2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.security.Key;
+import java.security.KeyException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -41,11 +43,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -57,7 +69,13 @@ import 
org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * The mob utilities
@@ -439,17 +457,19 @@ public class MobUtils {
    * @param compression The compression algorithm.
    * @param startKey The hex string of the start key.
    * @param cacheConfig The current cache config.
+   * @param cryptoContext The encryption context.
    * @return The writer for the mob file.
    * @throws IOException
    */
   public static StoreFile.Writer createWriter(Configuration conf, FileSystem 
fs,
       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
-      Compression.Algorithm compression, String startKey, CacheConfig 
cacheConfig)
+      Compression.Algorithm compression, String startKey, CacheConfig 
cacheConfig,
+      Encryption.Context cryptoContext)
       throws IOException {
     MobFileName mobFileName = MobFileName.create(startKey, date, 
UUID.randomUUID().toString()
         .replaceAll("-", ""));
     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, 
compression,
-      cacheConfig);
+      cacheConfig, cryptoContext);
   }
 
   /**
@@ -460,17 +480,20 @@ public class MobUtils {
    * @param basePath The basic path for a temp directory.
    * @param maxKeyCount The key count.
    * @param cacheConfig The current cache config.
+   * @param cryptoContext The encryption context.
    * @return The writer for the mob file.
    * @throws IOException
    */
   public static StoreFile.Writer createRefFileWriter(Configuration conf, 
FileSystem fs,
-    HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig 
cacheConfig)
+    HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig 
cacheConfig,
+    Encryption.Context cryptoContext)
     throws IOException {
     HFileContext hFileContext = new 
HFileContextBuilder().withIncludesMvcc(true)
       
.withIncludesTags(true).withCompression(family.getCompactionCompression())
       
.withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
       
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
-      
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
+      
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
+      .withEncryptionContext(cryptoContext).build();
     Path tempPath = new Path(basePath, 
UUID.randomUUID().toString().replaceAll("-", ""));
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, 
fs).withFilePath(tempPath)
       
.withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
@@ -489,17 +512,19 @@ public class MobUtils {
    * @param compression The compression algorithm.
    * @param startKey The start key.
    * @param cacheConfig The current cache config.
+   * @param cryptoContext The encryption context.
    * @return The writer for the mob file.
    * @throws IOException
    */
   public static StoreFile.Writer createWriter(Configuration conf, FileSystem 
fs,
       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
-      Compression.Algorithm compression, byte[] startKey, CacheConfig 
cacheConfig)
+      Compression.Algorithm compression, byte[] startKey, CacheConfig 
cacheConfig,
+      Encryption.Context cryptoContext)
       throws IOException {
     MobFileName mobFileName = MobFileName.create(startKey, date, 
UUID.randomUUID().toString()
         .replaceAll("-", ""));
     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, 
compression,
-      cacheConfig);
+      cacheConfig, cryptoContext);
   }
 
   /**
@@ -513,18 +538,20 @@ public class MobUtils {
    * @param compression The compression algorithm.
    * @param startKey The start key.
    * @param cacheConfig The current cache config.
+   * @param cryptoContext The encryption context.
    * @return The writer for the del file.
    * @throws IOException
    */
   public static StoreFile.Writer createDelFileWriter(Configuration conf, 
FileSystem fs,
       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
-      Compression.Algorithm compression, byte[] startKey, CacheConfig 
cacheConfig)
+      Compression.Algorithm compression, byte[] startKey, CacheConfig 
cacheConfig,
+      Encryption.Context cryptoContext)
       throws IOException {
     String suffix = UUID
       .randomUUID().toString().replaceAll("-", "") + "_del";
     MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, 
compression,
-      cacheConfig);
+      cacheConfig, cryptoContext);
   }
 
   /**
@@ -537,17 +564,20 @@ public class MobUtils {
    * @param maxKeyCount The key count.
    * @param compression The compression algorithm.
    * @param cacheConfig The current cache config.
+   * @param cryptoContext The encryption context.
    * @return The writer for the mob file.
    * @throws IOException
    */
   private static StoreFile.Writer createWriter(Configuration conf, FileSystem 
fs,
     HColumnDescriptor family, MobFileName mobFileName, Path basePath, long 
maxKeyCount,
-    Compression.Algorithm compression, CacheConfig cacheConfig) throws 
IOException {
+    Compression.Algorithm compression, CacheConfig cacheConfig, 
Encryption.Context cryptoContext)
+    throws IOException {
     HFileContext hFileContext = new 
HFileContextBuilder().withCompression(compression)
       .withIncludesMvcc(true).withIncludesTags(true)
       .withChecksumType(ChecksumType.getDefaultChecksumType())
       
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
-      
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
+      
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
+      .withEncryptionContext(cryptoContext).build();
 
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
       .withFilePath(new Path(basePath, mobFileName.getFileName()))
@@ -739,4 +769,72 @@ public class MobUtils {
     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
     return pool;
   }
+
+  /**
+   * Creates the encyption context.
+   * @param conf The current configuration.
+   * @param family The current column descriptor.
+   * @return The encryption context.
+   * @throws IOException
+   */
+  public static Encryption.Context createEncryptionContext(Configuration conf,
+    HColumnDescriptor family) throws IOException {
+    // TODO the code is repeated, and needs to be unified.
+    Encryption.Context cryptoContext = Encryption.Context.NONE;
+    String cipherName = family.getEncryptionType();
+    if (cipherName != null) {
+      Cipher cipher;
+      Key key;
+      byte[] keyBytes = family.getEncryptionKey();
+      if (keyBytes != null) {
+        // Family provides specific key material
+        String masterKeyName = 
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User
+          .getCurrent().getShortName());
+        try {
+          // First try the master key
+          key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
+        } catch (KeyException e) {
+          // If the current master key fails to unwrap, try the alternate, if
+          // one is configured
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Unable to unwrap key with current master key '" + 
masterKeyName + "'");
+          }
+          String alternateKeyName = 
conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+          if (alternateKeyName != null) {
+            try {
+              key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
+            } catch (KeyException ex) {
+              throw new IOException(ex);
+            }
+          } else {
+            throw new IOException(e);
+          }
+        }
+        // Use the algorithm the key wants
+        cipher = Encryption.getCipher(conf, key.getAlgorithm());
+        if (cipher == null) {
+          throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is 
not available");
+        }
+        // Fail if misconfigured
+        // We use the encryption type specified in the column schema as a 
sanity check on
+        // what the wrapped key is telling us
+        if (!cipher.getName().equalsIgnoreCase(cipherName)) {
+          throw new RuntimeException("Encryption for family '" + 
family.getNameAsString()
+            + "' configured with type '" + cipherName + "' but key specifies 
algorithm '"
+            + cipher.getName() + "'");
+        }
+      } else {
+        // Family does not provide key material, create a random key
+        cipher = Encryption.getCipher(conf, cipherName);
+        if (cipher == null) {
+          throw new RuntimeException("Cipher '" + cipherName + "' is not 
available");
+        }
+        key = cipher.getRandomKey();
+      }
+      cryptoContext = Encryption.newContext(conf);
+      cryptoContext.setCipher(cipher);
+      cryptoContext.setKey(key);
+    }
+    return cryptoContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
index e8729ce..b3c7d83 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mob.MobConstants;
@@ -72,9 +73,10 @@ public class PartitionedMobFileCompactor extends 
MobFileCompactor {
   private Path bulkloadPath;
   private CacheConfig compactionCacheConfig;
   private Tag tableNameTag;
+  private Encryption.Context cryptoContext = Encryption.Context.NONE;
 
   public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, 
TableName tableName,
-    HColumnDescriptor column, ExecutorService pool) {
+    HColumnDescriptor column, ExecutorService pool) throws IOException {
     super(conf, fs, tableName, column, pool);
     mergeableSize = 
conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
       MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
@@ -92,6 +94,7 @@ public class PartitionedMobFileCompactor extends 
MobFileCompactor {
     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     compactionCacheConfig = new CacheConfig(copyOfConf);
     tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, 
tableName.getName());
+    cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
   }
 
   @Override
@@ -347,12 +350,12 @@ public class PartitionedMobFileCompactor extends 
MobFileCompactor {
     try {
       writer = MobUtils.createWriter(conf, fs, column, 
partition.getPartitionId().getDate(),
         tempPath, Long.MAX_VALUE, column.getCompactionCompression(), 
partition.getPartitionId()
-          .getStartKey(), compactionCacheConfig);
+          .getStartKey(), compactionCacheConfig, cryptoContext);
       filePath = writer.getPath();
       byte[] fileName = Bytes.toBytes(filePath.getName());
       // create a temp file and open a writer for it in the bulkloadPath
       refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, 
bulkloadColumnPath, fileInfo
-        .getSecond().longValue(), compactionCacheConfig);
+        .getSecond().longValue(), compactionCacheConfig, cryptoContext);
       refFilePath = refFileWriter.getPath();
       List<Cell> cells = new ArrayList<Cell>();
       boolean hasMore = false;
@@ -457,7 +460,8 @@ public class PartitionedMobFileCompactor extends 
MobFileCompactor {
     try {
       writer = MobUtils.createDelFileWriter(conf, fs, column,
         MobUtils.formatDate(new Date(request.selectionTime)), tempPath, 
Long.MAX_VALUE,
-        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, 
compactionCacheConfig);
+        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, 
compactionCacheConfig,
+        cryptoContext);
       filePath = writer.getPath();
       List<Cell> cells = new ArrayList<Cell>();
       boolean hasMore = false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
index 38b4d6f..458e187 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
@@ -75,6 +76,7 @@ public class MemStoreWrapper {
   private Path mobFamilyDir;
   private FileSystem fs;
   private CacheConfig cacheConfig;
+  private Encryption.Context cryptoContext = Encryption.Context.NONE;
 
   public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator 
table, HColumnDescriptor hcd,
       MemStore memstore, CacheConfig cacheConfig) throws IOException {
@@ -88,6 +90,7 @@ public class MemStoreWrapper {
     flushSize = 
this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
         MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
     mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), 
hcd.getNameAsString());
+    cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
   }
 
   public void setPartitionId(SweepPartitionId partitionId) {
@@ -127,9 +130,9 @@ public class MemStoreWrapper {
     }
     // generate the files into a temp directory.
     String tempPathString = 
context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
-    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
-        partitionId.getDate(), new Path(tempPathString), 
snapshot.getCellsCount(),
-        hcd.getCompactionCompression(), partitionId.getStartKey(), 
cacheConfig);
+    StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, 
partitionId.getDate(),
+      new Path(tempPathString), snapshot.getCellsCount(), 
hcd.getCompactionCompression(),
+      partitionId.getStartKey(), cacheConfig, cryptoContext);
 
     String relativePath = mobFileWriter.getPath().getName();
     LOG.info("Create files under a temp directory " + 
mobFileWriter.getPath().toString());

http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index e4bdc74..a667582 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -33,9 +33,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
index ba0b620..abdfb94 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.mob.filecompactions;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.security.Key;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,6 +34,8 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import javax.crypto.spec.SecretKeySpec;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,22 +44,42 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.io.crypto.aes.AES;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -91,7 +115,11 @@ public class TestMobFileCompactor {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
     
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", 
true);
-    
TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
 5000);
+    TEST_UTIL.getConfiguration()
+      .setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
+    TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
+      KeyProviderForTesting.class.getName());
+    
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
"hbase");
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool(TEST_UTIL.getConfiguration());
     conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), 
pool);
@@ -211,6 +239,57 @@ public class TestMobFileCompactor {
   }
 
   @Test
+  public void testCompactionWithoutDelFilesAndWithEncryption() throws 
Exception {
+    resetConf();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    SecureRandom rng = new SecureRandom();
+    byte[] keyBytes = new byte[AES.KEY_LENGTH];
+    rng.nextBytes(keyBytes);
+    String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES);
+    Key cfKey = new SecretKeySpec(keyBytes, algorithm);
+    byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
+      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
User.getCurrent().getShortName()), cfKey);
+    String tableNameAsString = 
"testCompactionWithoutDelFilesAndWithEncryption";
+    TableName tableName = TableName.valueOf(tableNameAsString);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family1);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(0);
+    hcd.setMaxVersions(4);
+    hcd.setEncryptionType(algorithm);
+    hcd.setEncryptionKey(encryptionKey);
+    HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0);
+    hcd2.setMaxVersions(4);
+    desc.addFamily(hcd);
+    desc.addFamily(hcd2);
+    admin.createTable(desc, getSplitKeys());
+    Table hTable = conn.getTable(tableName);
+    BufferedMutator bufMut = conn.getBufferedMutator(tableName);
+    int count = 4;
+    // generate mob files
+    loadData(admin, bufMut, tableName, count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before compaction: mob rows count", 
regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob file count", regionNum * count,
+      countFiles(tableName, true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(tableName, 
false, family1));
+
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, 
tableName, hcd, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After compaction: mob file count", regionNum,
+      countFiles(tableName, true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(tableName, 
false, family1));
+    Assert.assertTrue(verifyEncryption(tableName, family1));
+  }
+
+  @Test
   public void testCompactionWithDelFiles() throws Exception {
     resetConf();
     int count = 4;
@@ -632,6 +711,27 @@ public class TestMobFileCompactor {
     return count;
   }
 
+  private boolean verifyEncryption(TableName tableName, String familyName) 
throws IOException {
+    Path mobDirPath = 
MobUtils.getMobFamilyPath(MobUtils.getMobRegionPath(conf, tableName),
+      familyName);
+    boolean hasFiles = false;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      hasFiles = files != null && files.length > 0;
+      Assert.assertTrue(hasFiles);
+      Path path = files[0].getPath();
+      CacheConfig cacheConf = new CacheConfig(conf);
+      StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, 
cacheConf,
+        BloomType.NONE);
+      HFile.Reader reader = sf.createReader().getHFileReader();
+      byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
+      Assert.assertTrue(null != encryptionKey);
+      
Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
+        .equals(HConstants.CIPHER_AES));
+    }
+    return hasFiles;
+  }
+
   /**
    * Gets the number of HFileLink in the mob path.
    * @param familyName the family name

http://git-wip-us.apache.org/repos/asf/hbase/blob/5428c9fd/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index 31778ae..6e6eac9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -20,26 +20,40 @@ package org.apache.hadoop.hbase.mob.mapreduce;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.security.Key;
+import java.security.SecureRandom;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 
+import javax.crypto.spec.SecretKeySpec;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.io.crypto.aes.AES;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,7 +62,7 @@ import org.junit.experimental.categories.Category;
 @Category(MediumTests.class)
 public class TestMobSweeper {
   private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-  private String tableName;
+  private TableName tableName;
   private final static String row = "row_";
   private final static String family = "family";
   private final static String column = "column";
@@ -61,9 +75,14 @@ public class TestMobSweeper {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
     
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", 
true);
-    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15); // 
avoid major compactions
-    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30); // 
avoid major compactions
+    // avoid major compactions
+    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15);
+    // avoid major compactions
+    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30);
     TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
+      KeyProviderForTesting.class.getName());
+    
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
"hbase");
 
     TEST_UTIL.startMiniCluster();
 
@@ -76,36 +95,34 @@ public class TestMobSweeper {
     TEST_UTIL.shutdownMiniMapReduceCluster();
   }
 
-  @SuppressWarnings("deprecation")
   @Before
   public void setUp() throws Exception {
     long tid = System.currentTimeMillis();
-    tableName = "testSweeper" + tid;
+    tableName = TableName.valueOf("testSweeper" + tid);
     HTableDescriptor desc = new HTableDescriptor(tableName);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMobEnabled(true);
-    hcd.setMobThreshold(3L);
+    hcd.setMobThreshold(0);
     hcd.setMaxVersions(4);
     desc.addFamily(hcd);
 
     admin = TEST_UTIL.getHBaseAdmin();
     admin.createTable(desc);
     Connection c = 
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-    TableName tn = TableName.valueOf(tableName);
-    table = c.getTable(tn);
-    bufMut = c.getBufferedMutator(tn);
+    table = c.getTable(tableName);
+    bufMut = c.getBufferedMutator(tableName);
   }
 
   @After
   public void tearDown() throws Exception {
-    admin.disableTable(TableName.valueOf(tableName));
-    admin.deleteTable(TableName.valueOf(tableName));
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
     admin.close();
   }
 
-  private Path getMobFamilyPath(Configuration conf, String tableNameStr,
+  private Path getMobFamilyPath(Configuration conf, TableName tableName,
                                 String familyName) {
-    Path p = new Path(MobUtils.getMobRegionPath(conf, 
TableName.valueOf(tableNameStr)),
+    Path p = new Path(MobUtils.getMobRegionPath(conf, tableName),
             familyName);
     return p;
   }
@@ -117,7 +134,7 @@ public class TestMobSweeper {
     return sb.toString();
   }
 
-  private void generateMobTable(Admin admin, BufferedMutator table, String 
tableName, int count,
+  private void generateMobTable(Admin admin, BufferedMutator table, TableName 
tableName, int count,
     int flushStep) throws IOException, InterruptedException {
     if (count <= 0 || flushStep <= 0)
       return;
@@ -131,11 +148,11 @@ public class TestMobSweeper {
       table.mutate(put);
       if (index++ % flushStep == 0) {
         table.flush();
-        admin.flush(TableName.valueOf(tableName));
+        admin.flush(tableName);
       }
     }
     table.flush();
-    admin.flush(TableName.valueOf(tableName));
+    admin.flush(tableName);
   }
 
   @Test
@@ -173,7 +190,7 @@ public class TestMobSweeper {
     conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
 
     String[] args = new String[2];
-    args[0] = tableName;
+    args[0] = tableName.getNameAsString();
     args[1] = family;
     assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
 
@@ -208,8 +225,8 @@ public class TestMobSweeper {
             .equalsIgnoreCase(mobFilesSet.iterator().next()));
   }
 
-  private void testCompactionDelaySweeperInternal(Table table, BufferedMutator 
bufMut, String tableName)
-    throws Exception {
+  private void testCompactionDelaySweeperInternal(Table table, BufferedMutator 
bufMut,
+    TableName tableName, boolean encrypted) throws Exception {
     int count = 10;
     //create table and generate 10 mob files
     generateMobTable(admin, bufMut, tableName, count, 1);
@@ -242,7 +259,7 @@ public class TestMobSweeper {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
     String[] args = new String[2];
-    args[0] = tableName;
+    args[0] = tableName.getNameAsString();
     args[1] = family;
     assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
 
@@ -268,18 +285,61 @@ public class TestMobSweeper {
     assertEquals(1, mobFilesScannedAfterJob.size());
 
     fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
+    Path lastFilePath = null;
     mobFilesSet = new TreeSet<String>();
     for (FileStatus status : fileStatuses) {
       mobFilesSet.add(status.getPath().getName());
+      lastFilePath = status.getPath();
     }
     assertEquals(1, mobFilesSet.size());
     assertEquals(true, mobFilesScannedAfterJob.iterator().next()
             .equalsIgnoreCase(mobFilesSet.iterator().next()));
+    if (encrypted) {
+      // assert the encryption context
+      CacheConfig cacheConf = new CacheConfig(conf);
+      StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), 
lastFilePath, conf, cacheConf,
+        BloomType.NONE);
+      HFile.Reader reader = sf.createReader().getHFileReader();
+      byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
+      Assert.assertTrue(null != encryptionKey);
+      
Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
+        .equals(HConstants.CIPHER_AES));
+    }
+  }
+
+  @Test
+  public void testCompactionDelaySweeperWithEncryption() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    SecureRandom rng = new SecureRandom();
+    byte[] keyBytes = new byte[AES.KEY_LENGTH];
+    rng.nextBytes(keyBytes);
+    String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES);
+    Key cfKey = new SecretKeySpec(keyBytes, algorithm);
+    byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
+      conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 
User.getCurrent().getShortName()), cfKey);
+    String tableNameAsString = "testCompactionDelaySweeperWithEncryption";
+    TableName tableName = TableName.valueOf(tableNameAsString);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(0);
+    hcd.setMaxVersions(4);
+    hcd.setEncryptionType(algorithm);
+    hcd.setEncryptionKey(encryptionKey);
+    desc.addFamily(hcd);
+    admin.createTable(desc);
+    Connection c = 
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    BufferedMutator bufMut = c.getBufferedMutator(tableName);
+    Table table = c.getTable(tableName);
+    testCompactionDelaySweeperInternal(table, bufMut, tableName, true);
+    table.close();
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
   }
 
   @Test
   public void testCompactionDelaySweeper() throws Exception {
-    testCompactionDelaySweeperInternal(table, bufMut, tableName);
+    testCompactionDelaySweeperInternal(table, bufMut, tableName, false);
   }
 
   @Test
@@ -292,14 +352,14 @@ public class TestMobSweeper {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMobEnabled(true);
-    hcd.setMobThreshold(3L);
+    hcd.setMobThreshold(0);
     hcd.setMaxVersions(4);
     desc.addFamily(hcd);
     admin.createTable(desc);
     Connection c = 
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
     BufferedMutator bufMut = c.getBufferedMutator(tableName);
     Table table = c.getTable(tableName);
-    testCompactionDelaySweeperInternal(table, bufMut, tableNameAsString);
+    testCompactionDelaySweeperInternal(table, bufMut, tableName, false);
     table.close();
     admin.disableTable(tableName);
     admin.deleteTable(tableName);

Reply via email to