http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index a95f397..02b20d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -698,7 +698,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetDatanodeReportRequestProto req)
       throws ServiceException {
     try {
-      List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server
+      List<? extends DatanodeInfoProto> result = PBHelper.convert(server
           .getDatanodeReport(PBHelper.convert(req.getType())));
       return GetDatanodeReportResponseProto.newBuilder()
           .addAllDi(result).build();
@@ -892,7 +892,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       server.setQuota(req.getPath(), req.getNamespaceQuota(),
           req.getStoragespaceQuota(),
           req.hasStorageType() ?
-          PBHelperClient.convertStorageType(req.getStorageType()): null);
+          PBHelper.convertStorageType(req.getStorageType()): null);
       return VOID_SETQUOTA_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -992,7 +992,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       GetDelegationTokenResponseProto.Builder rspBuilder = 
           GetDelegationTokenResponseProto.newBuilder();
       if (token != null) {
-        rspBuilder.setToken(PBHelperClient.convert(token));
+        rspBuilder.setToken(PBHelper.convert(token));
       }
       return rspBuilder.build();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index a0431b1..7e57b97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       String holder) throws AccessControlException, FileNotFoundException,
         UnresolvedLinkException, IOException {
     AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
-        .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder)
+        .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder)
             .setFileId(fileId).build();
     try {
       rpcProxy.abandonBlock(null, req);
@@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
     AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
         .setSrc(src).setClientName(clientName).setFileId(fileId);
     if (previous != null) 
-      req.setPrevious(PBHelperClient.convert(previous));
-    if (excludeNodes != null)
-      req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes));
+      req.setPrevious(PBHelper.convert(previous)); 
+    if (excludeNodes != null) 
+      req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
     if (favoredNodes != null) {
       req.addAllFavoredNodes(Arrays.asList(favoredNodes));
     }
@@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .newBuilder()
         .setSrc(src)
         .setFileId(fileId)
-        .setBlk(PBHelperClient.convert(blk))
-        .addAllExistings(PBHelperClient.convert(existings))
+        .setBlk(PBHelper.convert(blk))
+        .addAllExistings(PBHelper.convert(existings))
         .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
-        .addAllExcludes(PBHelperClient.convert(excludes))
+        .addAllExcludes(PBHelper.convert(excludes))
         .setNumAdditionalNodes(numAdditionalNodes)
         .setClientName(clientName)
         .build();
@@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setClientName(clientName)
         .setFileId(fileId);
     if (last != null)
-      req.setLast(PBHelperClient.convert(last));
+      req.setLast(PBHelper.convert(last));
     try {
       return rpcProxy.complete(null, req.build()).getResult();
     } catch (ServiceException e) {
@@ -817,7 +817,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setNamespaceQuota(namespaceQuota)
         .setStoragespaceQuota(storagespaceQuota);
     if (type != null) {
-      builder.setStorageType(PBHelperClient.convertStorageType(type));
+      builder.setStorageType(PBHelper.convertStorageType(type));
     }
     final SetQuotaRequestProto req = builder.build();
     try {
@@ -895,7 +895,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       String clientName) throws IOException {
     UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto
         .newBuilder()
-        .setBlock(PBHelperClient.convert(block))
+        .setBlock(PBHelper.convert(block))
         .setClientName(clientName)
         .build();
     try {
@@ -911,8 +911,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
     UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
         .setClientName(clientName)
-        .setOldBlock(PBHelperClient.convert(oldBlock))
-        .setNewBlock(PBHelperClient.convert(newBlock))
+        .setOldBlock(PBHelper.convert(oldBlock))
+        .setNewBlock(PBHelper.convert(newBlock))
         .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
         .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
         .build();
@@ -943,7 +943,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
     RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder().
-        setToken(PBHelperClient.convert(token)).
+        setToken(PBHelper.convert(token)).
         build();
     try {
       return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime();
@@ -957,7 +957,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throws IOException {
     CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
         .newBuilder()
-        .setToken(PBHelperClient.convert(token))
+        .setToken(PBHelper.convert(token))
         .build();
     try {
       rpcProxy.cancelDelegationToken(null, req);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 0b46927..94028a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       ) throws IOException {
     CommitBlockSynchronizationRequestProto.Builder builder = 
         CommitBlockSynchronizationRequestProto.newBuilder()
-        .setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp)
+        .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
         .setNewLength(newlength).setCloseFile(closeFile)
         .setDeleteBlock(deleteblock);
     for (int i = 0; i < newtargets.length; i++) {
-      builder.addNewTaragets(PBHelperClient.convert(newtargets[i]));
+      builder.addNewTaragets(PBHelper.convert(newtargets[i]));
       builder.addNewTargetStorages(newtargetstorages[i]);
     }
     CommitBlockSynchronizationRequestProto req = builder.build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
index 17ba196..fee62a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
@@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements
       long recoveryId, long newBlockId, long newLength) throws IOException {
     UpdateReplicaUnderRecoveryRequestProto req = 
         UpdateReplicaUnderRecoveryRequestProto.newBuilder()
-        .setBlock(PBHelperClient.convert(oldBlock))
+        .setBlock(PBHelper.convert(oldBlock))
         .setNewLength(newLength).setNewBlockId(newBlockId)
         .setRecoveryId(recoveryId).build();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index bcb96ba..82c5c4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
     GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
-        .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
+        .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
         .build();
     try {
       return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index dbb1861..a252262 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -347,7 +347,7 @@ public class PBHelper {
     if (types == null || types.length == 0) {
       return null;
     }
-    List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types);
+    List<StorageTypeProto> list = convertStorageTypes(types);
     return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
   }
 
@@ -382,6 +382,20 @@ public class PBHelper {
         .getInfoSecurePort() : 0, dn.getIpcPort());
   }
 
+  public static DatanodeIDProto convert(DatanodeID dn) {
+    // For wire compatibility with older versions we transmit the StorageID
+    // which is the same as the DatanodeUuid. Since StorageID is a required
+    // field we pass the empty string if the DatanodeUuid is not yet known.
+    return DatanodeIDProto.newBuilder()
+        .setIpAddr(dn.getIpAddr())
+        .setHostName(dn.getHostName())
+        .setXferPort(dn.getXferPort())
+        .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
+        .setInfoPort(dn.getInfoPort())
+        .setInfoSecurePort(dn.getInfoSecurePort())
+        .setIpcPort(dn.getIpcPort()).build();
+  }
+
   // Arrays of DatanodeId
   public static DatanodeIDProto[] convert(DatanodeID[] did) {
     if (did == null)
@@ -389,7 +403,7 @@ public class PBHelper {
     final int len = did.length;
     DatanodeIDProto[] result = new DatanodeIDProto[len];
     for (int i = 0; i < len; ++i) {
-      result[i] = PBHelperClient.convert(did[i]);
+      result[i] = convert(did[i]);
     }
     return result;
   }
@@ -420,7 +434,7 @@ public class PBHelper {
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
         .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
-        .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
         .build();
   }
 
@@ -582,6 +596,16 @@ public class PBHelper {
        eb.getGenerationStamp());
   }
   
+  public static ExtendedBlockProto convert(final ExtendedBlock b) {
+    if (b == null) return null;
+   return ExtendedBlockProto.newBuilder().
+      setPoolId(b.getBlockPoolId()).
+      setBlockId(b.getBlockId()).
+      setNumBytes(b.getNumBytes()).
+      setGenerationStamp(b.getGenerationStamp()).
+      build();
+  }
+  
   public static RecoveringBlockProto convert(RecoveringBlock b) {
     if (b == null) {
       return null;
@@ -602,6 +626,17 @@ public class PBHelper {
         new RecoveringBlock(block, locs, b.getNewGenStamp());
   }
   
+  public static DatanodeInfoProto.AdminState convert(
+      final DatanodeInfo.AdminStates inAs) {
+    switch (inAs) {
+    case NORMAL: return  DatanodeInfoProto.AdminState.NORMAL;
+    case DECOMMISSION_INPROGRESS: 
+        return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
+    case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
+    default: return DatanodeInfoProto.AdminState.NORMAL;
+    }
+  }
+  
   static public DatanodeInfo convert(DatanodeInfoProto di) {
     if (di == null) return null;
     return new DatanodeInfo(
@@ -613,6 +648,12 @@ public class PBHelper {
         di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
   }
   
+  static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
+    if (di == null) return null;
+    return convert(di);
+  }
+  
+  
   static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
     if (di == null) return null;
     DatanodeInfo[] result = new DatanodeInfo[di.length];
@@ -622,6 +663,27 @@ public class PBHelper {
     return result;
   }
 
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+      DatanodeInfo[] dnInfos) {
+    return convert(dnInfos, 0);
+  }
+  
+  /**
+   * Copy from {@code dnInfos} to a target of list of same size starting at
+   * {@code startIdx}.
+   */
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+      DatanodeInfo[] dnInfos, int startIdx) {
+    if (dnInfos == null)
+      return null;
+    ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
+        .newArrayListWithCapacity(dnInfos.length);
+    for (int i = startIdx; i < dnInfos.length; i++) {
+      protos.add(convert(dnInfos[i]));
+    }
+    return protos;
+  }
+
   public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
     DatanodeInfo[] info = new DatanodeInfo[list.size()];
     for (int i = 0; i < info.length; i++) {
@@ -629,11 +691,32 @@ public class PBHelper {
     }
     return info;
   }
+  
+  public static DatanodeInfoProto convert(DatanodeInfo info) {
+    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
+    if (info.getNetworkLocation() != null) {
+      builder.setLocation(info.getNetworkLocation());
+    }
+    builder
+        .setId(PBHelper.convert((DatanodeID)info))
+        .setCapacity(info.getCapacity())
+        .setDfsUsed(info.getDfsUsed())
+        .setRemaining(info.getRemaining())
+        .setBlockPoolUsed(info.getBlockPoolUsed())
+        .setCacheCapacity(info.getCacheCapacity())
+        .setCacheUsed(info.getCacheUsed())
+        .setLastUpdate(info.getLastUpdate())
+        .setLastUpdateMonotonic(info.getLastUpdateMonotonic())
+        .setXceiverCount(info.getXceiverCount())
+        .setAdminState(PBHelper.convert(info.getAdminState()))
+        .build();
+    return builder.build();
+  }
 
   public static DatanodeStorageReportProto convertDatanodeStorageReport(
       DatanodeStorageReport report) {
     return DatanodeStorageReportProto.newBuilder()
-        .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo()))
+        .setDatanodeInfo(convert(report.getDatanodeInfo()))
         .addAllStorageReports(convertStorageReports(report.getStorageReports()))
         .build();
   }
@@ -685,7 +768,7 @@ public class PBHelper {
         Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
     for (int i = 0; i < locs.length; i++) {
       DatanodeInfo loc = locs[i];
-      builder.addLocs(i, PBHelperClient.convert(loc));
+      builder.addLocs(i, PBHelper.convert(loc));
       boolean locIsCached = cachedLocs.contains(loc);
       builder.addIsCached(locIsCached);
       if (locIsCached) {
@@ -699,7 +782,7 @@ public class PBHelper {
     StorageType[] storageTypes = b.getStorageTypes();
     if (storageTypes != null) {
       for (int i = 0; i < storageTypes.length; ++i) {
-        builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i]));
+        builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
       }
     }
     final String[] storageIDs = b.getStorageIDs();
@@ -707,8 +790,8 @@ public class PBHelper {
       builder.addAllStorageIDs(Arrays.asList(storageIDs));
     }
 
-    return builder.setB(PBHelperClient.convert(b.getBlock()))
-        .setBlockToken(PBHelperClient.convert(b.getBlockToken()))
+    return builder.setB(PBHelper.convert(b.getBlock()))
+        .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
   
@@ -749,6 +832,14 @@ public class PBHelper {
     return lb;
   }
 
+  public static TokenProto convert(Token<?> tok) {
+    return TokenProto.newBuilder().
+              setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
+              setPassword(ByteString.copyFrom(tok.getPassword())).
+              setKind(tok.getKind().toString()).
+              setService(tok.getService().toString()).build(); 
+  }
+  
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
     return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
@@ -800,7 +891,7 @@ public class PBHelper {
       DatanodeRegistration registration) {
     DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
         .newBuilder();
-    return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration))
+    return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
         .setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
         .setKeys(PBHelper.convert(registration.getExportedKeys()))
         .setSoftwareVersion(registration.getSoftwareVersion()).build();
@@ -892,7 +983,7 @@ public class PBHelper {
     if (types != null) {
       for (StorageType[] ts : types) {
         StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
-        builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts));
+        builder.addAllStorageTypes(convertStorageTypes(ts));
         list.add(builder.build());
       }
     }
@@ -923,7 +1014,7 @@ public class PBHelper {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
     for (int i = 0; i < targets.length; i++) {
       ret[i] = DatanodeInfosProto.newBuilder()
-          .addAllDatanodes(PBHelperClient.convert(targets[i])).build();
+          .addAllDatanodes(PBHelper.convert(targets[i])).build();
     }
     return Arrays.asList(ret);
   }
@@ -1247,7 +1338,7 @@ public class PBHelper {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        PBHelperClient.convert(fs.getChecksumType()));
+        PBHelper.convert(fs.getChecksumType()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -1260,7 +1351,7 @@ public class PBHelper {
       .setFileBufferSize(fs.getFileBufferSize())
       .setEncryptDataTransfer(fs.getEncryptDataTransfer())
       .setTrashInterval(fs.getTrashInterval())
-      .setChecksumType(PBHelperClient.convert(fs.getChecksumType()))
+      .setChecksumType(PBHelper.convert(fs.getChecksumType()))
       .build();
   }
   
@@ -1648,7 +1739,7 @@ public class PBHelper {
     if (cs.hasTypeQuotaInfos()) {
       for (HdfsProtos.StorageTypeQuotaInfoProto info :
           cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
-        StorageType type = PBHelperClient.convertStorageType(info.getType());
+        StorageType type = PBHelper.convertStorageType(info.getType());
         builder.typeConsumed(type, info.getConsumed());
         builder.typeQuota(type, info.getQuota());
       }
@@ -1672,7 +1763,7 @@ public class PBHelper {
       for (StorageType t: StorageType.getTypesSupportingQuota()) {
         HdfsProtos.StorageTypeQuotaInfoProto info =
             HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
-                setType(PBHelperClient.convertStorageType(t)).
+                setType(convertStorageType(t)).
                 setConsumed(cs.getTypeConsumed(t)).
                 setQuota(cs.getTypeQuota(t)).
                 build();
@@ -1717,7 +1808,7 @@ public class PBHelper {
   public static DatanodeStorageProto convert(DatanodeStorage s) {
     return DatanodeStorageProto.newBuilder()
         .setState(PBHelper.convertState(s.getState()))
-        .setStorageType(PBHelperClient.convertStorageType(s.getStorageType()))
+        .setStorageType(PBHelper.convertStorageType(s.getStorageType()))
         .setStorageUuid(s.getStorageID()).build();
   }
 
@@ -1731,10 +1822,44 @@ public class PBHelper {
     }
   }
 
+  public static List<StorageTypeProto> convertStorageTypes(
+      StorageType[] types) {
+    return convertStorageTypes(types, 0);
+  }
+
+  public static List<StorageTypeProto> convertStorageTypes(
+      StorageType[] types, int startIdx) {
+    if (types == null) {
+      return null;
+    }
+    final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
+        types.length);
+    for (int i = startIdx; i < types.length; ++i) {
+      protos.add(convertStorageType(types[i]));
+    }
+    return protos; 
+  }
+
+  public static StorageTypeProto convertStorageType(StorageType type) {
+    switch(type) {
+    case DISK:
+      return StorageTypeProto.DISK;
+    case SSD:
+      return StorageTypeProto.SSD;
+    case ARCHIVE:
+      return StorageTypeProto.ARCHIVE;
+    case RAM_DISK:
+      return StorageTypeProto.RAM_DISK;
+    default:
+      throw new IllegalStateException(
+          "BUG: StorageType not found, type=" + type);
+    }
+  }
+
   public static DatanodeStorage convert(DatanodeStorageProto s) {
     return new DatanodeStorage(s.getStorageUuid(),
                                PBHelper.convertState(s.getState()),
-                               PBHelperClient.convertStorageType(s.getStorageType()));
+                               PBHelper.convertStorageType(s.getStorageType()));
   }
 
   private static State convertState(StorageState state) {
@@ -1747,6 +1872,22 @@ public class PBHelper {
     }
   }
 
+  public static StorageType convertStorageType(StorageTypeProto type) {
+    switch(type) {
+      case DISK:
+        return StorageType.DISK;
+      case SSD:
+        return StorageType.SSD;
+      case ARCHIVE:
+        return StorageType.ARCHIVE;
+      case RAM_DISK:
+        return StorageType.RAM_DISK;
+      default:
+        throw new IllegalStateException(
+            "BUG: StorageTypeProto not found, type=" + type);
+    }
+  }
+
   public static StorageType[] convertStorageTypes(
       List<StorageTypeProto> storageTypesList, int expectedSize) {
     final StorageType[] storageTypes = new StorageType[expectedSize];
@@ -1755,7 +1896,7 @@ public class PBHelper {
       Arrays.fill(storageTypes, StorageType.DEFAULT);
     } else {
       for (int i = 0; i < storageTypes.length; ++i) {
-        storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i));
+        storageTypes[i] = convertStorageType(storageTypesList.get(i));
       }
     }
     return storageTypes;
@@ -1939,6 +2080,10 @@ public class PBHelper {
     return reportProto;
   }
 
+  public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.getNumber());
+  }
+
   public static CacheDirectiveInfoProto convert
       (CacheDirectiveInfo info) {
     CacheDirectiveInfoProto.Builder builder = 
@@ -2111,6 +2256,9 @@ public class PBHelper {
     return new CachePoolEntry(info, stats);
   }
   
+  public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
+  }
 
   public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) {
     DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder();
@@ -2125,6 +2273,17 @@ public class PBHelper {
         proto.getConfigVersion(), proto.getUptime());
   }
 
+  public static InputStream vintPrefixed(final InputStream input)
+      throws IOException {
+    final int firstByte = input.read();
+    if (firstByte == -1) {
+      throw new EOFException("Premature EOF: no length prefix available");
+    }
+
+    int size = CodedInputStream.readRawVarint32(firstByte, input);
+    assert size >= 0;
+    return new ExactSizeInputStream(input, size);
+  }
 
   private static AclEntryScopeProto convert(AclEntryScope v) {
     return AclEntryScopeProto.valueOf(v.ordinal());
@@ -2348,11 +2507,30 @@ public class PBHelper {
         proto.getKeyName());
   }
 
+  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
+    return ShortCircuitShmSlotProto.newBuilder().
+        setShmId(convert(slotId.getShmId())).
+        setSlotIdx(slotId.getSlotIdx()).
+        build();
+  }
+
+  public static ShortCircuitShmIdProto convert(ShmId shmId) {
+    return ShortCircuitShmIdProto.newBuilder().
+        setHi(shmId.getHi()).
+        setLo(shmId.getLo()).
+        build();
+
+  }
+
   public static SlotId convert(ShortCircuitShmSlotProto slotId) {
-    return new SlotId(PBHelperClient.convert(slotId.getShmId()),
+    return new SlotId(PBHelper.convert(slotId.getShmId()),
         slotId.getSlotIdx());
   }
 
+  public static ShmId convert(ShortCircuitShmIdProto shmId) {
+    return new ShmId(shmId.getHi(), shmId.getLo());
+  }
+
   private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
       type) {
     switch (type) {
@@ -2859,6 +3037,18 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
   public static boolean[] convertBooleanList(
     List<Boolean> targetPinningsList) {
     final boolean[] targetPinnings = new boolean[targetPinningsList.size()];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
new file mode 100644
index 0000000..2fa86fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Access token verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidBlockTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidBlockTokenException() {
+    super();
+  }
+
+  public InvalidBlockTokenException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index dc967ff..40564df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
new file mode 100644
index 0000000..215df13
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/**
+ * The caching strategy we should use for an HDFS read or write operation.
+ */
+public class CachingStrategy {
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
+
+  /**
+   * @return a strategy that leaves both settings null, i.e. defers to the
+   *         server defaults.
+   */
+  public static CachingStrategy newDefaultStrategy() {
+    return new CachingStrategy(null, null);
+  }
+
+  /**
+   * @return a strategy with drop-behind set to true and readahead left null
+   *         (server default).
+   */
+  public static CachingStrategy newDropBehind() {
+    return new CachingStrategy(true, null);
+  }
+
+  /**
+   * Mutable builder seeded from an existing strategy; each setter overrides
+   * one setting and returns the builder for chaining.
+   */
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    /**
+     * @param prev strategy whose settings are copied as the starting point
+     */
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    /**
+     * @param dropBehind new drop-behind setting; null = use server defaults
+     * @return this builder
+     */
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    /**
+     * @param readahead new readahead setting; null = use server defaults
+     * @return this builder
+     */
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    /**
+     * @return a new strategy carrying the builder's current settings
+     */
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
+  }
+
+  /**
+   * @param dropBehind drop-behind setting; null = use server defaults
+   * @param readahead  readahead setting; null = use server defaults
+   */
+  public CachingStrategy(Boolean dropBehind, Long readahead) {
+    this.dropBehind = dropBehind;
+    this.readahead = readahead;
+  }
+
+  /**
+   * @return the drop-behind setting, or null to use server defaults
+   */
+  public Boolean getDropBehind() {
+    return dropBehind;
+  }
+
+  /**
+   * @return the readahead setting, or null to use server defaults
+   */
+  public Long getReadahead() {
+    return readahead;
+  }
+
+  @Override
+  public String toString() {
+    return "CachingStrategy(dropBehind=" + dropBehind +
+        ", readahead=" + readahead + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index df03808..fa3b78c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -137,7 +137,7 @@ import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
 import 
org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import 
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@@ -2172,7 +2172,7 @@ public class DataNode extends ReconfigurableBase
         // read ack
         if (isClient) {
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
             LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index dfaa525..e9cf436 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -70,7 +70,7 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import 
org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import 
org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
@@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable {
       throws IOException {
     DataNodeFaultInjector.get().sendShortCircuitShmResponse();
     ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
-        setId(PBHelperClient.convert(shmInfo.shmId)).build().
+        setId(PBHelper.convert(shmInfo.shmId)).build().
         writeDelimitedTo(socketOut);
     // Send the file descriptor for the shared memory segment.
     byte buf[] = new byte[] { (byte)0 };
@@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable {
         // to respond with a Status enum.
         try {
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (!stat.hasStatus()) {
             LOG.warn("Client " + peer.getRemoteAddressString() +
                 " did not send a valid status code after reading. " +
@@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable {
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
             BlockOpResponseProto connectAck =
-              
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
+              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
             mirrorInStatus = connectAck.getStatus();
             firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable {
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();
@@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable {
         // receive the response from the proxy
         
         BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-            PBHelperClient.vintPrefixed(proxyReply));
-
+            PBHelper.vintPrefixed(proxyReply));
+        
         String logInfo = "copy block " + block + " from "
             + proxySock.getRemoteSocketAddress();
         DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 0afb06c..9df1713 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -155,7 +154,7 @@ public final class FSImageFormatPBINode {
       QuotaByStorageTypeFeatureProto proto) {
       ImmutableList.Builder<QuotaByStorageTypeEntry> b = 
ImmutableList.builder();
       for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) {
-        StorageType type = 
PBHelperClient.convertStorageType(quotaEntry.getStorageType());
+        StorageType type = 
PBHelper.convertStorageType(quotaEntry.getStorageType());
         long quota = quotaEntry.getQuota();
         b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type)
             .setQuota(quota).build());
@@ -460,7 +459,7 @@ public final class FSImageFormatPBINode {
         if (q.getTypeSpace(t) >= 0) {
           QuotaByStorageTypeEntryProto.Builder eb =
               QuotaByStorageTypeEntryProto.newBuilder().
-              setStorageType(PBHelperClient.convertStorageType(t)).
+              setStorageType(PBHelper.convertStorageType(t)).
               setQuota(q.getTypeSpace(t));
           b.addQuotas(eb);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
new file mode 100644
index 0000000..81cc68d
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import 
org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DfsClientShm is a subclass of ShortCircuitShm which is used by the
+ * DfsClient.
+ * When the UNIX domain socket associated with this shared memory segment
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
+ * and will not be used to service new reads or mmap operations.
+ * However, in-progress read or mmap operations will continue to proceed.
+ * Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode.  In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * {@link ShortCircuitReplica#isStale}).
+ */
+public class DfsClientShm extends ShortCircuitShm
+    implements DomainSocketWatcher.Handler {
+  /** The EndpointShmManager this shared memory segment belongs to. */
+  private final EndpointShmManager manager;
+
+  /**
+   * The peer holding the UNIX domain socket tied to this DfsClientShm.
+   * Closing that socket, when needed, is left to the DomainSocketWatcher.
+   */
+  private final DomainPeer peer;
+
+  /**
+   * Whether this segment has lost its connection to the DataNode.
+   *
+   * Flipped to true by {@link DfsClientShm#handle}.
+   */
+  private boolean disconnected = false;
+
+  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
+      DomainPeer peer) throws IOException {
+    super(shmId, stream);
+    this.peer = peer;
+    this.manager = manager;
+  }
+
+  /**
+   * @return   The EndpointShmManager this segment belongs to.
+   */
+  public EndpointShmManager getEndpointShmManager() {
+    return manager;
+  }
+
+  /**
+   * @return   The peer whose UNIX domain socket is tied to this segment.
+   */
+  public DomainPeer getPeer() {
+    return peer;
+  }
+
+  /**
+   * Report whether this segment has been disconnected from the DataNode.
+   *
+   * This must be called with the DfsClientShmManager lock held.
+   *
+   * @return   True once {@link DfsClientShm#handle} has marked the segment
+   *           as disconnected.
+   */
+  public synchronized boolean isDisconnected() {
+    return disconnected;
+  }
+
+  /**
+   * React to the closure of the UNIX domain socket tied to this segment:
+   * unregister the segment from its manager, mark it disconnected and
+   * invalidate every slot it holds.
+   *
+   * A segment that holds no slots at all is freed right here.
+   *
+   * @return   Always true.
+   */
+  @Override
+  public boolean handle(DomainSocket sock) {
+    // Unregister before taking this object's monitor, as the original
+    // ordering does.
+    manager.unregisterShm(getShmId());
+    synchronized (this) {
+      Preconditions.checkState(!disconnected);
+      disconnected = true;
+      final Iterator<Slot> slots = slotIterator();
+      if (!slots.hasNext()) {
+        // Nothing is using the segment, so release it immediately.
+        free();
+      } else {
+        while (slots.hasNext()) {
+          slots.next().makeInvalid();
+        }
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
new file mode 100644
index 0000000..062539a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages short-circuit memory segments for an HDFS client.
+ * 
+ * Clients are responsible for requesting and releasing shared memory
+ * segments used for communicating with the DataNode. The client will try
+ * to allocate new slots in the set of existing segments, falling back to
+ * getting a new segment from the DataNode via
+ * {@link DataTransferProtocol#requestShortCircuitFds}.
+ * 
+ * The counterpart to this class on the DataNode is
+ * {@link ShortCircuitRegistry}.  See {@link ShortCircuitRegistry} for more
+ * information on the communication protocol.
+ */
+@InterfaceAudience.Private
+public class DfsClientShmManager implements Closeable {
+  private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
+
+  /**
+   * Manages short-circuit memory segments that pertain to a given DataNode.
+   */
+  class EndpointShmManager {
+    /**
+     * The datanode we're managing.
+     */
+    private final DatanodeInfo datanode;
+
+    /**
+     * Shared memory segments which have no empty slots.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> full =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * Shared memory segments which have at least one empty slot.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> notFull =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * True if this datanode doesn't support short-circuit shared memory
+     * segments.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean disabled = false;
+
+    /**
+     * True if we're in the process of loading a shared memory segment from
+     * this DataNode.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean loading = false;
+
+    EndpointShmManager (DatanodeInfo datanode) {
+      this.datanode = datanode; // the DataNode whose segments this instance tracks
+    }
+
+    /**
+     * Pull a slot out of a preexisting shared memory segment.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param blockId     The blockId to put inside the Slot object.
+     *
+     * @return            null if none of our shared memory segments contain a
+     *                      free slot; the slot object otherwise.
+     */
+    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
+      if (notFull.isEmpty()) {
+        return null;
+      }
+      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
+      DfsClientShm shm = entry.getValue();
+      ShmId shmId = shm.getShmId();
+      Slot slot = shm.allocAndRegisterSlot(blockId);
+      if (shm.isFull()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+        DfsClientShm removedShm = notFull.remove(shmId);
+        Preconditions.checkState(removedShm == shm);
+        full.put(shmId, shm);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+      }
+      return slot;
+    }
+
+    /**
+     * Ask the DataNode for a new shared memory segment.  The caller releases
+     * the manager lock before invoking this function, so the lock is not held
+     * while we communicate with the DataNode.
+     *
+     * @param clientName    The current client name.
+     * @param peer          The peer to use to talk to the DataNode.
+     *
+     * @return              Null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm.  The shared memory segment itself on
+     *                        success.
+     * @throws IOException  If there was an error communicating over the
+     *                        socket.  We will not throw an IOException unless
+     *                        the socket itself (or the network) is the
+     *                        problem.
+     */
+    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
+        throws IOException {
+      final DataOutputStream out = 
+          new DataOutputStream(
+              new BufferedOutputStream(peer.getOutputStream()));
+      new Sender(out).requestShortCircuitShm(clientName);
+      ShortCircuitShmResponseProto resp = 
+          ShortCircuitShmResponseProto.parseFrom(
+              PBHelper.vintPrefixed(peer.getInputStream()));
+      String error = resp.hasError() ? resp.getError() : "(unknown)";
+      switch (resp.getStatus()) {
+      case SUCCESS:
+        DomainSocket sock = peer.getDomainSocket();
+        byte buf[] = new byte[1];
+        FileInputStream fis[] = new FileInputStream[1];
+        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
+          throw new EOFException("got EOF while trying to transfer the " +
+              "file descriptor for the shared memory segment.");
+        }
+        if (fis[0] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor for the shared memory segment.");
+        }
+        try {
+          DfsClientShm shm = 
+              new DfsClientShm(PBHelper.convert(resp.getId()),
+                  fis[0], this, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": createNewShm: created " + shm);
+          }
+          return shm;
+        } finally {
+          IOUtils.cleanup(LOG,  fis[0]);
+        }
+      case ERROR_UNSUPPORTED:
+        // The DataNode just does not support short-circuit shared memory
+        // access, and we should stop asking.
+        LOG.info(this + ": datanode does not support short-circuit " +
+            "shared memory access: " + error);
+        disabled = true;
+        return null;
+      default:
+        // The datanode experienced some kind of unexpected error when trying 
to
+        // create the short-circuit shared memory segment.
+        LOG.warn(this + ": error requesting short-circuit shared memory " +
+            "access: " + error);
+        return null;
+      }
+    }
+
+    /**
+     * Allocate a new shared memory slot connected to this datanode.
+     *
+     * Must be called with the manager lock held.  The lock is released
+     * temporarily while a new segment is requested from the DataNode.
+     *
+     * @param peer          The peer to use to talk to the DataNode.
+     * @param usedPeer      (out param) Will be set to true if we used the
+     *                        peer.  When a peer is used, it becomes our
+     *                        responsibility and should not be closed by the
+     *                        caller.
+     * @param clientName    The client name.
+     * @param blockId       The block ID to use.
+     * @return              null if the manager has been closed or shared
+     *                        memory access is disabled for this DataNode;
+     *                        the allocated slot otherwise.  If the DataNode
+     *                        reports an error creating the shm, the
+     *                        allocation is attempted again.
+     * @throws IOException  If there was an error communicating over the
+     *                        socket.
+     */
+    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
+        String clientName, ExtendedBlockId blockId) throws IOException {
+      while (true) { // state is re-checked after every wait or load attempt
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": the DfsClientShmManager has been closed.");
+          }
+          return null;
+        }
+        if (disabled) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shared memory segment access is disabled.");
+          }
+          return null;
+        }
+        // Try to use a slot in a segment we already have.
+        Slot slot = allocSlotFromExistingShm(blockId);
+        if (slot != null) {
+          return slot;
+        }
+        // There are no free slots.  If someone is loading more slots, wait
+        // for that to finish.
+        if (loading) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": waiting for loading to finish...");
+          }
+          finishedLoading.awaitUninterruptibly(); // gives up the manager lock while waiting
+        } else {
+          // Otherwise, load the slot ourselves.
+          loading = true;
+          lock.unlock(); // the DataNode round trip below runs without the manager lock
+          DfsClientShm shm;
+          try {
+            shm = requestNewShm(clientName, peer);
+            if (shm == null) continue; // the finally block still re-locks and clears 'loading'
+            // See the comment on the DfsClientShmManager#domainSocketWatcher
+            // field for why we do this before retaking the manager lock.
+            domainSocketWatcher.add(peer.getDomainSocket(), shm);
+            // The DomainPeer is now our responsibility, and should not be
+            // closed by the caller.
+            usedPeer.setValue(true);
+          } finally {
+            lock.lock(); // retake the manager lock on every exit path
+            loading = false;
+            finishedLoading.signalAll(); // wake threads parked in the 'loading' branch above
+          }
+          if (shm.isDisconnected()) {
+            // If the peer closed immediately after the shared memory segment
+            // was created, the DomainSocketWatcher callback might already have
+            // fired and marked the shm as disconnected.  In this case, we
+            // obviously don't want to add the SharedMemorySegment to our list
+            // of valid not-full segments.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(this + ": the UNIX domain socket associated with " +
+                  "this short-circuit memory closed before we could make " +
+                  "use of the shm.");
+            }
+          } else {
+            notFull.put(shm.getShmId(), shm); // the next loop iteration allocates from it
+          }
+        }
+      }
+    }
+    
+    /**
+     * Stop tracking a slot.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param slot          The slot to release.
+     */
+    void freeSlot(Slot slot) {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.unregisterSlot(slot.getSlotIdx());
+      if (shm.isDisconnected()) {
+        // Stale shared memory segments should not be tracked here.
+        Preconditions.checkState(!full.containsKey(shm.getShmId()));
+        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
+        if (shm.isEmpty()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": freeing empty stale " + shm);
+          }
+          shm.free();
+        }
+      } else {
+        ShmId shmId = shm.getShmId();
+        full.remove(shmId); // The shm can't be full if we just freed a slot.
+        if (shm.isEmpty()) {
+          notFull.remove(shmId);
+  
+          // If the shared memory segment is now empty, we call shutdown(2) on
+          // the UNIX domain socket associated with it.  The 
DomainSocketWatcher,
+          // which is watching this socket, will call DfsClientShm#handle,
+          // cleaning up this shared memory segment.
+          //
+          // See #{DfsClientShmManager#domainSocketWatcher} for details about 
why
+          // we don't want to call DomainSocketWatcher#remove directly here.
+          //
+          // Note that we could experience 'fragmentation' here, where the
+          // DFSClient allocates a bunch of slots in different shared memory
+          // segments, and then frees most of them, but never fully empties out
+          // any segment.  We make some attempt to avoid this fragmentation by
+          // always allocating new slots out of the shared memory segment with 
the
+          // lowest ID, but it could still occur.  In most workloads,
+          // fragmentation should not be a major concern, since it doesn't 
impact
+          // peak file descriptor usage or the speed of allocation.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shutting down UNIX domain socket for " +
+                "empty " + shm);
+          }
+          shutdown(shm);
+        } else {
+          notFull.put(shmId, shm);
+        }
+      }
+    }
+    
+    /**
+     * Unregister a shared memory segment.
+     *
+     * Once a segment is unregistered, we will not allocate any more slots
+     * inside that segment.
+     *
+     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
+     * lock.
+     *
+     * @param shmId         The ID of the shared memory segment to unregister.
+     */
+    void unregisterShm(ShmId shmId) {
+      lock.lock();
+      try {
+        // Forget the segment whichever of the two maps is tracking it.
+        notFull.remove(shmId);
+        full.remove(shmId);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public String toString() {
+      // Same text as the "EndpointShmManager(%s, parent=%s)" format.
+      final StringBuilder text = new StringBuilder("EndpointShmManager(");
+      text.append(datanode);
+      text.append(", parent=").append(DfsClientShmManager.this);
+      return text.append(")").toString();
+    }
+
+    PerDatanodeVisitorInfo getVisitorInfo() {
+      return new PerDatanodeVisitorInfo(full, notFull, disabled); // passes the maps themselves; no copy is made here
+    }
+
+    final void shutdown(DfsClientShm shm) {
+      try {
+        final DomainSocket sock = shm.getPeer().getDomainSocket();
+        sock.shutdown();
+      } catch (IOException e) {
+        // A failed shutdown is logged and otherwise ignored.
+        LOG.warn(this + ": error shutting down shm: got IOException calling " +
+            "shutdown(SHUT_RDWR)", e);
+      }
+    }
+
+  /**
+   * Set to true by close().  Once set, allocSlot() returns null instead of
+   * allocating.  Both methods access this flag while holding the manager
+   * lock.
+   */
+  private boolean closed = false;
+
+  /**
+   * The manager lock.  It is taken by allocSlot, freeSlot, visit and close.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * A condition variable which is signalled when we finish loading a segment
+   * from the Datanode.
+   */
+  private final Condition finishedLoading = lock.newCondition();
+
+  /**
+   * Information about each Datanode.
+   */
+  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
+      new HashMap<DatanodeInfo, EndpointShmManager>(1);
+
+  /**
+   * The DomainSocketWatcher which keeps track of the UNIX domain socket
+   * associated with each shared memory segment.
+   *
+   * Note: because the DomainSocketWatcher makes callbacks into this
+   * DfsClientShmManager object, you MUST NOT attempt to take the
+   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
+   * or else deadlock might result.  This means that most DomainSocketWatcher
+   * methods are off-limits unless you release the manager lock first.
+   */
+  private final DomainSocketWatcher domainSocketWatcher;
+  
+  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
+    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
+        "client");
+  }
+  
+  /**
+   * Allocate a slot on the given DataNode.
+   *
+   * Looks up the EndpointShmManager for the DataNode, creating and
+   * registering one on first use, and delegates the allocation to it.  The
+   * whole operation runs under the manager lock.
+   *
+   * @param datanode    The DataNode to allocate the slot on.
+   * @param peer        Passed through to EndpointShmManager#allocSlot.
+   * @param usedPeer    Passed through to EndpointShmManager#allocSlot.
+   * @param blockId     Passed through to EndpointShmManager#allocSlot.
+   * @param clientName  Passed through to EndpointShmManager#allocSlot.
+   * @return            The slot returned by the endpoint manager, or null
+   *                    if this manager has already been closed.
+   * @throws IOException  Propagated from EndpointShmManager#allocSlot.
+   */
+  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
+      MutableBoolean usedPeer, ExtendedBlockId blockId,
+      String clientName) throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        // Guarded like the other trace sites so the message is only built
+        // when tracing is enabled.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": the DfsClientShmManager is closed.");
+        }
+        return null;
+      }
+      EndpointShmManager shmManager = datanodes.get(datanode);
+      if (shmManager == null) {
+        shmManager = new EndpointShmManager(datanode);
+        datanodes.put(datanode, shmManager);
+      }
+      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  /**
+   * Free a slot.
+   *
+   * The slot's segment is cast to DfsClientShm and the release is delegated
+   * to that segment's EndpointShmManager, under the manager lock.
+   *
+   * @param slot    The slot to free.
+   */
+  public void freeSlot(Slot slot) {
+    lock.lock();
+    try {
+      final DfsClientShm owner = (DfsClientShm) slot.getShm();
+      owner.getEndpointShmManager().freeSlot(slot);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Per-DataNode state handed to a Visitor.
+   *
+   * The maps are stored by reference; nothing is copied.
+   */
+  @VisibleForTesting
+  public static class PerDatanodeVisitorInfo {
+    public final TreeMap<ShmId, DfsClientShm> full;
+    public final TreeMap<ShmId, DfsClientShm> notFull;
+    public final boolean disabled;
+
+    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
+        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
+      this.disabled = disabled;
+      this.notFull = notFull;
+      this.full = full;
+    }
+  }
+
+  /**
+   * Callback used by visit() to expose the manager's per-DataNode state.
+   */
+  @VisibleForTesting
+  public interface Visitor {
+    /**
+     * Inspect the manager's state.
+     *
+     * @param info    One PerDatanodeVisitorInfo per known DataNode.
+     * @throws IOException  At the implementation's discretion; visit()
+     *                      lets it propagate.
+     */
+    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+        throws IOException;
+  }
+
+  /**
+   * Run a visitor over the per-DataNode state of this manager.
+   *
+   * The visitor is invoked while the manager lock is held.
+   *
+   * @param visitor       The visitor to invoke.
+   * @throws IOException  If the visitor throws it.
+   */
+  @VisibleForTesting
+  public void visit(Visitor visitor) throws IOException {
+    lock.lock();
+    try {
+      final HashMap<DatanodeInfo, PerDatanodeVisitorInfo> perDatanode =
+          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
+      for (Entry<DatanodeInfo, EndpointShmManager> known :
+            datanodes.entrySet()) {
+        final PerDatanodeVisitorInfo state = known.getValue().getVisitorInfo();
+        perDatanode.put(known.getKey(), state);
+      }
+      visitor.visit(perDatanode);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the DfsClientShmManager.
+   *
+   * Only the first call has any effect.  The closed flag is flipped under
+   * the manager lock, while the DomainSocketWatcher is cleaned up after the
+   * lock has been released.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        return;
+      }
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    IOUtils.cleanup(LOG, domainSocketWatcher);
+  }
+
+
+  /**
+   * Describe this manager by its identity hash code, printed as hex.
+   */
+  @Override
+  public String toString() {
+    final int identity = System.identityHashCode(this);
+    return String.format("ShortCircuitShmManager(%08x)", identity);
+  }
+
+  /**
+   * @return    The DomainSocketWatcher used by this manager.
+   */
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return this.domainSocketWatcher;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index 15b8dea..db4cbe2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
@@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable {
         DataInputStream in = new DataInputStream(sock.getInputStream());
         ReleaseShortCircuitAccessResponseProto resp =
             ReleaseShortCircuitAccessResponseProto.parseFrom(
-                PBHelperClient.vintPrefixed(in));
+                PBHelper.vintPrefixed(in));
         if (resp.getStatus() != Status.SUCCESS) {
           String error = resp.hasError() ? resp.getError() : "(unknown)";
           throw new IOException(resp.getStatus().toString() + ": " + error);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
new file mode 100644
index 0000000..7b89d0a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
@@ -0,0 +1,646 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+
+import sun.misc.Unsafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Ints;
+
+/**
+ * A shared memory segment used to implement short-circuit reads.
+ */
+public class ShortCircuitShm {
+  private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);
+
+  /**
+   * Size of one slot in bytes.  Segment lengths are rounded down to a
+   * multiple of this value.
+   */
+  protected static final int BYTES_PER_SLOT = 64;
+
+  /**
+   * The Unsafe instance used to read and write slot memory, or null if it
+   * could not be loaded.
+   */
+  private static final Unsafe unsafe = safetyDance();
+
+  private static Unsafe safetyDance() {
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      return (Unsafe)f.get(null);
+    } catch (Throwable e) {
+      LOG.error("failed to load misc.Unsafe", e);
+    }
+    return null;
+  }
+
+  /**
+   * Calculate the usable size of a shared memory segment.
+   * We round down to a multiple of the slot size and do some validation.
+   *
+   * @param stream The stream we're using.
+   * @return       The usable size of the shared memory segment.
+   */
+  private static int getUsableLength(FileInputStream stream)
+      throws IOException {
+    int intSize = Ints.checkedCast(stream.getChannel().size());
+    int slots = intSize / BYTES_PER_SLOT;
+    if (slots == 0) {
+      throw new IOException("size of shared memory segment was " +
+          intSize + ", but that is not enough to hold even one slot.");
+    }
+    return slots * BYTES_PER_SLOT;
+  }
+
+  /**
+   * Identifies a DfsClientShm.
+   *
+   * An ID is a pair of longs (hi, lo).  IDs are ordered by hi first and
+   * then by lo.
+   */
+  public static class ShmId implements Comparable<ShmId> {
+    /**
+     * Source of randomness for createRandom().
+     *
+     * This is a SecureRandom rather than a plain java.util.Random because
+     * the IDs are meant to be unguessable, and the output of
+     * java.util.Random can be predicted from previously observed values.
+     */
+    private static final Random random = new java.security.SecureRandom();
+    private final long hi;
+    private final long lo;
+
+    /**
+     * Generate a random ShmId.
+     *
+     * We generate ShmIds randomly to prevent a malicious client from
+     * successfully guessing one and using that to interfere with another
+     * client.
+     *
+     * @return      A new ShmId built from two random longs.
+     */
+    public static ShmId createRandom() {
+      return new ShmId(random.nextLong(), random.nextLong());
+    }
+
+    /**
+     * @param hi    The high 64 bits of the ID.
+     * @param lo    The low 64 bits of the ID.
+     */
+    public ShmId(long hi, long lo) {
+      this.hi = hi;
+      this.lo = lo;
+    }
+
+    public long getHi() {
+      return hi;
+    }
+
+    public long getLo() {
+      return lo;
+    }
+
+    /**
+     * Two ShmIds are equal when they are of the same class and both halves
+     * match.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      ShmId other = (ShmId)o;
+      return new EqualsBuilder().
+          append(hi, other.hi).
+          append(lo, other.lo).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.hi).
+          append(this.lo).
+          toHashCode();
+    }
+
+    /**
+     * @return      The ID as 32 hex digits: hi followed by lo, each
+     *              zero-padded to 16 digits.
+     */
+    @Override
+    public String toString() {
+      return String.format("%016x%016x", hi, lo);
+    }
+
+    @Override
+    public int compareTo(ShmId other) {
+      return ComparisonChain.start().
+          compare(hi, other.hi).
+          compare(lo, other.lo).
+          result();
+    }
+  }
+
+  /**
+   * Uniquely identifies a slot: the ID of the shared memory segment plus
+   * the index of the slot inside that segment.
+   */
+  public static class SlotId {
+    private final ShmId shmId;
+    private final int slotIdx;
+
+    public SlotId(ShmId shmId, int slotIdx) {
+      this.slotIdx = slotIdx;
+      this.shmId = shmId;
+    }
+
+    public ShmId getShmId() {
+      return this.shmId;
+    }
+
+    public int getSlotIdx() {
+      return this.slotIdx;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null) {
+        return false;
+      }
+      if (o.getClass() != this.getClass()) {
+        return false;
+      }
+      final SlotId that = (SlotId) o;
+      final EqualsBuilder eq = new EqualsBuilder();
+      eq.append(shmId, that.shmId);
+      eq.append(slotIdx, that.slotIdx);
+      return eq.isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      final HashCodeBuilder hash = new HashCodeBuilder();
+      hash.append(this.shmId);
+      hash.append(this.slotIdx);
+      return hash.toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      final String shmName = shmId.toString();
+      return String.format("SlotId(%s:%d)", shmName, slotIdx);
+    }
+  }
+
+  /**
+   * Iterates over the allocated slots of the enclosing segment in
+   * increasing slot index order.
+   *
+   * Each call synchronizes on the enclosing segment separately.  Removal
+   * is not supported.
+   */
+  public class SlotIterator implements Iterator<Slot> {
+    /** Index of the slot returned most recently; -1 before the first. */
+    int slotIdx = -1;
+
+    @Override
+    public boolean hasNext() {
+      synchronized (ShortCircuitShm.this) {
+        final int following = allocatedSlots.nextSetBit(slotIdx + 1);
+        return following != -1;
+      }
+    }
+
+    @Override
+    public Slot next() {
+      synchronized (ShortCircuitShm.this) {
+        final int following = allocatedSlots.nextSetBit(slotIdx + 1);
+        if (following != -1) {
+          slotIdx = following;
+          return slots[following];
+        }
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "SlotIterator doesn't support removal");
+    }
+  }
+  
+  /**
+   * A slot containing information about a replica.
+   *
+   * The format is:
+   * word 0 (bits numbered starting from the most significant bit)
+   *   bit 0:32   Slot flags (see below).
+   *   bit 33:63  Anchor count.
+   * word 1:7
+   *   Reserved for future use, such as statistics.
+   *   Padding is also useful for avoiding false sharing.
+   *
+   * In terms of the code below: VALID_FLAG is 1L<<63, ANCHORABLE_FLAG is
+   * 1L<<62, and the anchor count is the low-order 31 bits of word 0
+   * (mask 0x7fffffff).
+   *
+   * Little-endian versus big-endian is not relevant here since both the client
+   * and the server reside on the same computer and use the same orientation.
+   */
+  public class Slot {
+    /**
+     * Flag indicating that the slot is valid.
+     *
+     * The DFSClient sets this flag when it allocates a new slot within one of
+     * its shared memory regions.
+     *
+     * The DataNode clears this flag when the replica associated with this slot
+     * is no longer valid.  The client itself also clears this flag when it
+     * believes that the DataNode is no longer using this slot to communicate.
+     */
+    private static final long VALID_FLAG =          1L<<63;
+
+    /**
+     * Flag indicating that the slot can be anchored.
+     */
+    private static final long ANCHORABLE_FLAG =     1L<<62;
+
+    /**
+     * The slot address in memory.
+     */
+    private final long slotAddress;
+
+    /**
+     * BlockId of the block this slot is used for.
+     */
+    private final ExtendedBlockId blockId;
+
+    /**
+     * @param slotAddress   Address of word 0 of the slot.
+     * @param blockId       The block this slot is used for.
+     */
+    Slot(long slotAddress, ExtendedBlockId blockId) {
+      this.slotAddress = slotAddress;
+      this.blockId = blockId;
+    }
+
+    /**
+     * Get the short-circuit memory segment associated with this Slot.
+     *
+     * @return      The enclosing short-circuit memory segment.
+     */
+    public ShortCircuitShm getShm() {
+      return ShortCircuitShm.this;
+    }
+
+    /**
+     * Get the ExtendedBlockId associated with this slot.
+     *
+     * @return      The ExtendedBlockId of this slot.
+     */
+    public ExtendedBlockId getBlockId() {
+      return blockId;
+    }
+
+    /**
+     * Get the SlotId of this slot, containing both shmId and slotIdx.
+     *
+     * @return      The SlotId of this slot.
+     */
+    public SlotId getSlotId() {
+      return new SlotId(getShmId(), getSlotIdx());
+    }
+
+    /**
+     * Get the Slot index.
+     *
+     * The index is derived from the distance between this slot's address
+     * and the base address of the segment.
+     *
+     * @return      The index of this slot.
+     */
+    public int getSlotIdx() {
+      return Ints.checkedCast(
+          (slotAddress - baseAddress) / BYTES_PER_SLOT);
+    }
+
+    /**
+     * Clear the slot.
+     *
+     * Writes zero to word 0, which resets both flags and the anchor count.
+     */
+    void clear() {
+      unsafe.putLongVolatile(null, this.slotAddress, 0);
+    }
+
+    /**
+     * Test a flag with a single volatile read of word 0.
+     *
+     * @param flag    The flag bit to test.
+     * @return        True if the flag bit is set.
+     */
+    private boolean isSet(long flag) {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      return (prev & flag) != 0;
+    }
+
+    /**
+     * Set a flag in word 0.
+     *
+     * Retries a compare-and-swap until it succeeds, or returns early if the
+     * flag is found to be set already.
+     *
+     * @param flag    The flag bit to set.
+     */
+    private void setFlag(long flag) {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & flag) != 0) {
+          return;
+        }
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev | flag));
+    }
+
+    /**
+     * Clear a flag in word 0.
+     *
+     * Retries a compare-and-swap until it succeeds, or returns early if the
+     * flag is found to be clear already.
+     *
+     * @param flag    The flag bit to clear.
+     */
+    private void clearFlag(long flag) {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & flag) == 0) {
+          return;
+        }
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev & (~flag)));
+    }
+
+    /**
+     * @return      True if VALID_FLAG is set.
+     */
+    public boolean isValid() {
+      return isSet(VALID_FLAG);
+    }
+
+    /**
+     * Set VALID_FLAG.
+     */
+    public void makeValid() {
+      setFlag(VALID_FLAG);
+    }
+
+    /**
+     * Clear VALID_FLAG.
+     */
+    public void makeInvalid() {
+      clearFlag(VALID_FLAG);
+    }
+
+    /**
+     * @return      True if ANCHORABLE_FLAG is set.
+     */
+    public boolean isAnchorable() {
+      return isSet(ANCHORABLE_FLAG);
+    }
+
+    /**
+     * Set ANCHORABLE_FLAG.
+     */
+    public void makeAnchorable() {
+      setFlag(ANCHORABLE_FLAG);
+    }
+
+    /**
+     * Clear ANCHORABLE_FLAG.
+     */
+    public void makeUnanchorable() {
+      clearFlag(ANCHORABLE_FLAG);
+    }
+
+    /**
+     * Determine whether the slot currently has at least one anchor.
+     *
+     * @return      True if the slot is valid and its anchor count is
+     *              non-zero; false otherwise.
+     */
+    public boolean isAnchored() {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      if ((prev & VALID_FLAG) == 0) {
+        // Slot is no longer valid.
+        return false;
+      }
+      // The low-order 31 bits of word 0 hold the anchor count.
+      return ((prev & 0x7fffffff) != 0);
+    }
+
+    /**
+     * Try to add an anchor for a given slot.
+     *
+     * When a slot is anchored, we know that the block it refers to is resident
+     * in memory.
+     *
+     * The anchor is refused if the slot is not valid, is not anchorable, or
+     * its anchor count has already reached 0x7fffffff.
+     *
+     * @return          True if the slot is anchored.
+     */
+    public boolean addAnchor() {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & VALID_FLAG) == 0) {
+          // Slot is no longer valid.
+          return false;
+        }
+        if ((prev & ANCHORABLE_FLAG) == 0) {
+          // Slot can't be anchored right now.
+          return false;
+        }
+        if ((prev & 0x7fffffff) == 0x7fffffff) {
+          // Too many other threads have anchored the slot (2 billion?)
+          return false;
+        }
+        // The count sits in the low-order bits, so adding one to the whole
+        // word increments it without touching the flags.
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev + 1));
+      return true;
+    }
+
+    /**
+     * Remove an anchor for a given slot.
+     *
+     * @throws IllegalStateException    If the anchor count is already zero.
+     */
+    public void removeAnchor() {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        Preconditions.checkState((prev & 0x7fffffff) != 0,
+            "Tried to remove anchor for slot " + slotAddress +", which was " +
+            "not anchored.");
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev - 1));
+    }
+
+    @Override
+    public String toString() {
+      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
+    }
+  }
+
+  /**
+   * ID for this shared memory segment.
+   */
+  private final ShmId shmId;
+
+  /**
+   * The base address of the memory-mapped file.
+   */
+  private final long baseAddress;
+
+  /**
+   * The mmapped length of the shared memory segment, in bytes.
+   */
+  private final int mmappedLength;
+
+  /**
+   * The slots associated with this shared memory segment.
+   * slots[i] contains the slot at offset i * BYTES_PER_SLOT,
+   * or null if that slot is not allocated.
+   */
+  private final Slot slots[];
+
+  /**
+   * A bitset where each bit represents a slot which is in use.
+   * Bit i corresponds to slots[i].
+   */
+  private final BitSet allocatedSlots;
+
+  /**
+   * Create the ShortCircuitShm.
+   *
+   * @param shmId       The ID to use.
+   * @param stream      The stream that we're going to use to create this
+   *                    shared memory segment.
+   *
+   *                    Although this is a FileInputStream, we are going to
+   *                    assume that the underlying file descriptor is writable
+   *                    as well as readable. It would be more appropriate to
+   *                    use a RandomAccessFile here, but that class does not
+   *                    have any public accessor which returns a
+   *                    FileDescriptor, unlike FileInputStream.
+   *
+   * @throws UnsupportedOperationException
+   *                    If NativeIO is not available, if running on Windows,
+   *                    or if sun.misc.Unsafe could not be loaded.
+   * @throws IOException
+   *                    If the usable length of the segment cannot be
+   *                    determined (see getUsableLength); presumably also if
+   *                    the mmap call fails -- confirm against POSIX.mmap.
+   */
+  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
+        throws IOException {
+    if (!NativeIO.isAvailable()) {
+      throw new UnsupportedOperationException("NativeIO is not available.");
+    }
+    if (Shell.WINDOWS) {
+      throw new UnsupportedOperationException(
+          "DfsClientShm is not yet implemented for Windows.");
+    }
+    if (unsafe == null) {
+      throw new UnsupportedOperationException(
+          "can't use DfsClientShm because we failed to " +
+          "load misc.Unsafe.");
+    }
+    this.shmId = shmId;
+    // The length must be known before mapping: only whole slots are mapped.
+    this.mmappedLength = getUsableLength(stream);
+    this.baseAddress = POSIX.mmap(stream.getFD(),
+        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
+    // One Slot reference and one allocation bit per slot in the mapping.
+    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
+    this.allocatedSlots = new BitSet(slots.length);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("creating " + this.getClass().getSimpleName() +
+          "(shmId=" + shmId +
+          ", mmappedLength=" + mmappedLength +
+          ", baseAddress=" + String.format("%x", baseAddress) +
+          ", slots.length=" + slots.length + ")");
+    }
+  }
+
+  /**
+   * @return    The ID of this shared memory segment.
+   */
+  public final ShmId getShmId() {
+    return this.shmId;
+  }
+  
+  /**
+   * Check whether no slot at all is currently registered in this shared
+   * memory object.
+   *
+   * @return    True if the shared memory object is empty.
+   */
+  synchronized final public boolean isEmpty() {
+    // No bit set in the allocation bitmap means no slot is in use.
+    return allocatedSlots.isEmpty();
+  }
+
+  /**
+   * Check whether every slot of this shared memory object is in use.
+   *
+   * @return    True if the shared memory object is full.
+   */
+  synchronized final public boolean isFull() {
+    // Full means the first free index lies past the end of the slot array.
+    final int firstFree = allocatedSlots.nextClearBit(0);
+    return firstFree >= slots.length;
+  }
+
+  /**
+   * Compute where a slot starts in memory.
+   *
+   * @param slotIdx   Index of the slot.
+   * @return          The base address of the slot.
+   */
+  private final long calculateSlotAddress(int slotIdx) {
+    // Widen before multiplying so the offset is computed in 64 bits.
+    final long offset = ((long) slotIdx) * BYTES_PER_SLOT;
+    return baseAddress + offset;
+  }
+
+  /**
+   * Allocate a new slot and register it.
+   *
+   * Picks the lowest-numbered free slot, zeroes it, marks it valid, and
+   * hands back the Slot object that now tracks it.
+   *
+   * @param blockId   The block the new slot will be used for.
+   * @return          The new slot.
+   * @throws RuntimeException   If every slot is already in use.
+   */
+  synchronized public final Slot allocAndRegisterSlot(
+      ExtendedBlockId blockId) {
+    final int freeIdx = allocatedSlots.nextClearBit(0);
+    if (freeIdx >= slots.length) {
+      throw new RuntimeException(this + ": no more slots are available.");
+    }
+    allocatedSlots.set(freeIdx);
+    final Slot created = new Slot(calculateSlotAddress(freeIdx), blockId);
+    created.clear();
+    created.makeValid();
+    slots[freeIdx] = created;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": allocAndRegisterSlot " + freeIdx +
+          ": allocatedSlots=" + allocatedSlots +
+          StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    return created;
+  }
+
+  /**
+   * Look up a registered slot by index.
+   *
+   * @param slotIdx   Index of the slot.
+   * @return          The Slot registered at that index.
+   *
+   * @throws InvalidRequestException
+   *            If the index is negative, lies past the end of this segment,
+   *            or no slot is registered at that index.
+   */
+  synchronized public final Slot getSlot(int slotIdx)
+      throws InvalidRequestException {
+    // BitSet#get throws IndexOutOfBoundsException for a negative index, so
+    // range-check first and report every bad index the same way.
+    if ((slotIdx < 0) || (slotIdx >= slots.length) ||
+        !allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " does not exist.");
+    }
+    return slots[slotIdx];
+  }
+
+  /**
+   * Register a slot.
+   *
+   * The slot must already have been initialized (by another process); this
+   * method only starts tracking it locally and returns the Slot object for
+   * it.
+   *
+   * @param slotIdx   Index of the slot to register.
+   * @param blockId   The block the slot is used for.
+   * @return          The slot.
+   *
+   * @throws InvalidRequestException
+   *            If the slot index is out of range, is already in use, or
+   *            refers to a slot that is not marked as valid.
+   */
+  synchronized public final Slot registerSlot(int slotIdx,
+      ExtendedBlockId blockId) throws InvalidRequestException {
+    if (slotIdx < 0) {
+      throw new InvalidRequestException(this + ": invalid negative slot " +
+          "index " + slotIdx);
+    }
+    if (slotIdx >= slots.length) {
+      throw new InvalidRequestException(this + ": invalid slot " +
+          "index " + slotIdx);
+    }
+    if (allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " is already in use.");
+    }
+    final Slot candidate = new Slot(calculateSlotAddress(slotIdx), blockId);
+    if (!candidate.isValid()) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " is not marked as valid.");
+    }
+    slots[slotIdx] = candidate;
+    allocatedSlots.set(slotIdx);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": registerSlot " + slotIdx +
+          ": allocatedSlots=" + allocatedSlots +
+          StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    return candidate;
+  }
+
+  /**
+   * Unregisters a slot.
+   *
+   * This doesn't alter the contents of the slot.  It just means that this
+   * object no longer tracks it: the slot's bit in the allocation bitmap is
+   * cleared and the reference to its Slot object is dropped.
+   *
+   * @param slotIdx  Index of the slot to unregister.
+   * @throws IllegalStateException  If the slot was not registered.
+   */
+  synchronized public final void unregisterSlot(int slotIdx) {
+    final boolean registered = allocatedSlots.get(slotIdx);
+    Preconditions.checkState(registered,
+        "tried to unregister slot " + slotIdx + ", which was not registered.");
+    allocatedSlots.clear(slotIdx);
+    slots[slotIdx] = null;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": unregisterSlot " + slotIdx);
+    }
+  }
+  
+  /**
+   * Iterate over all allocated slots.
+   *
+   * Note that this method isn't safe if slots are registered or
+   * unregistered while the iteration is in progress: the iterator
+   * synchronizes on this object separately for each hasNext() and next()
+   * call, so the set of allocated slots may change between the two.
+   *
+   * @return        The slot iterator.
+   */
+  public SlotIterator slotIterator() {
+    return new SlotIterator();
+  }
+
+  /**
+   * Unmap the shared memory segment.
+   *
+   * A failure to munmap is logged at WARN level and is not propagated.
+   */
+  public void free() {
+    try {
+      POSIX.munmap(baseAddress, mmappedLength);
+    } catch (IOException e) {
+      LOG.warn(this + ": failed to munmap", e);
+    }
+    // Guarded like the other trace sites in this class so the message is
+    // only built when tracing is enabled.
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": freed");
+    }
+  }
+  
+  /**
+   * Describe this segment as its simple class name followed by its ID in
+   * parentheses.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text =
+        new StringBuilder(this.getClass().getSimpleName());
+    text.append("(").append(shmId).append(")");
+    return text.toString();
+  }
+}

Reply via email to