http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 8e81fdc..beaa903 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -695,7 +695,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetDatanodeReportRequestProto req)
       throws ServiceException {
     try {
-      List<? extends DatanodeInfoProto> result = PBHelper.convert(server
+      List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server
           .getDatanodeReport(PBHelper.convert(req.getType())));
       return GetDatanodeReportResponseProto.newBuilder()
           .addAllDi(result).build();
@@ -890,7 +890,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       server.setQuota(req.getPath(), req.getNamespaceQuota(),
           req.getStoragespaceQuota(),
           req.hasStorageType() ?
-          PBHelper.convertStorageType(req.getStorageType()): null);
+          PBHelperClient.convertStorageType(req.getStorageType()): null);
       return VOID_SETQUOTA_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -990,7 +990,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       GetDelegationTokenResponseProto.Builder rspBuilder = 
           GetDelegationTokenResponseProto.newBuilder();
       if (token != null) {
-        rspBuilder.setToken(PBHelper.convert(token));
+        rspBuilder.setToken(PBHelperClient.convert(token));
       }
       return rspBuilder.build();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index d6afa6e..d30982a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       String holder) throws AccessControlException, FileNotFoundException,
         UnresolvedLinkException, IOException {
     AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
-        .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder)
+        .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder)
             .setFileId(fileId).build();
     try {
       rpcProxy.abandonBlock(null, req);
@@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
     AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
         .setSrc(src).setClientName(clientName).setFileId(fileId);
     if (previous != null) 
-      req.setPrevious(PBHelper.convert(previous)); 
-    if (excludeNodes != null) 
-      req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
+      req.setPrevious(PBHelperClient.convert(previous));
+    if (excludeNodes != null)
+      req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes));
     if (favoredNodes != null) {
       req.addAllFavoredNodes(Arrays.asList(favoredNodes));
     }
@@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .newBuilder()
         .setSrc(src)
         .setFileId(fileId)
-        .setBlk(PBHelper.convert(blk))
-        .addAllExistings(PBHelper.convert(existings))
+        .setBlk(PBHelperClient.convert(blk))
+        .addAllExistings(PBHelperClient.convert(existings))
         .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
-        .addAllExcludes(PBHelper.convert(excludes))
+        .addAllExcludes(PBHelperClient.convert(excludes))
         .setNumAdditionalNodes(numAdditionalNodes)
         .setClientName(clientName)
         .build();
@@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setClientName(clientName)
         .setFileId(fileId);
     if (last != null)
-      req.setLast(PBHelper.convert(last));
+      req.setLast(PBHelperClient.convert(last));
     try {
       return rpcProxy.complete(null, req.build()).getResult();
     } catch (ServiceException e) {
@@ -819,7 +819,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setNamespaceQuota(namespaceQuota)
         .setStoragespaceQuota(storagespaceQuota);
     if (type != null) {
-      builder.setStorageType(PBHelper.convertStorageType(type));
+      builder.setStorageType(PBHelperClient.convertStorageType(type));
     }
     final SetQuotaRequestProto req = builder.build();
     try {
@@ -897,7 +897,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       String clientName) throws IOException {
     UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto
         .newBuilder()
-        .setBlock(PBHelper.convert(block))
+        .setBlock(PBHelperClient.convert(block))
         .setClientName(clientName)
         .build();
     try {
@@ -913,8 +913,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
     UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
         .setClientName(clientName)
-        .setOldBlock(PBHelper.convert(oldBlock))
-        .setNewBlock(PBHelper.convert(newBlock))
+        .setOldBlock(PBHelperClient.convert(oldBlock))
+        .setNewBlock(PBHelperClient.convert(newBlock))
         .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
         .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
         .build();
@@ -945,7 +945,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
     RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder().
-        setToken(PBHelper.convert(token)).
+        setToken(PBHelperClient.convert(token)).
         build();
     try {
       return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime();
@@ -959,7 +959,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throws IOException {
     CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto
         .newBuilder()
-        .setToken(PBHelper.convert(token))
+        .setToken(PBHelperClient.convert(token))
         .build();
     try {
       rpcProxy.cancelDelegationToken(null, req);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 94028a2..0b46927 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       ) throws IOException {
     CommitBlockSynchronizationRequestProto.Builder builder = 
         CommitBlockSynchronizationRequestProto.newBuilder()
-        .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
+        .setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp)
         .setNewLength(newlength).setCloseFile(closeFile)
         .setDeleteBlock(deleteblock);
     for (int i = 0; i < newtargets.length; i++) {
-      builder.addNewTaragets(PBHelper.convert(newtargets[i]));
+      builder.addNewTaragets(PBHelperClient.convert(newtargets[i]));
       builder.addNewTargetStorages(newtargetstorages[i]);
     }
     CommitBlockSynchronizationRequestProto req = builder.build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
index fee62a4..17ba196 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
@@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements
       long recoveryId, long newBlockId, long newLength) throws IOException {
     UpdateReplicaUnderRecoveryRequestProto req = 
         UpdateReplicaUnderRecoveryRequestProto.newBuilder()
-        .setBlock(PBHelper.convert(oldBlock))
+        .setBlock(PBHelperClient.convert(oldBlock))
         .setNewLength(newLength).setNewBlockId(newBlockId)
         .setRecoveryId(recoveryId).build();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 82c5c4c..bcb96ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
     GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
-        .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size)
+        .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
         .build();
     try {
       return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4ca5b26..887accf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -346,7 +346,7 @@ public class PBHelper {
     if (types == null || types.length == 0) {
       return null;
     }
-    List<StorageTypeProto> list = convertStorageTypes(types);
+    List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types);
     return StorageTypesProto.newBuilder().addAllStorageTypes(list).build();
   }
 
@@ -381,20 +381,6 @@ public class PBHelper {
         .getInfoSecurePort() : 0, dn.getIpcPort());
   }
 
-  public static DatanodeIDProto convert(DatanodeID dn) {
-    // For wire compatibility with older versions we transmit the StorageID
-    // which is the same as the DatanodeUuid. Since StorageID is a required
-    // field we pass the empty string if the DatanodeUuid is not yet known.
-    return DatanodeIDProto.newBuilder()
-        .setIpAddr(dn.getIpAddr())
-        .setHostName(dn.getHostName())
-        .setXferPort(dn.getXferPort())
-        .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
-        .setInfoPort(dn.getInfoPort())
-        .setInfoSecurePort(dn.getInfoSecurePort())
-        .setIpcPort(dn.getIpcPort()).build();
-  }
-
   // Arrays of DatanodeId
   public static DatanodeIDProto[] convert(DatanodeID[] did) {
     if (did == null)
@@ -402,7 +388,7 @@ public class PBHelper {
     final int len = did.length;
     DatanodeIDProto[] result = new DatanodeIDProto[len];
     for (int i = 0; i < len; ++i) {
-      result[i] = convert(did[i]);
+      result[i] = PBHelperClient.convert(did[i]);
     }
     return result;
   }
@@ -433,7 +419,7 @@ public class PBHelper {
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
         .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
-        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
         .build();
   }
 
@@ -595,16 +581,6 @@ public class PBHelper {
        eb.getGenerationStamp());
   }
   
-  public static ExtendedBlockProto convert(final ExtendedBlock b) {
-    if (b == null) return null;
-   return ExtendedBlockProto.newBuilder().
-      setPoolId(b.getBlockPoolId()).
-      setBlockId(b.getBlockId()).
-      setNumBytes(b.getNumBytes()).
-      setGenerationStamp(b.getGenerationStamp()).
-      build();
-  }
-  
   public static RecoveringBlockProto convert(RecoveringBlock b) {
     if (b == null) {
       return null;
@@ -625,17 +601,6 @@ public class PBHelper {
         new RecoveringBlock(block, locs, b.getNewGenStamp());
   }
   
-  public static DatanodeInfoProto.AdminState convert(
-      final DatanodeInfo.AdminStates inAs) {
-    switch (inAs) {
-    case NORMAL: return  DatanodeInfoProto.AdminState.NORMAL;
-    case DECOMMISSION_INPROGRESS: 
-        return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
-    case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
-    default: return DatanodeInfoProto.AdminState.NORMAL;
-    }
-  }
-  
   static public DatanodeInfo convert(DatanodeInfoProto di) {
     if (di == null) return null;
     return new DatanodeInfo(
@@ -647,12 +612,6 @@ public class PBHelper {
         di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
   }
   
-  static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
-    if (di == null) return null;
-    return convert(di);
-  }
-  
-  
   static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
     if (di == null) return null;
     DatanodeInfo[] result = new DatanodeInfo[di.length];
@@ -662,27 +621,6 @@ public class PBHelper {
     return result;
   }
 
-  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
-      DatanodeInfo[] dnInfos) {
-    return convert(dnInfos, 0);
-  }
-  
-  /**
-   * Copy from {@code dnInfos} to a target of list of same size starting at
-   * {@code startIdx}.
-   */
-  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
-      DatanodeInfo[] dnInfos, int startIdx) {
-    if (dnInfos == null)
-      return null;
-    ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
-        .newArrayListWithCapacity(dnInfos.length);
-    for (int i = startIdx; i < dnInfos.length; i++) {
-      protos.add(convert(dnInfos[i]));
-    }
-    return protos;
-  }
-
   public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
     DatanodeInfo[] info = new DatanodeInfo[list.size()];
     for (int i = 0; i < info.length; i++) {
@@ -690,32 +628,11 @@ public class PBHelper {
     }
     return info;
   }
-  
-  public static DatanodeInfoProto convert(DatanodeInfo info) {
-    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    if (info.getNetworkLocation() != null) {
-      builder.setLocation(info.getNetworkLocation());
-    }
-    builder
-        .setId(PBHelper.convert((DatanodeID)info))
-        .setCapacity(info.getCapacity())
-        .setDfsUsed(info.getDfsUsed())
-        .setRemaining(info.getRemaining())
-        .setBlockPoolUsed(info.getBlockPoolUsed())
-        .setCacheCapacity(info.getCacheCapacity())
-        .setCacheUsed(info.getCacheUsed())
-        .setLastUpdate(info.getLastUpdate())
-        .setLastUpdateMonotonic(info.getLastUpdateMonotonic())
-        .setXceiverCount(info.getXceiverCount())
-        .setAdminState(PBHelper.convert(info.getAdminState()))
-        .build();
-    return builder.build();
-  }
 
   public static DatanodeStorageReportProto convertDatanodeStorageReport(
       DatanodeStorageReport report) {
     return DatanodeStorageReportProto.newBuilder()
-        .setDatanodeInfo(convert(report.getDatanodeInfo()))
+        .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo()))
         .addAllStorageReports(convertStorageReports(report.getStorageReports()))
         .build();
   }
@@ -767,7 +684,7 @@ public class PBHelper {
         Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
     for (int i = 0; i < locs.length; i++) {
       DatanodeInfo loc = locs[i];
-      builder.addLocs(i, PBHelper.convert(loc));
+      builder.addLocs(i, PBHelperClient.convert(loc));
       boolean locIsCached = cachedLocs.contains(loc);
       builder.addIsCached(locIsCached);
       if (locIsCached) {
@@ -781,7 +698,7 @@ public class PBHelper {
     StorageType[] storageTypes = b.getStorageTypes();
     if (storageTypes != null) {
       for (int i = 0; i < storageTypes.length; ++i) {
-        builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
+        builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i]));
       }
     }
     final String[] storageIDs = b.getStorageIDs();
@@ -789,8 +706,8 @@ public class PBHelper {
       builder.addAllStorageIDs(Arrays.asList(storageIDs));
     }
 
-    return builder.setB(PBHelper.convert(b.getBlock()))
-        .setBlockToken(PBHelper.convert(b.getBlockToken()))
+    return builder.setB(PBHelperClient.convert(b.getBlock()))
+        .setBlockToken(PBHelperClient.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
   
@@ -831,14 +748,6 @@ public class PBHelper {
     return lb;
   }
 
-  public static TokenProto convert(Token<?> tok) {
-    return TokenProto.newBuilder().
-              setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
-              setPassword(ByteString.copyFrom(tok.getPassword())).
-              setKind(tok.getKind().toString()).
-              setService(tok.getService().toString()).build(); 
-  }
-  
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
     return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
@@ -890,7 +799,7 @@ public class PBHelper {
       DatanodeRegistration registration) {
     DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
         .newBuilder();
-    return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration))
+    return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration))
         .setStorageInfo(PBHelper.convert(registration.getStorageInfo()))
         .setKeys(PBHelper.convert(registration.getExportedKeys()))
         .setSoftwareVersion(registration.getSoftwareVersion()).build();
@@ -982,7 +891,7 @@ public class PBHelper {
     if (types != null) {
       for (StorageType[] ts : types) {
         StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
-        builder.addAllStorageTypes(convertStorageTypes(ts));
+        builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts));
         list.add(builder.build());
       }
     }
@@ -1013,7 +922,7 @@ public class PBHelper {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
     for (int i = 0; i < targets.length; i++) {
       ret[i] = DatanodeInfosProto.newBuilder()
-          .addAllDatanodes(PBHelper.convert(targets[i])).build();
+          .addAllDatanodes(PBHelperClient.convert(targets[i])).build();
     }
     return Arrays.asList(ret);
   }
@@ -1337,7 +1246,7 @@ public class PBHelper {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        PBHelper.convert(fs.getChecksumType()));
+        PBHelperClient.convert(fs.getChecksumType()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -1350,7 +1259,7 @@ public class PBHelper {
       .setFileBufferSize(fs.getFileBufferSize())
       .setEncryptDataTransfer(fs.getEncryptDataTransfer())
       .setTrashInterval(fs.getTrashInterval())
-      .setChecksumType(PBHelper.convert(fs.getChecksumType()))
+      .setChecksumType(PBHelperClient.convert(fs.getChecksumType()))
       .build();
   }
   
@@ -1738,7 +1647,7 @@ public class PBHelper {
     if (cs.hasTypeQuotaInfos()) {
       for (HdfsProtos.StorageTypeQuotaInfoProto info :
           cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
-        StorageType type = PBHelper.convertStorageType(info.getType());
+        StorageType type = PBHelperClient.convertStorageType(info.getType());
         builder.typeConsumed(type, info.getConsumed());
         builder.typeQuota(type, info.getQuota());
       }
@@ -1762,7 +1671,7 @@ public class PBHelper {
       for (StorageType t: StorageType.getTypesSupportingQuota()) {
         HdfsProtos.StorageTypeQuotaInfoProto info =
             HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
-                setType(convertStorageType(t)).
+                setType(PBHelperClient.convertStorageType(t)).
                 setConsumed(cs.getTypeConsumed(t)).
                 setQuota(cs.getTypeQuota(t)).
                 build();
@@ -1807,7 +1716,7 @@ public class PBHelper {
   public static DatanodeStorageProto convert(DatanodeStorage s) {
     return DatanodeStorageProto.newBuilder()
         .setState(PBHelper.convertState(s.getState()))
-        .setStorageType(PBHelper.convertStorageType(s.getStorageType()))
+        .setStorageType(PBHelperClient.convertStorageType(s.getStorageType()))
         .setStorageUuid(s.getStorageID()).build();
   }
 
@@ -1821,44 +1730,10 @@ public class PBHelper {
     }
   }
 
-  public static List<StorageTypeProto> convertStorageTypes(
-      StorageType[] types) {
-    return convertStorageTypes(types, 0);
-  }
-
-  public static List<StorageTypeProto> convertStorageTypes(
-      StorageType[] types, int startIdx) {
-    if (types == null) {
-      return null;
-    }
-    final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
-        types.length);
-    for (int i = startIdx; i < types.length; ++i) {
-      protos.add(convertStorageType(types[i]));
-    }
-    return protos; 
-  }
-
-  public static StorageTypeProto convertStorageType(StorageType type) {
-    switch(type) {
-    case DISK:
-      return StorageTypeProto.DISK;
-    case SSD:
-      return StorageTypeProto.SSD;
-    case ARCHIVE:
-      return StorageTypeProto.ARCHIVE;
-    case RAM_DISK:
-      return StorageTypeProto.RAM_DISK;
-    default:
-      throw new IllegalStateException(
-          "BUG: StorageType not found, type=" + type);
-    }
-  }
-
   public static DatanodeStorage convert(DatanodeStorageProto s) {
     return new DatanodeStorage(s.getStorageUuid(),
                                PBHelper.convertState(s.getState()),
-                               PBHelper.convertStorageType(s.getStorageType()));
+                               PBHelperClient.convertStorageType(s.getStorageType()));
   }
 
   private static State convertState(StorageState state) {
@@ -1871,22 +1746,6 @@ public class PBHelper {
     }
   }
 
-  public static StorageType convertStorageType(StorageTypeProto type) {
-    switch(type) {
-      case DISK:
-        return StorageType.DISK;
-      case SSD:
-        return StorageType.SSD;
-      case ARCHIVE:
-        return StorageType.ARCHIVE;
-      case RAM_DISK:
-        return StorageType.RAM_DISK;
-      default:
-        throw new IllegalStateException(
-            "BUG: StorageTypeProto not found, type=" + type);
-    }
-  }
-
   public static StorageType[] convertStorageTypes(
       List<StorageTypeProto> storageTypesList, int expectedSize) {
     final StorageType[] storageTypes = new StorageType[expectedSize];
@@ -1895,7 +1754,7 @@ public class PBHelper {
       Arrays.fill(storageTypes, StorageType.DEFAULT);
     } else {
       for (int i = 0; i < storageTypes.length; ++i) {
-        storageTypes[i] = convertStorageType(storageTypesList.get(i));
+        storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i));
       }
     }
     return storageTypes;
@@ -2079,10 +1938,6 @@ public class PBHelper {
     return reportProto;
   }
 
-  public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
-    return DataChecksum.Type.valueOf(type.getNumber());
-  }
-
   public static CacheDirectiveInfoProto convert
       (CacheDirectiveInfo info) {
     CacheDirectiveInfoProto.Builder builder = 
@@ -2255,9 +2110,6 @@ public class PBHelper {
     return new CachePoolEntry(info, stats);
   }
   
-  public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
-    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
-  }
 
   public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) {
     DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder();
@@ -2272,17 +2124,6 @@ public class PBHelper {
         proto.getConfigVersion(), proto.getUptime());
   }
 
-  public static InputStream vintPrefixed(final InputStream input)
-      throws IOException {
-    final int firstByte = input.read();
-    if (firstByte == -1) {
-      throw new EOFException("Premature EOF: no length prefix available");
-    }
-
-    int size = CodedInputStream.readRawVarint32(firstByte, input);
-    assert size >= 0;
-    return new ExactSizeInputStream(input, size);
-  }
 
   private static AclEntryScopeProto convert(AclEntryScope v) {
     return AclEntryScopeProto.valueOf(v.ordinal());
@@ -2506,30 +2347,11 @@ public class PBHelper {
         proto.getKeyName());
   }
 
-  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
-    return ShortCircuitShmSlotProto.newBuilder().
-        setShmId(convert(slotId.getShmId())).
-        setSlotIdx(slotId.getSlotIdx()).
-        build();
-  }
-
-  public static ShortCircuitShmIdProto convert(ShmId shmId) {
-    return ShortCircuitShmIdProto.newBuilder().
-        setHi(shmId.getHi()).
-        setLo(shmId.getLo()).
-        build();
-
-  }
-
   public static SlotId convert(ShortCircuitShmSlotProto slotId) {
-    return new SlotId(PBHelper.convert(slotId.getShmId()),
+    return new SlotId(PBHelperClient.convert(slotId.getShmId()),
         slotId.getSlotIdx());
   }
 
-  public static ShmId convert(ShortCircuitShmIdProto shmId) {
-    return new ShmId(shmId.getHi(), shmId.getLo());
-  }
-
   private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
       type) {
     switch (type) {
@@ -3036,18 +2858,6 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
-  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
-    List<Boolean> pinnings = new ArrayList<Boolean>();
-    if (targetPinnings == null) {
-      pinnings.add(Boolean.FALSE);
-    } else {
-      for (; idx < targetPinnings.length; ++idx) {
-        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
-      }
-    }
-    return pinnings;
-  }
-
   public static boolean[] convertBooleanList(
     List<Boolean> targetPinningsList) {
     final boolean[] targetPinnings = new boolean[targetPinningsList.size()];

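Note on where the deleted PBHelper helpers went: every call site updated in this patch now resolves to PBHelperClient, but PBHelperClient.java itself is not part of this excerpt. As a minimal sketch of what the relocated methods presumably look like there, the bodies below are copied from the PBHelper deletions above; the enclosing class, package placement, and import locations are assumptions on my part, not shown in the patch.

    package org.apache.hadoop.hdfs.protocolPB;

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
    import org.apache.hadoop.util.ExactSizeInputStream;

    import com.google.protobuf.CodedInputStream;

    public class PBHelperClient {

      // Copied from the PBHelper.convert(ExtendedBlock) deletion above.
      public static ExtendedBlockProto convert(final ExtendedBlock b) {
        if (b == null) return null;
        return ExtendedBlockProto.newBuilder()
            .setPoolId(b.getBlockPoolId())
            .setBlockId(b.getBlockId())
            .setNumBytes(b.getNumBytes())
            .setGenerationStamp(b.getGenerationStamp())
            .build();
      }

      // Copied from the PBHelper.convertStorageType(StorageType) deletion above.
      public static StorageTypeProto convertStorageType(StorageType type) {
        switch (type) {
        case DISK:     return StorageTypeProto.DISK;
        case SSD:      return StorageTypeProto.SSD;
        case ARCHIVE:  return StorageTypeProto.ARCHIVE;
        case RAM_DISK: return StorageTypeProto.RAM_DISK;
        default:
          throw new IllegalStateException("BUG: StorageType not found, type=" + type);
        }
      }

      // Copied from the PBHelper.vintPrefixed(InputStream) deletion above: reads the
      // varint length prefix of a delimited protobuf message and returns a stream
      // bounded to exactly that many bytes.
      public static InputStream vintPrefixed(final InputStream input)
          throws IOException {
        final int firstByte = input.read();
        if (firstByte == -1) {
          throw new EOFException("Premature EOF: no length prefix available");
        }
        int size = CodedInputStream.readRawVarint32(firstByte, input);
        assert size >= 0;
        return new ExactSizeInputStream(input, size);
      }
    }

The translator and datanode classes changed later in this patch (Dispatcher, DataNode, DataXceiver) reach these helpers either by class name or through a static import, exactly as their + lines show.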
http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
deleted file mode 100644
index 2fa86fa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security.token.block;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Access token verification failed.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class InvalidBlockTokenException extends IOException {
-  private static final long serialVersionUID = 168L;
-
-  public InvalidBlockTokenException() {
-    super();
-  }
-
-  public InvalidBlockTokenException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index a5e22ec..be1a9ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
deleted file mode 100644
index 215df13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-/**
- * The caching strategy we should use for an HDFS read or write operation.
- */
-public class CachingStrategy {
-  private final Boolean dropBehind; // null = use server defaults
-  private final Long readahead; // null = use server defaults
-  
-  public static CachingStrategy newDefaultStrategy() {
-    return new CachingStrategy(null, null);
-  }
-
-  public static CachingStrategy newDropBehind() {
-    return new CachingStrategy(true, null);
-  }
-
-  public static class Builder {
-    private Boolean dropBehind;
-    private Long readahead;
-
-    public Builder(CachingStrategy prev) {
-      this.dropBehind = prev.dropBehind;
-      this.readahead = prev.readahead;
-    }
-
-    public Builder setDropBehind(Boolean dropBehind) {
-      this.dropBehind = dropBehind;
-      return this;
-    }
-
-    public Builder setReadahead(Long readahead) {
-      this.readahead = readahead;
-      return this;
-    }
-
-    public CachingStrategy build() {
-      return new CachingStrategy(dropBehind, readahead);
-    }
-  }
-
-  public CachingStrategy(Boolean dropBehind, Long readahead) {
-    this.dropBehind = dropBehind;
-    this.readahead = readahead;
-  }
-
-  public Boolean getDropBehind() {
-    return dropBehind;
-  }
-  
-  public Long getReadahead() {
-    return readahead;
-  }
-
-  public String toString() {
-    return "CachingStrategy(dropBehind=" + dropBehind +
-        ", readahead=" + readahead + ")";
-  }
-}

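The CachingStrategy class deleted above (like DfsClientShm and DfsClientShmManager later in this patch) is a small value holder with a copy-constructor Builder; its new home is not shown in this excerpt, so the sketch below only illustrates the API visible in the removal. The concrete values are arbitrary examples, not HDFS defaults.

    // Minimal usage sketch of the Builder from the deleted file above; class and
    // method names are taken verbatim from the removal, values are illustrative.
    CachingStrategy strategy = new CachingStrategy.Builder(CachingStrategy.newDefaultStrategy())
        .setDropBehind(true)              // hint: drop OS page-cache pages behind this stream
        .setReadahead(4L * 1024 * 1024)   // hint: 4 MB readahead (example value)
        .build();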
http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ecf139c..5bc50b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -135,7 +135,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
@@ -2142,7 +2142,7 @@ public class DataNode extends ReconfigurableBase
         // read ack
         if (isClient) {
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelper.vintPrefixed(in));
+              PBHelperClient.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
             LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index e9cf436..dfaa525 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
@@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable {
       throws IOException {
     DataNodeFaultInjector.get().sendShortCircuitShmResponse();
     ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
-        setId(PBHelper.convert(shmInfo.shmId)).build().
+        setId(PBHelperClient.convert(shmInfo.shmId)).build().
         writeDelimitedTo(socketOut);
     // Send the file descriptor for the shared memory segment.
     byte buf[] = new byte[] { (byte)0 };
@@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable {
         // to respond with a Status enum.
         try {
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
-              PBHelper.vintPrefixed(in));
+              PBHelperClient.vintPrefixed(in));
           if (!stat.hasStatus()) {
             LOG.warn("Client " + peer.getRemoteAddressString() +
                 " did not send a valid status code after reading. " +
@@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable {
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
             BlockOpResponseProto connectAck =
-              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
+              BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
             mirrorInStatus = connectAck.getStatus();
             firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable {
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
+          .setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();
@@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable {
         // receive the response from the proxy
         
         BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-            PBHelper.vintPrefixed(proxyReply));
-        
+            PBHelperClient.vintPrefixed(proxyReply));
+
         String logInfo = "copy block " + block + " from "
             + proxySock.getRemoteSocketAddress();
         DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 25fd99d..3a9c64e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -155,7 +156,7 @@ public final class FSImageFormatPBINode {
       QuotaByStorageTypeFeatureProto proto) {
      ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder();
      for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) {
-        StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType());
+        StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType());
         long quota = quotaEntry.getQuota();
         b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type)
             .setQuota(quota).build());
@@ -462,7 +463,7 @@ public final class FSImageFormatPBINode {
         if (q.getTypeSpace(t) >= 0) {
           QuotaByStorageTypeEntryProto.Builder eb =
               QuotaByStorageTypeEntryProto.newBuilder().
-              setStorageType(PBHelper.convertStorageType(t)).
+              setStorageType(PBHelperClient.convertStorageType(t)).
               setQuota(q.getTypeSpace(t));
           b.addQuotas(eb);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
deleted file mode 100644
index 81cc68d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.DomainSocketWatcher;
-
-import com.google.common.base.Preconditions;
-
-/**
- * DfsClientShm is a subclass of ShortCircuitShm which is used by the
- * DfsClient.
- * When the UNIX domain socket associated with this shared memory segment
- * closes unexpectedly, we mark the slots inside this segment as disconnected.
- * ShortCircuitReplica objects that contain disconnected slots are stale,
- * and will not be used to service new reads or mmap operations.
- * However, in-progress read or mmap operations will continue to proceed.
- * Once the last slot is deallocated, the segment can be safely munmapped.
- *
- * Slots may also become stale because the associated replica has been deleted
- * on the DataNode.  In this case, the DataNode will clear the 'valid' bit.
- * The client will then see these slots as stale (see
- * #{ShortCircuitReplica#isStale}).
- */
-public class DfsClientShm extends ShortCircuitShm
-    implements DomainSocketWatcher.Handler {
-  /**
-   * The EndpointShmManager associated with this shared memory segment.
-   */
-  private final EndpointShmManager manager;
-
-  /**
-   * The UNIX domain socket associated with this DfsClientShm.
-   * We rely on the DomainSocketWatcher to close the socket associated with
-   * this DomainPeer when necessary.
-   */
-  private final DomainPeer peer;
-
-  /**
-   * True if this shared memory segment has lost its connection to the
-   * DataNode.
-   *
-   * {@link DfsClientShm#handle} sets this to true.
-   */
-  private boolean disconnected = false;
-
-  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
-      DomainPeer peer) throws IOException {
-    super(shmId, stream);
-    this.manager = manager;
-    this.peer = peer;
-  }
-
-  public EndpointShmManager getEndpointShmManager() {
-    return manager;
-  }
-
-  public DomainPeer getPeer() {
-    return peer;
-  }
-
-  /**
-   * Determine if the shared memory segment is disconnected from the DataNode.
-   *
-   * This must be called with the DfsClientShmManager lock held.
-   *
-   * @return   True if the shared memory segment is stale.
-   */
-  public synchronized boolean isDisconnected() {
-    return disconnected;
-  }
-
-  /**
-   * Handle the closure of the UNIX domain socket associated with this shared
-   * memory segment by marking this segment as stale.
-   *
-   * If there are no slots associated with this shared memory segment, it will
-   * be freed immediately in this function.
-   */
-  @Override
-  public boolean handle(DomainSocket sock) {
-    manager.unregisterShm(getShmId());
-    synchronized (this) {
-      Preconditions.checkState(!disconnected);
-      disconnected = true;
-      boolean hadSlots = false;
-      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
-        Slot slot = iter.next();
-        slot.makeInvalid();
-        hadSlots = true;
-      }
-      if (!hadSlots) {
-        free();
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
deleted file mode 100644
index 062539a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.DomainSocketWatcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * Manages short-circuit memory segments for an HDFS client.
- * 
- * Clients are responsible for requesting and releasing shared memory segments used
- * for communicating with the DataNode. The client will try to allocate new slots
- * in the set of existing segments, falling back to getting a new segment from the
- * DataNode via {@link DataTransferProtocol#requestShortCircuitFds}.
- * 
- * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}.
- * See {@link ShortCircuitRegistry} for more information on the communication protocol.
- */
-@InterfaceAudience.Private
-public class DfsClientShmManager implements Closeable {
-  private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
-
-  /**
-   * Manages short-circuit memory segments that pertain to a given DataNode.
-   */
-  class EndpointShmManager {
-    /**
-     * The datanode we're managing.
-     */
-    private final DatanodeInfo datanode;
-
-    /**
-     * Shared memory segments which have no empty slots.
-     *
-     * Protected by the manager lock.
-     */
-    private final TreeMap<ShmId, DfsClientShm> full =
-        new TreeMap<ShmId, DfsClientShm>();
-
-    /**
-     * Shared memory segments which have at least one empty slot.
-     *
-     * Protected by the manager lock.
-     */
-    private final TreeMap<ShmId, DfsClientShm> notFull =
-        new TreeMap<ShmId, DfsClientShm>();
-
-    /**
-     * True if this datanode doesn't support short-circuit shared memory
-     * segments.
-     *
-     * Protected by the manager lock.
-     */
-    private boolean disabled = false;
-
-    /**
-     * True if we're in the process of loading a shared memory segment from
-     * this DataNode.
-     *
-     * Protected by the manager lock.
-     */
-    private boolean loading = false;
-
-    EndpointShmManager (DatanodeInfo datanode) {
-      this.datanode = datanode;
-    }
-
-    /**
-     * Pull a slot out of a preexisting shared memory segment.
-     *
-     * Must be called with the manager lock held.
-     *
-     * @param blockId     The blockId to put inside the Slot object.
-     *
-     * @return            null if none of our shared memory segments contain a
-     *                      free slot; the slot object otherwise.
-     */
-    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
-      if (notFull.isEmpty()) {
-        return null;
-      }
-      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
-      DfsClientShm shm = entry.getValue();
-      ShmId shmId = shm.getShmId();
-      Slot slot = shm.allocAndRegisterSlot(blockId);
-      if (shm.isFull()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
-        DfsClientShm removedShm = notFull.remove(shmId);
-        Preconditions.checkState(removedShm == shm);
-        full.put(shmId, shm);
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
-              " out of " + shm);
-        }
-      }
-      return slot;
-    }
-
-    /**
-     * Ask the DataNode for a new shared memory segment.  This function must be
-     * called with the manager lock held.  We will release the lock while
-     * communicating with the DataNode.
-     *
-     * @param clientName    The current client name.
-     * @param peer          The peer to use to talk to the DataNode.
-     *
-     * @return              Null if the DataNode does not support shared memory
-     *                        segments, or experienced an error creating the
-     *                        shm.  The shared memory segment itself on success.
-     * @throws IOException  If there was an error communicating over the socket.
-     *                        We will not throw an IOException unless the socket
-     *                        itself (or the network) is the problem.
-     */
-    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
-        throws IOException {
-      final DataOutputStream out = 
-          new DataOutputStream(
-              new BufferedOutputStream(peer.getOutputStream()));
-      new Sender(out).requestShortCircuitShm(clientName);
-      ShortCircuitShmResponseProto resp = 
-          ShortCircuitShmResponseProto.parseFrom(
-              PBHelper.vintPrefixed(peer.getInputStream()));
-      String error = resp.hasError() ? resp.getError() : "(unknown)";
-      switch (resp.getStatus()) {
-      case SUCCESS:
-        DomainSocket sock = peer.getDomainSocket();
-        byte buf[] = new byte[1];
-        FileInputStream fis[] = new FileInputStream[1];
-        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
-          throw new EOFException("got EOF while trying to transfer the " +
-              "file descriptor for the shared memory segment.");
-        }
-        if (fis[0] == null) {
-          throw new IOException("the datanode " + datanode + " failed to " +
-              "pass a file descriptor for the shared memory segment.");
-        }
-        try {
-          DfsClientShm shm = 
-              new DfsClientShm(PBHelper.convert(resp.getId()),
-                  fis[0], this, peer);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": createNewShm: created " + shm);
-          }
-          return shm;
-        } finally {
-          IOUtils.cleanup(LOG,  fis[0]);
-        }
-      case ERROR_UNSUPPORTED:
-        // The DataNode just does not support short-circuit shared memory
-        // access, and we should stop asking.
-        LOG.info(this + ": datanode does not support short-circuit " +
-            "shared memory access: " + error);
-        disabled = true;
-        return null;
-      default:
-        // The datanode experienced some kind of unexpected error when trying to
-        // create the short-circuit shared memory segment.
-        LOG.warn(this + ": error requesting short-circuit shared memory " +
-            "access: " + error);
-        return null;
-      }
-    }
-
-    /**
-     * Allocate a new shared memory slot connected to this datanode.
-     *
-     * Must be called with the EndpointShmManager lock held.
-     *
-     * @param peer          The peer to use to talk to the DataNode.
-     * @param usedPeer      (out param) Will be set to true if we used the peer.
-     *                        When a peer is used, the manager takes
-     *                        responsibility for it and the caller must not
-     *                        close it.
-     *
-     * @param clientName    The client name.
-     * @param blockId       The block ID to use.
-     * @return              null if the DataNode does not support shared memory
-     *                        segments, or experienced an error creating the
-     *                        shm.  The shared memory segment itself on success.
-     * @throws IOException  If there was an error communicating over the socket.
-     */
-    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
-        String clientName, ExtendedBlockId blockId) throws IOException {
-      while (true) {
-        if (closed) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": the DfsClientShmManager has been closed.");
-          }
-          return null;
-        }
-        if (disabled) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shared memory segment access is disabled.");
-          }
-          return null;
-        }
-        // Try to use an existing slot.
-        Slot slot = allocSlotFromExistingShm(blockId);
-        if (slot != null) {
-          return slot;
-        }
-        // There are no free slots.  If someone is loading more slots, wait
-        // for that to finish.
-        if (loading) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": waiting for loading to finish...");
-          }
-          finishedLoading.awaitUninterruptibly();
-        } else {
-          // Otherwise, load the slot ourselves.
-          loading = true;
-          lock.unlock();
-          DfsClientShm shm;
-          try {
-            shm = requestNewShm(clientName, peer);
-            if (shm == null) continue;
-            // See #{DfsClientShmManager#domainSocketWatcher} for details
-            // about why we do this before retaking the manager lock.
-            domainSocketWatcher.add(peer.getDomainSocket(), shm);
-            // The DomainPeer is now our responsibility, and should not be
-            // closed by the caller.
-            usedPeer.setValue(true);
-          } finally {
-            lock.lock();
-            loading = false;
-            finishedLoading.signalAll();
-          }
-          if (shm.isDisconnected()) {
-            // If the peer closed immediately after the shared memory segment
-            // was created, the DomainSocketWatcher callback might already have
-            // fired and marked the shm as disconnected.  In this case, we
-            // obviously don't want to add the SharedMemorySegment to our list
-            // of valid not-full segments.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(this + ": the UNIX domain socket associated with " +
-                  "this short-circuit memory closed before we could make " +
-                  "use of the shm.");
-            }
-          } else {
-            notFull.put(shm.getShmId(), shm);
-          }
-        }
-      }
-    }
-    
-    /**
-     * Stop tracking a slot.
-     *
-     * Must be called with the EndpointShmManager lock held.
-     *
-     * @param slot          The slot to release.
-     */
-    void freeSlot(Slot slot) {
-      DfsClientShm shm = (DfsClientShm)slot.getShm();
-      shm.unregisterSlot(slot.getSlotIdx());
-      if (shm.isDisconnected()) {
-        // Stale shared memory segments should not be tracked here.
-        Preconditions.checkState(!full.containsKey(shm.getShmId()));
-        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
-        if (shm.isEmpty()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": freeing empty stale " + shm);
-          }
-          shm.free();
-        }
-      } else {
-        ShmId shmId = shm.getShmId();
-        full.remove(shmId); // The shm can't be full if we just freed a slot.
-        if (shm.isEmpty()) {
-          notFull.remove(shmId);
-  
-          // If the shared memory segment is now empty, we call shutdown(2) on
-          // the UNIX domain socket associated with it.  The DomainSocketWatcher,
-          // which is watching this socket, will call DfsClientShm#handle,
-          // cleaning up this shared memory segment.
-          //
-          // See #{DfsClientShmManager#domainSocketWatcher} for details about why
-          // we don't want to call DomainSocketWatcher#remove directly here.
-          //
-          // Note that we could experience 'fragmentation' here, where the
-          // DFSClient allocates a bunch of slots in different shared memory
-          // segments, and then frees most of them, but never fully empties out
-          // any segment.  We make some attempt to avoid this fragmentation by
-          // always allocating new slots out of the shared memory segment with the
-          // lowest ID, but it could still occur.  In most workloads,
-          // fragmentation should not be a major concern, since it doesn't impact
-          // peak file descriptor usage or the speed of allocation.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": shutting down UNIX domain socket for " +
-                "empty " + shm);
-          }
-          shutdown(shm);
-        } else {
-          notFull.put(shmId, shm);
-        }
-      }
-    }
-    
-    /**
-     * Unregister a shared memory segment.
-     *
-     * Once a segment is unregistered, we will not allocate any more slots
-     * inside that segment.
-     *
-     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
-     * lock.
-     *
-     * @param shmId         The ID of the shared memory segment to unregister.
-     */
-    void unregisterShm(ShmId shmId) {
-      lock.lock();
-      try {
-        full.remove(shmId);
-        notFull.remove(shmId);
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    @Override
-    public String toString() {
-      return String.format("EndpointShmManager(%s, parent=%s)",
-          datanode, DfsClientShmManager.this);
-    }
-
-    PerDatanodeVisitorInfo getVisitorInfo() {
-      return new PerDatanodeVisitorInfo(full, notFull, disabled);
-    }
-
-    final void shutdown(DfsClientShm shm) {
-      try {
-        shm.getPeer().getDomainSocket().shutdown();
-      } catch (IOException e) {
-        LOG.warn(this + ": error shutting down shm: got IOException calling " +
-            "shutdown(SHUT_RDWR)", e);
-      }
-    }
-  }
-
-  private boolean closed = false;
-
-  private final ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * A condition variable which is signalled when we finish loading a segment
-   * from the Datanode.
-   */
-  private final Condition finishedLoading = lock.newCondition();
-
-  /**
-   * Information about each Datanode.
-   */
-  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
-      new HashMap<DatanodeInfo, EndpointShmManager>(1);
-  
-  /**
-   * The DomainSocketWatcher which keeps track of the UNIX domain socket
-   * associated with each shared memory segment.
-   *
-   * Note: because the DomainSocketWatcher makes callbacks into this
-   * DfsClientShmManager object, you MUST NOT attempt to take the
-   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
-   * or else deadlock might result.   This means that most DomainSocketWatcher
-   * methods are off-limits unless you release the manager lock first.
-   */
-  private final DomainSocketWatcher domainSocketWatcher;
-  
-  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
-    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
-        "client");
-  }
-  
-  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
-      MutableBoolean usedPeer, ExtendedBlockId blockId,
-      String clientName) throws IOException {
-    lock.lock();
-    try {
-      if (closed) {
-        LOG.trace(this + ": the DfsClientShmManager is closed.");
-        return null;
-      }
-      EndpointShmManager shmManager = datanodes.get(datanode);
-      if (shmManager == null) {
-        shmManager = new EndpointShmManager(datanode);
-        datanodes.put(datanode, shmManager);
-      }
-      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
-    } finally {
-      lock.unlock();
-    }
-  }
-  
-  public void freeSlot(Slot slot) {
-    lock.lock();
-    try {
-      DfsClientShm shm = (DfsClientShm)slot.getShm();
-      shm.getEndpointShmManager().freeSlot(slot);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @VisibleForTesting
-  public static class PerDatanodeVisitorInfo {
-    public final TreeMap<ShmId, DfsClientShm> full;
-    public final TreeMap<ShmId, DfsClientShm> notFull;
-    public final boolean disabled;
-
-    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
-        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
-      this.full = full;
-      this.notFull = notFull;
-      this.disabled = disabled;
-    }
-  }
-
-  @VisibleForTesting
-  public interface Visitor {
-    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
-        throws IOException;
-  }
-
-  @VisibleForTesting
-  public void visit(Visitor visitor) throws IOException {
-    lock.lock();
-    try {
-      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = 
-          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
-      for (Entry<DatanodeInfo, EndpointShmManager> entry :
-            datanodes.entrySet()) {
-        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
-      }
-      visitor.visit(info);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Close the DfsClientShmManager.
-   */
-  @Override
-  public void close() throws IOException {
-    lock.lock();
-    try {
-      if (closed) return;
-      closed = true;
-    } finally {
-      lock.unlock();
-    }
-    // When closed, the domainSocketWatcher will issue callbacks that mark
-    // all the outstanding DfsClientShm segments as stale.
-    IOUtils.cleanup(LOG, domainSocketWatcher);
-  }
-
-
-  @Override
-  public String toString() {
-    return String.format("ShortCircuitShmManager(%08x)",
-        System.identityHashCode(this));
-  }
-
-  @VisibleForTesting
-  public DomainSocketWatcher getDomainSocketWatcher() {
-    return domainSocketWatcher;
-  }
-}
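
The allocSlot path above releases the manager lock while it talks to the DataNode
and uses a loading flag plus a condition variable so that only one caller performs
the slow shared-memory request at a time while the others wait and retry. Below is
a minimal, self-contained sketch of that coordination pattern; it is illustrative
only, and the class and method names (SingleLoaderExample, fetchNewSlotFromRemote)
are made up rather than taken from HDFS.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Illustrative sketch (not HDFS code): one thread drops the shared lock to do
 * the slow work; all other threads wait on a condition and retry afterwards.
 */
public class SingleLoaderExample {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition finishedLoading = lock.newCondition();
  private final Queue<String> freeSlots = new ArrayDeque<>();  // stand-in for the notFull map
  private boolean loading = false;

  /** Slow call that must not be made while holding the lock. */
  private String fetchNewSlotFromRemote() throws InterruptedException {
    Thread.sleep(100);  // pretend to talk to the DataNode
    return "slot-" + System.nanoTime();
  }

  public String allocSlot() throws InterruptedException {
    lock.lock();
    try {
      while (true) {
        String slot = freeSlots.poll();
        if (slot != null) {
          return slot;                             // fast path: reuse an existing slot
        }
        if (loading) {
          finishedLoading.awaitUninterruptibly();  // someone else is loading; retry after
          continue;
        }
        loading = true;
        lock.unlock();                             // release while doing the slow work
        String fresh;
        try {
          fresh = fetchNewSlotFromRemote();
        } finally {
          lock.lock();                             // always retake the lock before
          loading = false;                         // clearing the flag and waking waiters
          finishedLoading.signalAll();
        }
        freeSlots.add(fresh);                      // loop around and grab it under the lock
      }
    } finally {
      lock.unlock();
    }
  }
}

As in the HDFS code, waiters are woken with signalAll even when the remote request
fails, so nobody blocks forever on a loader that gave up.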

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index db4cbe2..15b8dea 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
@@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable {
         DataInputStream in = new DataInputStream(sock.getInputStream());
         ReleaseShortCircuitAccessResponseProto resp =
             ReleaseShortCircuitAccessResponseProto.parseFrom(
-                PBHelper.vintPrefixed(in));
+                PBHelperClient.vintPrefixed(in));
         if (resp.getStatus() != Status.SUCCESS) {
           String error = resp.hasError() ? resp.getError() : "(unknown)";
           throw new IOException(resp.getStatus().toString() + ": " + error);
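
This hunk only switches the response parsing from PBHelper.vintPrefixed to
PBHelperClient.vintPrefixed. The response body is framed with a protobuf-style
varint length prefix; the following is a rough sketch of that framing convention,
written as a hypothetical standalone helper (VarintFraming is not the actual
PBHelperClient implementation, just an assumption-labeled illustration):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/**
 * Sketch only: read a protobuf-style unsigned varint length from a stream,
 * then return exactly that many message bytes.
 */
public final class VarintFraming {
  private VarintFraming() {}

  static int readRawVarint32(InputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("stream ended while reading varint length");
      }
      result |= (b & 0x7f) << shift;
      if ((b & 0x80) == 0) {        // high bit clear: last byte of the varint
        return result;
      }
    }
    throw new IOException("malformed varint: more than 5 bytes");
  }

  /** Read the varint length, then the framed message bytes. */
  public static byte[] readPrefixedMessage(InputStream in) throws IOException {
    int length = readRawVarint32(in);
    byte[] body = new byte[length];
    new DataInputStream(in).readFully(body);
    return body;
  }
}

The bytes returned by readPrefixedMessage could then be handed to a generated
parser such as ReleaseShortCircuitAccessResponseProto.parseFrom(body).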

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
deleted file mode 100644
index 7b89d0a..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
+++ /dev/null
@@ -1,646 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.InvalidRequestException;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
-
-import sun.misc.Unsafe;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.primitives.Ints;
-
-/**
- * A shared memory segment used to implement short-circuit reads.
- */
-public class ShortCircuitShm {
-  private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);
-
-  protected static final int BYTES_PER_SLOT = 64;
-
-  private static final Unsafe unsafe = safetyDance();
-
-  private static Unsafe safetyDance() {
-    try {
-      Field f = Unsafe.class.getDeclaredField("theUnsafe");
-      f.setAccessible(true);
-      return (Unsafe)f.get(null);
-    } catch (Throwable e) {
-      LOG.error("failed to load misc.Unsafe", e);
-    }
-    return null;
-  }
-
-  /**
-   * Calculate the usable size of a shared memory segment.
-   * We round down to a multiple of the slot size and do some validation.
-   *
-   * @param stream The stream we're using.
-   * @return       The usable size of the shared memory segment.
-   */
-  private static int getUsableLength(FileInputStream stream)
-      throws IOException {
-    int intSize = Ints.checkedCast(stream.getChannel().size());
-    int slots = intSize / BYTES_PER_SLOT;
-    if (slots == 0) {
-      throw new IOException("size of shared memory segment was " +
-          intSize + ", but that is not enough to hold even one slot.");
-    }
-    return slots * BYTES_PER_SLOT;
-  }
-
-  /**
-   * Identifies a DfsClientShm.
-   */
-  public static class ShmId implements Comparable<ShmId> {
-    private static final Random random = new Random();
-    private final long hi;
-    private final long lo;
-
-    /**
-     * Generate a random ShmId.
-     * 
-     * We generate ShmIds randomly to prevent a malicious client from
-     * successfully guessing one and using that to interfere with another
-     * client.
-     */
-    public static ShmId createRandom() {
-      return new ShmId(random.nextLong(), random.nextLong());
-    }
-
-    public ShmId(long hi, long lo) {
-      this.hi = hi;
-      this.lo = lo;
-    }
-    
-    public long getHi() {
-      return hi;
-    }
-    
-    public long getLo() {
-      return lo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if ((o == null) || (o.getClass() != this.getClass())) {
-        return false;
-      }
-      ShmId other = (ShmId)o;
-      return new EqualsBuilder().
-          append(hi, other.hi).
-          append(lo, other.lo).
-          isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().
-          append(this.hi).
-          append(this.lo).
-          toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%016x%016x", hi, lo);
-    }
-
-    @Override
-    public int compareTo(ShmId other) {
-      return ComparisonChain.start().
-          compare(hi, other.hi).
-          compare(lo, other.lo).
-          result();
-    }
-  };
-
-  /**
-   * Uniquely identifies a slot.
-   */
-  public static class SlotId {
-    private final ShmId shmId;
-    private final int slotIdx;
-    
-    public SlotId(ShmId shmId, int slotIdx) {
-      this.shmId = shmId;
-      this.slotIdx = slotIdx;
-    }
-
-    public ShmId getShmId() {
-      return shmId;
-    }
-
-    public int getSlotIdx() {
-      return slotIdx;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if ((o == null) || (o.getClass() != this.getClass())) {
-        return false;
-      }
-      SlotId other = (SlotId)o;
-      return new EqualsBuilder().
-          append(shmId, other.shmId).
-          append(slotIdx, other.slotIdx).
-          isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().
-          append(this.shmId).
-          append(this.slotIdx).
-          toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx);
-    }
-  }
-
-  public class SlotIterator implements Iterator<Slot> {
-    int slotIdx = -1;
-
-    @Override
-    public boolean hasNext() {
-      synchronized (ShortCircuitShm.this) {
-        return allocatedSlots.nextSetBit(slotIdx + 1) != -1;
-      }
-    }
-
-    @Override
-    public Slot next() {
-      synchronized (ShortCircuitShm.this) {
-        int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1);
-        if (nextSlotIdx == -1) {
-          throw new NoSuchElementException();
-        }
-        slotIdx = nextSlotIdx;
-        return slots[nextSlotIdx];
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("SlotIterator " +
-          "doesn't support removal");
-    }
-  }
-  
-  /**
-   * A slot containing information about a replica.
-   *
-   * The format is:
-   * word 0
-   *   bit 0:32   Slot flags (see below).
-   *   bit 33:63  Anchor count.
-   * word 1:7
-   *   Reserved for future use, such as statistics.
-   *   Padding is also useful for avoiding false sharing.
-   *
-   * Little-endian versus big-endian is not relevant here since both the client
-   * and the server reside on the same computer and use the same orientation.
-   */
-  public class Slot {
-    /**
-     * Flag indicating that the slot is valid.  
-     * 
-     * The DFSClient sets this flag when it allocates a new slot within one of
-     * its shared memory regions.
-     * 
-     * The DataNode clears this flag when the replica associated with this slot
-     * is no longer valid.  The client itself also clears this flag when it
-     * believes that the DataNode is no longer using this slot to communicate.
-     */
-    private static final long VALID_FLAG =          1L<<63;
-
-    /**
-     * Flag indicating that the slot can be anchored.
-     */
-    private static final long ANCHORABLE_FLAG =     1L<<62;
-
-    /**
-     * The slot address in memory.
-     */
-    private final long slotAddress;
-
-    /**
-     * BlockId of the block this slot is used for.
-     */
-    private final ExtendedBlockId blockId;
-
-    Slot(long slotAddress, ExtendedBlockId blockId) {
-      this.slotAddress = slotAddress;
-      this.blockId = blockId;
-    }
-
-    /**
-     * Get the short-circuit memory segment associated with this Slot.
-     *
-     * @return      The enclosing short-circuit memory segment.
-     */
-    public ShortCircuitShm getShm() {
-      return ShortCircuitShm.this;
-    }
-
-    /**
-     * Get the ExtendedBlockId associated with this slot.
-     *
-     * @return      The ExtendedBlockId of this slot.
-     */
-    public ExtendedBlockId getBlockId() {
-      return blockId;
-    }
-
-    /**
-     * Get the SlotId of this slot, containing both shmId and slotIdx.
-     *
-     * @return      The SlotId of this slot.
-     */
-    public SlotId getSlotId() {
-      return new SlotId(getShmId(), getSlotIdx());
-    }
-
-    /**
-     * Get the Slot index.
-     *
-     * @return      The index of this slot.
-     */
-    public int getSlotIdx() {
-      return Ints.checkedCast(
-          (slotAddress - baseAddress) / BYTES_PER_SLOT);
-    }
-
-    /**
-     * Clear the slot.
-     */
-    void clear() {
-      unsafe.putLongVolatile(null, this.slotAddress, 0);
-    }
-
-    private boolean isSet(long flag) {
-      long prev = unsafe.getLongVolatile(null, this.slotAddress);
-      return (prev & flag) != 0;
-    }
-
-    private void setFlag(long flag) {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & flag) != 0) {
-          return;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev | flag));
-    }
-
-    private void clearFlag(long flag) {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & flag) == 0) {
-          return;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev & (~flag)));
-    }
-    
-    public boolean isValid() {
-      return isSet(VALID_FLAG);
-    }
-
-    public void makeValid() {
-      setFlag(VALID_FLAG);
-    }
-
-    public void makeInvalid() {
-      clearFlag(VALID_FLAG);
-    }
-
-    public boolean isAnchorable() {
-      return isSet(ANCHORABLE_FLAG);
-    }
-
-    public void makeAnchorable() {
-      setFlag(ANCHORABLE_FLAG);
-    }
-
-    public void makeUnanchorable() {
-      clearFlag(ANCHORABLE_FLAG);
-    }
-
-    public boolean isAnchored() {
-      long prev = unsafe.getLongVolatile(null, this.slotAddress);
-      if ((prev & VALID_FLAG) == 0) {
-        // Slot is no longer valid.
-        return false;
-      }
-      return ((prev & 0x7fffffff) != 0);
-    }
-
-    /**
-     * Try to add an anchor for a given slot.
-     *
-     * When a slot is anchored, we know that the block it refers to is resident
-     * in memory.
-     *
-     * @return          True if the slot is anchored.
-     */
-    public boolean addAnchor() {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & VALID_FLAG) == 0) {
-          // Slot is no longer valid.
-          return false;
-        }
-        if ((prev & ANCHORABLE_FLAG) == 0) {
-          // Slot can't be anchored right now.
-          return false;
-        }
-        if ((prev & 0x7fffffff) == 0x7fffffff) {
-          // Too many other threads have anchored the slot (2 billion?)
-          return false;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev + 1));
-      return true;
-    }
-
-    /**
-     * Remove an anchor for a given slot.
-     */
-    public void removeAnchor() {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        Preconditions.checkState((prev & 0x7fffffff) != 0,
-            "Tried to remove anchor for slot " + slotAddress +", which was " +
-            "not anchored.");
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev - 1));
-    }
-
-    @Override
-    public String toString() {
-      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
-    }
-  }
-
-  /**
-   * ID for this SharedMemorySegment.
-   */
-  private final ShmId shmId;
-
-  /**
-   * The base address of the memory-mapped file.
-   */
-  private final long baseAddress;
-
-  /**
-   * The mmapped length of the shared memory segment
-   */
-  private final int mmappedLength;
-
-  /**
-   * The slots associated with this shared memory segment.
-   * slot[i] contains the slot at offset i * BYTES_PER_SLOT,
-   * or null if that slot is not allocated.
-   */
-  private final Slot slots[];
-
-  /**
-   * A bitset where each bit represents a slot which is in use.
-   */
-  private final BitSet allocatedSlots;
-
-  /**
-   * Create the ShortCircuitShm.
-   * 
-   * @param shmId       The ID to use.
-   * @param stream      The stream that we're going to use to create this 
-   *                    shared memory segment.
-   *                    
-   *                    Although this is a FileInputStream, we are going to
-   *                    assume that the underlying file descriptor is writable
-   *                    as well as readable. It would be more appropriate to use
-   *                    a RandomAccessFile here, but that class does not have
-   *                    any public accessor which returns a FileDescriptor,
-   *                    unlike FileInputStream.
-   */
-  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
-        throws IOException {
-    if (!NativeIO.isAvailable()) {
-      throw new UnsupportedOperationException("NativeIO is not available.");
-    }
-    if (Shell.WINDOWS) {
-      throw new UnsupportedOperationException(
-          "DfsClientShm is not yet implemented for Windows.");
-    }
-    if (unsafe == null) {
-      throw new UnsupportedOperationException(
-          "can't use DfsClientShm because we failed to " +
-          "load misc.Unsafe.");
-    }
-    this.shmId = shmId;
-    this.mmappedLength = getUsableLength(stream);
-    this.baseAddress = POSIX.mmap(stream.getFD(), 
-        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
-    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
-    this.allocatedSlots = new BitSet(slots.length);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("creating " + this.getClass().getSimpleName() +
-          "(shmId=" + shmId +
-          ", mmappedLength=" + mmappedLength +
-          ", baseAddress=" + String.format("%x", baseAddress) +
-          ", slots.length=" + slots.length + ")");
-    }
-  }
-
-  public final ShmId getShmId() {
-    return shmId;
-  }
-  
-  /**
-   * Determine if this shared memory object is empty.
-   *
-   * @return    True if the shared memory object is empty.
-   */
-  synchronized final public boolean isEmpty() {
-    return allocatedSlots.nextSetBit(0) == -1;
-  }
-
-  /**
-   * Determine if this shared memory object is full.
-   *
-   * @return    True if the shared memory object is full.
-   */
-  synchronized final public boolean isFull() {
-    return allocatedSlots.nextClearBit(0) >= slots.length;
-  }
-
-  /**
-   * Calculate the base address of a slot.
-   *
-   * @param slotIdx   Index of the slot.
-   * @return          The base address of the slot.
-   */
-  private final long calculateSlotAddress(int slotIdx) {
-    long offset = slotIdx;
-    offset *= BYTES_PER_SLOT;
-    return this.baseAddress + offset;
-  }
-
-  /**
-   * Allocate a new slot and register it.
-   *
-   * This function chooses an empty slot, initializes it, and then returns
-   * the relevant Slot object.
-   *
-   * @return    The new slot.
-   */
-  synchronized public final Slot allocAndRegisterSlot(
-      ExtendedBlockId blockId) {
-    int idx = allocatedSlots.nextClearBit(0);
-    if (idx >= slots.length) {
-      throw new RuntimeException(this + ": no more slots are available.");
-    }
-    allocatedSlots.set(idx, true);
-    Slot slot = new Slot(calculateSlotAddress(idx), blockId);
-    slot.clear();
-    slot.makeValid();
-    slots[idx] = slot;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
-                  StringUtils.getStackTrace(Thread.currentThread()));
-    }
-    return slot;
-  }
-
-  synchronized public final Slot getSlot(int slotIdx)
-      throws InvalidRequestException {
-    if (!allocatedSlots.get(slotIdx)) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " does not exist.");
-    }
-    return slots[slotIdx];
-  }
-
-  /**
-   * Register a slot.
-   *
-   * This function looks at a slot which has already been initialized (by
-   * another process), and registers it with us.  Then, it returns the 
-   * relevant Slot object.
-   *
-   * @return    The slot.
-   *
-   * @throws InvalidRequestException
-   *            If the slot index we're trying to allocate has not been
-   *            initialized, or is already in use.
-   */
-  synchronized public final Slot registerSlot(int slotIdx,
-      ExtendedBlockId blockId) throws InvalidRequestException {
-    if (slotIdx < 0) {
-      throw new InvalidRequestException(this + ": invalid negative slot " +
-          "index " + slotIdx);
-    }
-    if (slotIdx >= slots.length) {
-      throw new InvalidRequestException(this + ": invalid slot " +
-          "index " + slotIdx);
-    }
-    if (allocatedSlots.get(slotIdx)) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " is already in use.");
-    }
-    Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
-    if (!slot.isValid()) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " is not marked as valid.");
-    }
-    slots[slotIdx] = slot;
-    allocatedSlots.set(slotIdx, true);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
-                  StringUtils.getStackTrace(Thread.currentThread()));
-    }
-    return slot;
-  }
-
-  /**
-   * Unregisters a slot.
-   * 
-   * This doesn't alter the contents of the slot.  It just means that this
-   * segment stops tracking the slot.
-   *
-   * @param slotIdx  Index of the slot to unregister.
-   */
-  synchronized public final void unregisterSlot(int slotIdx) {
-    Preconditions.checkState(allocatedSlots.get(slotIdx),
-        "tried to unregister slot " + slotIdx + ", which was not registered.");
-    allocatedSlots.set(slotIdx, false);
-    slots[slotIdx] = null;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": unregisterSlot " + slotIdx);
-    }
-  }
-  
-  /**
-   * Iterate over all allocated slots.
-   * 
-   * Note that this method isn't safe if slots are registered or unregistered
-   * concurrently, since the iterator only synchronizes on the segment for
-   * each individual call.
-   *
-   * @return        The slot iterator.
-   */
-  public SlotIterator slotIterator() {
-    return new SlotIterator();
-  }
-
-  public void free() {
-    try {
-      POSIX.munmap(baseAddress, mmappedLength);
-    } catch (IOException e) {
-      LOG.warn(this + ": failed to munmap", e);
-    }
-    LOG.trace(this + ": freed");
-  }
-  
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "(" + shmId + ")";
-  }
-}
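
The deleted Slot class packs two flags and a 31-bit anchor count into a single
64-bit word that is updated with compare-and-swap, so the client and DataNode can
coordinate through the shared memory segment without locks. Below is a sketch of
the same bit layout expressed on an AtomicLong, purely to illustrate the
flag/counter manipulation; the real slots live in mmapped memory and are updated
through sun.misc.Unsafe, so this class is an analogue, not the HDFS code.

import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: flag-plus-counter word in the style of ShortCircuitShm.Slot. */
public class SlotWordExample {
  private static final long VALID_FLAG      = 1L << 63;
  private static final long ANCHORABLE_FLAG = 1L << 62;
  private static final long ANCHOR_MASK     = 0x7fffffffL;  // low 31 bits

  private final AtomicLong word = new AtomicLong();

  void makeValid()      { setFlag(VALID_FLAG); }
  void makeAnchorable() { setFlag(ANCHORABLE_FLAG); }

  private void setFlag(long flag) {
    long prev;
    do {
      prev = word.get();
      if ((prev & flag) != 0) {
        return;                               // already set
      }
    } while (!word.compareAndSet(prev, prev | flag));
  }

  /** Increment the anchor count, but only while the slot is valid and anchorable. */
  boolean addAnchor() {
    long prev;
    do {
      prev = word.get();
      if ((prev & VALID_FLAG) == 0 || (prev & ANCHORABLE_FLAG) == 0) {
        return false;                         // slot revoked or not anchorable
      }
      if ((prev & ANCHOR_MASK) == ANCHOR_MASK) {
        return false;                         // counter saturated
      }
    } while (!word.compareAndSet(prev, prev + 1));
    return true;
  }

  boolean isAnchored() {
    long v = word.get();
    return (v & VALID_FLAG) != 0 && (v & ANCHOR_MASK) != 0;
  }
}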
