Repository: hadoop
Updated Branches:
  refs/heads/trunk 8a99eba96 -> 2f73396b5


HDFS-6708. StorageType should be encoded in the block token. Contributed by Ewan Higgs


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2f73396b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2f73396b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2f73396b

Branch: refs/heads/trunk
Commit: 2f73396b5901fd5fe29f6cd76fc1b3134b854b37
Parents: 8a99eba
Author: Chris Douglas <cdoug...@apache.org>
Authored: Tue Apr 25 23:57:00 2017 -0700
Committer: Chris Douglas <cdoug...@apache.org>
Committed: Tue Apr 25 23:57:00 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |   3 +
 .../token/block/BlockTokenIdentifier.java       |  40 +++-
 .../src/main/proto/hdfs.proto                   |   1 +
 .../block/BlockPoolTokenSecretManager.java      |  30 +--
 .../token/block/BlockTokenSecretManager.java    |  65 ++++++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   3 +-
 .../hadoop/hdfs/server/balancer/KeyManager.java |   8 +-
 .../server/blockmanagement/BlockManager.java    |   6 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  21 ++-
 .../hdfs/server/datanode/DataXceiver.java       |  57 ++++--
 .../erasurecode/StripedBlockReader.java         |   4 +-
 .../erasurecode/StripedBlockWriter.java         |   3 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |  70 +++++++
 .../security/token/block/TestBlockToken.java    | 182 +++++++++++++++----
 14 files changed, 397 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index e703a94..dd55203 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -635,6 +635,9 @@ public class PBHelperClient {
         blockTokenSecret.getAccessModes()) {
       builder.addModes(convert(aMode));
     }
+    for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
+      builder.addStorageTypes(convertStorageType(storageType));
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 28e7acc..228a7b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -22,10 +22,13 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
@@ -49,21 +52,24 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   private String blockPoolId;
   private long blockId;
   private final EnumSet<AccessMode> modes;
+  private StorageType[] storageTypes;
   private boolean useProto;
 
   private byte [] cache;
 
   public BlockTokenIdentifier() {
-    this(null, null, 0, EnumSet.noneOf(AccessMode.class), false);
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
   }
 
   public BlockTokenIdentifier(String userId, String bpid, long blockId,
-      EnumSet<AccessMode> modes, boolean useProto) {
+      EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
     this.cache = null;
     this.userId = userId;
     this.blockPoolId = bpid;
     this.blockId = blockId;
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+    this.storageTypes = Optional.ofNullable(storageTypes)
+                                .orElse(StorageType.EMPTY_ARRAY);
     this.useProto = useProto;
   }
 
@@ -115,13 +121,18 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return modes;
   }
 
+  public StorageType[] getStorageTypes(){
+    return storageTypes;
+  }
+
   @Override
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
         + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
         + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
-        + this.getAccessModes() + ")";
+        + this.getAccessModes() + ", storageTypes= "
+        + Arrays.toString(this.getStorageTypes()) + ")";
   }
 
   static boolean isEqual(Object a, Object b) {
@@ -139,7 +150,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
           && isEqual(this.userId, that.userId)
           && isEqual(this.blockPoolId, that.blockPoolId)
           && this.blockId == that.blockId
-          && isEqual(this.modes, that.modes);
+          && isEqual(this.modes, that.modes)
+          && Arrays.equals(this.storageTypes, that.storageTypes);
     }
     return false;
   }
@@ -148,7 +160,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
         ^ (userId == null ? 0 : userId.hashCode())
-        ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
+        ^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
+        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
   }
 
   /**
@@ -200,6 +213,13 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (int i = 0; i < length; i++) {
       modes.add(WritableUtils.readEnum(in, AccessMode.class));
     }
+
+    length = WritableUtils.readVInt(in);
+    StorageType[] readStorageTypes = new StorageType[length];
+    for (int i = 0; i < length; i++) {
+      readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
+    }
+    storageTypes = readStorageTypes;
     useProto = false;
   }
 
@@ -224,6 +244,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
       AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i);
       modes.add(PBHelperClient.convert(accessModeProto));
     }
+
+    storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
+        .map(PBHelperClient::convertStorageType)
+        .toArray(StorageType[]::new);
     useProto = true;
   }
 
@@ -247,6 +271,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (AccessMode aMode : modes) {
       WritableUtils.writeEnum(out, aMode);
     }
+    WritableUtils.writeVInt(out, storageTypes.length);
+    for (StorageType type: storageTypes){
+      WritableUtils.writeEnum(out, type);
+    }
   }
 
   @VisibleForTesting
@@ -269,4 +297,4 @@ public class BlockTokenIdentifier extends TokenIdentifier {
       return KIND_NAME;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 3e3994c..a34e512 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -550,4 +550,5 @@ message BlockTokenSecretProto {
   optional string blockPoolId = 4;
   optional uint64 blockId = 5;
   repeated AccessModeProto modes = 6;
+  repeated StorageTypeProto storageTypes = 7;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 3e315ee..29fb73f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
 
 /**
  * Manages a {@link BlockTokenSecretManager} per block pool. Routes the requests
@@ -81,21 +82,27 @@ public class BlockPoolTokenSecretManager extends
   }
 
   /**
-   * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier, 
-   *                String, ExtendedBlock, BlockTokenIdentifier.AccessMode)}
+   * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
+   *                String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
+   *                StorageType[])}
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
-      ExtendedBlock block, AccessMode mode) throws InvalidToken {
-    get(block.getBlockPoolId()).checkAccess(id, userId, block, mode);
+      ExtendedBlock block, AccessMode mode,
+      StorageType[] storageTypes) throws InvalidToken {
+    get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
+        storageTypes);
   }
 
   /**
-   * See {@link BlockTokenSecretManager#checkAccess(Token, String, 
-   *                ExtendedBlock, BlockTokenIdentifier.AccessMode)}
+   * See {@link BlockTokenSecretManager#checkAccess(Token, String,
+   *                ExtendedBlock, BlockTokenIdentifier.AccessMode,
+   *                StorageType[])}.
    */
   public void checkAccess(Token<BlockTokenIdentifier> token,
-      String userId, ExtendedBlock block, AccessMode mode) throws InvalidToken {
-    get(block.getBlockPoolId()).checkAccess(token, userId, block, mode);
+      String userId, ExtendedBlock block, AccessMode mode,
+      StorageType[] storageTypes) throws InvalidToken {
+    get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
+        storageTypes);
   }
 
   /**
@@ -107,11 +114,12 @@ public class BlockPoolTokenSecretManager extends
   }
 
   /**
-   * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet)}
+   * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
+   *  StorageType[])}
    */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
-      EnumSet<AccessMode> of) throws IOException {
-    return get(b.getBlockPoolId()).generateToken(b, of);
+      EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
+    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
   }
   
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index a3100d0..f3bec83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -22,10 +22,13 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -41,6 +44,7 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.StorageType;
 
 /**
  * BlockTokenSecretManager can be instantiated in 2 modes, master mode
@@ -242,17 +246,19 @@ public class BlockTokenSecretManager extends
 
   /** Generate an block token for current user */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
-      EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
+      EnumSet<BlockTokenIdentifier.AccessMode> modes,
+      StorageType[] storageTypes) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
-    return generateToken(userID, block, modes);
+    return generateToken(userID, block, modes, storageTypes);
   }
 
   /** Generate a block token for a specified user */
   public Token<BlockTokenIdentifier> generateToken(String userId,
-      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
+      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
+      StorageType[] storageTypes) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockPoolId(), block.getBlockId(), modes, useProto);
+        .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -260,9 +266,22 @@ public class BlockTokenSecretManager extends
    * Check if access should be allowed. userID is not checked if null. This
    * method doesn't check if token password is correct. It should be used only
    * when token password has already been verified (e.g., in the RPC layer).
+   *
+   * Some places need to check the access using StorageTypes and for other
+   * places the StorageTypes is not relevant.
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
-      ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
+      ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
+      StorageType[] storageTypes) throws InvalidToken {
+    checkAccess(id, userId, block, mode);
+    if (storageTypes != null && storageTypes.length > 0) {
+      checkAccess(id.getStorageTypes(), storageTypes);
+    }
+  }
+
+  public void checkAccess(BlockTokenIdentifier id, String userId,
+      ExtendedBlock block, BlockTokenIdentifier.AccessMode mode)
+      throws InvalidToken {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking access for user=" + userId + ", block=" + block
           + ", access mode=" + mode + " using " + id.toString());
@@ -289,9 +308,41 @@ public class BlockTokenSecretManager extends
     }
   }
 
+  /**
+   * Check if the requested StorageTypes match the StorageTypes in the
+   * BlockTokenIdentifier.
+   * Empty candidateStorageTypes specifiers mean 'all is permitted'. They
+   * would otherwise be nonsensical.
+   */
+  public static void checkAccess(StorageType[] candidateStorageTypes,
+      StorageType[] storageTypesRequested) throws InvalidToken {
+    if (storageTypesRequested.length == 0) {
+      throw new InvalidToken("The request has no StorageTypes. "
+          + "This is probably a configuration error.");
+    }
+    if (candidateStorageTypes.length == 0) {
+      return;
+    }
+
+    List<StorageType> unseenCandidates = new ArrayList<StorageType>();
+    unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
+    for (StorageType storageType : storageTypesRequested) {
+      final int index = unseenCandidates.indexOf(storageType);
+      if (index == -1) {
+        throw new InvalidToken("Block token with StorageTypes "
+            + Arrays.toString(candidateStorageTypes)
+            + " not valid for access with StorageTypes "
+            + Arrays.toString(storageTypesRequested));
+      }
+      Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
+      unseenCandidates.remove(unseenCandidates.size()-1);
+    }
+  }
+
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
-      ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
+      ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
+      StorageType[] storageTypes) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token
@@ -301,7 +352,7 @@ public class BlockTokenSecretManager extends
           "Unable to de-serialize block token identifier for user=" + userId
               + ", block=" + block + ", access mode=" + mode);
     }
-    checkAccess(id, userId, block, mode);
+    checkAccess(id, userId, block, mode, storageTypes);
     if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't have the correct token password");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index ceccff5..dc81901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -355,7 +355,8 @@ public class Dispatcher {
         ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
             reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager(); 
-        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
+        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
+            new StorageType[]{target.storageType});
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
             unbufIn, km, accessToken, target.getDatanodeInfo());
         unbufOut = saslStreams.out;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 0aa6fb2..06bf07f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
@@ -93,8 +94,8 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
   }
 
   /** Get an access token for a block. */
-  public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
-      ) throws IOException {
+  public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
+      StorageType[] storageTypes) throws IOException {
     if (!isBlockTokenEnabled) {
       return BlockTokenSecretManager.DUMMY_TOKEN;
     } else {
@@ -103,7 +104,8 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
             "Cannot get access token since BlockKeyUpdater is not running");
       }
       return blockTokenSecretManager.generateToken(null, eb,
-          EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE, BlockTokenIdentifier.AccessMode.COPY));
+          EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
+              BlockTokenIdentifier.AccessMode.COPY), storageTypes);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 285acde..7309846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1283,15 +1283,15 @@ public class BlockManager implements BlockStatsMXBean {
           internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
           blockTokens[i] = blockTokenSecretManager.generateToken(
               NameNode.getRemoteUser().getShortUserName(),
-              internalBlock, EnumSet.of(mode));
+              internalBlock, EnumSet.of(mode), b.getStorageTypes());
         }
         sb.setBlockTokens(blockTokens);
       } else {
         b.setBlockToken(blockTokenSecretManager.generateToken(
             NameNode.getRemoteUser().getShortUserName(),
-            b.getBlock(), EnumSet.of(mode)));
+            b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
       }
-    }    
+    }
   }
 
   void addKeyUpdateCommand(final List<DatanodeCommand> cmds,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4b7e052..fbed595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1929,8 +1929,9 @@ public class DataNode extends ReconfigurableBase
     return fis;
   }
 
-  private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
-      AccessMode accessMode) throws IOException {
+  private void checkBlockToken(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token, AccessMode accessMode)
+      throws IOException {
     if (isBlockTokenEnabled) {
       BlockTokenIdentifier id = new BlockTokenIdentifier();
       ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
@@ -1939,7 +1940,8 @@ public class DataNode extends ReconfigurableBase
       if (LOG.isDebugEnabled()) {
         LOG.debug("Got: " + id.toString());
       }
-      blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
+      blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
+          null);
     }
   }
 
@@ -2450,8 +2452,9 @@ public class DataNode extends ReconfigurableBase
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, 
-            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
+            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+            targetStorageTypes);
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2534,11 +2537,13 @@ public class DataNode extends ReconfigurableBase
    * Use BlockTokenSecretManager to generate block token for current user.
    */
   public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
-      EnumSet<AccessMode> mode) throws IOException {
+      EnumSet<AccessMode> mode,
+      StorageType[] storageTypes) throws IOException {
     Token<BlockTokenIdentifier> accessToken = 
         BlockTokenSecretManager.DUMMY_TOKEN;
     if (isBlockTokenEnabled) {
-      accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
+      accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
+          storageTypes);
     }
     return accessToken;
   }
@@ -2911,7 +2916,7 @@ public class DataNode extends ReconfigurableBase
           LOG.debug("Got: " + id.toString());
         }
         blockPoolTokenSecretManager.checkAccess(id, null, block,
-            BlockTokenIdentifier.AccessMode.READ);
+            BlockTokenIdentifier.AccessMode.READ, null);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 706d93a..cc13799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -573,9 +573,9 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Sending block " + block);
     OutputStream baseStream = getOutputStream();
     DataOutputStream out = getBufferedOutputStream();
-    checkAccess(out, true, block, blockToken,
-        Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
-  
+    checkAccess(out, true, block, blockToken, Op.READ_BLOCK,
+        BlockTokenIdentifier.AccessMode.READ);
+
     // send the block
     BlockSender blockSender = null;
     DatanodeRegistration dnR = 
@@ -685,9 +685,17 @@ class DataXceiver extends Receiver implements Runnable {
     long size = 0;
     // reply to upstream datanode or client 
     final DataOutputStream replyOut = getBufferedOutputStream();
-    checkAccess(replyOut, isClient, block, blockToken,
-        Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
-    // check single target for transfer-RBW/Finalized 
+
+    int nst = targetStorageTypes.length;
+    StorageType[] storageTypes = new StorageType[nst + 1];
+    storageTypes[0] = storageType;
+    if (targetStorageTypes.length > 0) {
+      System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
+    }
+    checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
+        BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
+
+    // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
       throw new IOException(stage + " does not support multiple targets "
           + Arrays.asList(targets));
@@ -927,8 +935,8 @@ class DataXceiver extends Receiver implements Runnable {
 
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
-    checkAccess(out, true, blk, blockToken,
-        Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
+    checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
+        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
           targetStorageTypes, clientName);
@@ -949,9 +957,8 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
-    checkAccess(out, true, block, blockToken,
-        Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
-
+    checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM,
+        BlockTokenIdentifier.AccessMode.READ);
     BlockChecksumComputer maker =
         new ReplicatedBlockChecksumComputer(datanode, block);
 
@@ -985,11 +992,12 @@ class DataXceiver extends Receiver implements Runnable {
   public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
       final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
       throws IOException {
+    final ExtendedBlock block = stripedBlockInfo.getBlock();
     updateCurrentThreadName("Getting checksum for block group" +
-        stripedBlockInfo.getBlock());
+        block);
     final DataOutputStream out = new DataOutputStream(getOutputStream());
-    checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
-        Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
+    checkAccess(out, true, block, blockToken, Op.BLOCK_GROUP_CHECKSUM,
+        BlockTokenIdentifier.AccessMode.READ);
 
     AbstractBlockChecksumComputer maker =
         new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
@@ -1027,8 +1035,8 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
     DataOutputStream reply = getBufferedOutputStream();
-    checkAccess(reply, true, block, blockToken,
-        Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
+    checkAccess(reply, true, block, blockToken, Op.COPY_BLOCK,
+        BlockTokenIdentifier.AccessMode.COPY);
 
     if (datanode.data.getPinning(block)) {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
@@ -1101,7 +1109,8 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     checkAccess(replyOut, true, block, blockToken,
-        Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
+        Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
+        new StorageType[]{ storageType });
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1353,11 +1362,18 @@ class DataXceiver extends Receiver implements Runnable {
     throw new IOException("Not ready to serve the block pool, " + bpId + ".");
   }
 
-  private void checkAccess(OutputStream out, final boolean reply, 
+  private void checkAccess(OutputStream out, final boolean reply,
+      ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
+      BlockTokenIdentifier.AccessMode mode) throws IOException {
+    checkAccess(out, reply, blk, t, op, mode, null);
+  }
+
+  private void checkAccess(OutputStream out, final boolean reply,
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
-      final BlockTokenIdentifier.AccessMode mode) throws IOException {
+      final BlockTokenIdentifier.AccessMode mode,
+      final StorageType[] storageTypes) throws IOException {
     checkAndWaitForBP(blk);
     if (datanode.isBlockTokenEnabled) {
       if (LOG.isDebugEnabled()) {
@@ -1365,7 +1381,8 @@ class DataXceiver extends Receiver implements Runnable {
             + "' with mode '" + mode + "'");
       }
       try {
-        datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
+        datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
+            storageTypes);
       } catch(InvalidToken e) {
         try {
           if (reply) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 556158c..b3884c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
@@ -108,7 +109,8 @@ class StripedBlockReader {
       InetSocketAddress dnAddr =
           stripedReader.getSocketAddress4Transfer(source);
       Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
-          block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+          block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
+          StorageType.EMPTY_ARRAY);
         /*
          * This can be further improved if the replica is local, then we can
          * read directly from DN and need to check the replica is FINALIZED

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index d999202..a6989d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -116,7 +116,8 @@ class StripedBlockWriter {
 
       Token<BlockTokenIdentifier> blockToken =
           datanode.getBlockAccessToken(block,
-              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+              new StorageType[]{storageType});
 
       long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
       OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index a3c0aa4..b6884da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -44,9 +45,11 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.Assert;
+import static org.junit.Assert.fail;
 import org.junit.Test;
 
 /** Test {@link BlockStoragePolicy} */
@@ -1397,4 +1400,71 @@ public class TestBlockStoragePolicy {
     }
   }
 
+  @Test
+  public void testStorageTypeCheckAccess(){
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DEFAULT},
+        new StorageType[]{StorageType.DEFAULT}, true);
+
+    testStorageTypeCheckAccessResult(StorageType.EMPTY_ARRAY,
+        StorageType.EMPTY_ARRAY, false);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
+        StorageType.EMPTY_ARRAY, false);
+
+    testStorageTypeCheckAccessResult(StorageType.EMPTY_ARRAY,
+        new StorageType[]{StorageType.RAM_DISK}, true);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.DISK}, true);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
+        false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
+        new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
+        true);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD},
+        new StorageType[]{StorageType.DISK, StorageType.RAM_DISK,
+            StorageType.SSD},
+        false);
+
+    testStorageTypeCheckAccessResult(
+        new StorageType[]{StorageType.DISK, StorageType.SSD},
+        new StorageType[]{StorageType.SSD},
+        true);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK}, false);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.ARCHIVE},
+        false);
+
+    testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
+        StorageType.SSD, StorageType.ARCHIVE},
+        new StorageType[]{StorageType.DISK}, false);
+  }
+
+  private void testStorageTypeCheckAccessResult(StorageType[] requested,
+      StorageType[] allowed, boolean expAccess) {
+    try {
+      BlockTokenSecretManager.checkAccess(requested, allowed);
+      if (!expAccess) {
+        fail("No expected access with allowed StorageTypes "
+            + Arrays.toString(allowed) + " and requested StorageTypes "
+            + Arrays.toString(requested));
+      }
+    } catch (SecretManager.InvalidToken e) {
+      if (expAccess) {
+        fail("Expected access with allowed StorageTypes "
+            + Arrays.toString(allowed) + " and requested StorageTypes "
+            + Arrays.toString(requested));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f73396b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index ecb63ae..e98207f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.security.token.block;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,6 +74,7 @@ import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -88,6 +90,7 @@ import org.mockito.stubbing.Answer;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.fs.StorageType;
 
 /** Unit tests for block tokens */
 public class TestBlockToken {
@@ -102,7 +105,9 @@ public class TestBlockToken {
     GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
   }
 
-  /** Directory where we can count our open file descriptors under Linux */
+  /**
+   * Directory where we can count our open file descriptors under Linux
+   */
   static final File FD_DIR = new File("/proc/self/fd/");
 
   final long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
@@ -124,7 +129,7 @@ public class TestBlockToken {
     final BlockTokenIdentifier ident;
 
     public GetLengthAnswer(BlockTokenSecretManager sm,
-        BlockTokenIdentifier ident) {
+                           BlockTokenIdentifier ident) {
       this.sm = sm;
       this.ident = ident;
     }
@@ -145,7 +150,8 @@ public class TestBlockToken {
         LOG.info("Got: " + id.toString());
         assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
         sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
-            BlockTokenIdentifier.AccessMode.WRITE);
+            BlockTokenIdentifier.AccessMode.WRITE,
+            new StorageType[]{StorageType.DEFAULT});
         result = id.getBlockId();
       }
       return GetReplicaVisibleLengthResponseProto.newBuilder()
@@ -154,10 +160,11 @@ public class TestBlockToken {
   }
 
   private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
-      ExtendedBlock block,
-      EnumSet<BlockTokenIdentifier.AccessMode> accessModes)
-      throws IOException {
-    Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
+                                               ExtendedBlock block,
+                                               EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
+                                               StorageType... storageTypes) throws IOException {
+    Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
+        storageTypes);
     BlockTokenIdentifier id = sm.createIdentifier();
     id.readFields(new DataInputStream(new ByteArrayInputStream(token
         .getIdentifier())));
@@ -169,12 +176,31 @@ public class TestBlockToken {
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
         enableProtobuf);
+    TestWritable.testWritable(generateTokenId(sm, block3,
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.DEFAULT));
+    TestWritable.testWritable(generateTokenId(sm, block3,
+        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+        StorageType.DEFAULT));
+    TestWritable.testWritable(generateTokenId(sm, block3,
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.DEFAULT));
     TestWritable.testWritable(generateTokenId(sm, block1,
-        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.DEFAULT));
     TestWritable.testWritable(generateTokenId(sm, block2,
-        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)));
+        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+        StorageType.DEFAULT));
+    TestWritable.testWritable(generateTokenId(sm, block3,
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.DEFAULT));
+    // We must be backwards compatible when adding storageType
     TestWritable.testWritable(generateTokenId(sm, block3,
-        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        (StorageType[]) null));
+    TestWritable.testWritable(generateTokenId(sm, block3,
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.EMPTY_ARRAY));
   }
 
   @Test
@@ -187,29 +213,37 @@ public class TestBlockToken {
     testWritable(true);
   }
 
+  private static void checkAccess(BlockTokenSecretManager m,
+      Token<BlockTokenIdentifier> t, ExtendedBlock blk,
+      BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
+    m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
+  }
+
   private void tokenGenerationAndVerification(BlockTokenSecretManager master,
-      BlockTokenSecretManager slave) throws Exception {
+      BlockTokenSecretManager slave, StorageType... storageTypes)
+      throws Exception {
     // single-mode tokens
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
       // generated by master
       Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
-          EnumSet.of(mode));
-      master.checkAccess(token1, null, block1, mode);
-      slave.checkAccess(token1, null, block1, mode);
+          EnumSet.of(mode), storageTypes);
+      checkAccess(master, token1, block1, mode);
+      checkAccess(slave, token1, block1, mode);
       // generated by slave
       Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
-          EnumSet.of(mode));
-      master.checkAccess(token2, null, block2, mode);
-      slave.checkAccess(token2, null, block2, mode);
+          EnumSet.of(mode), storageTypes);
+      checkAccess(master, token2, block2, mode);
+      checkAccess(slave, token2, block2, mode);
     }
     // multi-mode tokens
     Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
-        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        storageTypes);
     for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
         .values()) {
-      master.checkAccess(mtoken, null, block3, mode);
-      slave.checkAccess(mtoken, null, block3, mode);
+      checkAccess(master, mtoken, block3, mode);
+      checkAccess(slave, mtoken, block3, mode);
     }
   }
 
@@ -224,13 +258,19 @@ public class TestBlockToken {
         enableProtobuf);
     ExportedBlockKeys keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    tokenGenerationAndVerification(masterHandler, slaveHandler,
+        StorageType.DEFAULT);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
     // key updating
     masterHandler.updateKeys();
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    tokenGenerationAndVerification(masterHandler, slaveHandler,
+        StorageType.DEFAULT);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
     keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    tokenGenerationAndVerification(masterHandler, slaveHandler,
+        StorageType.DEFAULT);
+    tokenGenerationAndVerification(masterHandler, slaveHandler, null);
   }
 
   @Test
@@ -274,7 +314,8 @@ public class TestBlockToken {
         blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
-        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        new StorageType[]{StorageType.DEFAULT});
 
     final Server server = createMockDatanode(sm, token, conf);
 
@@ -323,7 +364,8 @@ public class TestBlockToken {
         blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
         enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
-        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        new StorageType[]{StorageType.DEFAULT});
 
     final Server server = createMockDatanode(sm, token, conf);
     server.start();
@@ -409,14 +451,19 @@ public class TestBlockToken {
 
       ExportedBlockKeys keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
-
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
+          StorageType.DEFAULT);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
       // Test key updating
       masterHandler.updateKeys();
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
+          StorageType.DEFAULT);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
       keys = masterHandler.exportKeys();
       bpMgr.addKeys(bpid, keys);
-      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
+          StorageType.DEFAULT);
+      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
     }
   }
 
@@ -492,7 +539,8 @@ public class TestBlockToken {
         blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
-        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        new StorageType[]{StorageType.DEFAULT});
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -547,7 +595,6 @@ public class TestBlockToken {
 
     dib.reset(emptyIdentBytes, emptyIdentBytes.length);
     readToken.readFields(dib);
-    assertTrue(invalidProtobufMessage);
   }
 
   @Test
@@ -557,7 +604,8 @@ public class TestBlockToken {
         blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
         useProto);
     Token<BlockTokenIdentifier> token = sm.generateToken(block1,
-        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
+        StorageType.EMPTY_ARRAY);
     final byte[] tokenBytes = token.getIdentifier();
     BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
     BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@@ -594,7 +642,7 @@ public class TestBlockToken {
     assertEquals(protobufToken, readToken);
   }
 
-  public void testCraftedProtobufBlockTokenIdentifier(
+  private void testCraftedProtobufBlockTokenIdentifier(
       BlockTokenIdentifier identifier, boolean expectIOE,
       boolean expectRTE) throws IOException {
     DataOutputBuffer dob = new DataOutputBuffer(4096);
@@ -631,20 +679,27 @@ public class TestBlockToken {
     dib.reset(identBytes, identBytes.length);
     readToken.readFieldsProtobuf(dib);
     assertEquals(protobufToken, readToken);
+    assertEquals(identifier, readToken);
   }
 
   @Test
-  public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
-      IOException {
+  public void testEmptyProtobufBlockTokenBytesIsProtobuf() throws IOException {
     // Empty BlockTokenIdentifiers throw IOException
     BlockTokenIdentifier identifier = new BlockTokenIdentifier();
     testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
+  }
 
+  @Test
+  public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
+      IOException {
     /* Parsing BlockTokenIdentifier with expiryDate
      * 2017-02-09 00:12:35,072+0100 will throw IOException.
      * However, expiryDate of
      * 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
      */
+    BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
+        "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
+        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
     Calendar cal = new GregorianCalendar();
     cal.set(2017, 1, 9, 0, 12, 35);
     long datetime = cal.getTimeInMillis();
@@ -656,4 +711,61 @@ public class TestBlockToken {
     identifier.setExpiryDate(datetime);
     testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
   }
+
+  private BlockTokenIdentifier writeAndReadBlockToken(
+      BlockTokenIdentifier identifier) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer(4096);
+    DataInputBuffer dib = new DataInputBuffer();
+    identifier.write(dob);
+    byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());
+
+    BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+    dib.reset(identBytes, identBytes.length);
+    readToken.readFields(dib);
+    assertEquals(identifier, readToken);
+    return readToken;
+  }
+
+  @Test
+  public void testEmptyBlockTokenSerialization() throws IOException {
+    BlockTokenIdentifier ident = new BlockTokenIdentifier();
+    BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
+    assertEquals(ret.getExpiryDate(), 0);
+    assertEquals(ret.getKeyId(), 0);
+    assertEquals(ret.getUserId(), null);
+    assertEquals(ret.getBlockPoolId(), null);
+    assertEquals(ret.getBlockId(), 0);
+    assertEquals(ret.getAccessModes(),
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+    assertArrayEquals(ret.getStorageTypes(), StorageType.EMPTY_ARRAY);
+  }
+
+  private void testBlockTokenSerialization(boolean useProto) throws
+      IOException {
+    EnumSet<BlockTokenIdentifier.AccessMode> accessModes =
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class);
+    StorageType[] storageTypes =
+        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
+            StorageType.DISK, StorageType.ARCHIVE};
+    BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
+        123, accessModes, storageTypes, useProto);
+    ident.setExpiryDate(1487080345L);
+    BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
+    assertEquals(ret.getExpiryDate(), 1487080345L);
+    assertEquals(ret.getKeyId(), 0);
+    assertEquals(ret.getUserId(), "user");
+    assertEquals(ret.getBlockPoolId(), "bpool");
+    assertEquals(ret.getBlockId(), 123);
+    assertEquals(ret.getAccessModes(),
+        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
+    assertArrayEquals(ret.getStorageTypes(), storageTypes);
+  }
+
+  @Test
+  public void testBlockTokenSerialization() throws IOException {
+    testBlockTokenSerialization(false);
+    testBlockTokenSerialization(true);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to