http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 8e81fdc..beaa903 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -695,7 +695,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetDatanodeReportRequestProto req) throws ServiceException { try { - List<? extends DatanodeInfoProto> result = PBHelper.convert(server + List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server .getDatanodeReport(PBHelper.convert(req.getType()))); return GetDatanodeReportResponseProto.newBuilder() .addAllDi(result).build(); @@ -890,7 +890,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements server.setQuota(req.getPath(), req.getNamespaceQuota(), req.getStoragespaceQuota(), req.hasStorageType() ? - PBHelper.convertStorageType(req.getStorageType()): null); + PBHelperClient.convertStorageType(req.getStorageType()): null); return VOID_SETQUOTA_RESPONSE; } catch (IOException e) { throw new ServiceException(e); @@ -990,7 +990,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements GetDelegationTokenResponseProto.Builder rspBuilder = GetDelegationTokenResponseProto.newBuilder(); if (token != null) { - rspBuilder.setToken(PBHelper.convert(token)); + rspBuilder.setToken(PBHelperClient.convert(token)); } return rspBuilder.build(); } catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index d6afa6e..d30982a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String holder) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder() - .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder) + .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder) .setFileId(fileId).build(); try { rpcProxy.abandonBlock(null, req); @@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) - req.setPrevious(PBHelper.convert(previous)); - if (excludeNodes != null) - req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); + req.setPrevious(PBHelperClient.convert(previous)); + if (excludeNodes != null) + req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes)); if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } @@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements .newBuilder() .setSrc(src) .setFileId(fileId) - .setBlk(PBHelper.convert(blk)) - .addAllExistings(PBHelper.convert(existings)) + .setBlk(PBHelperClient.convert(blk)) + .addAllExistings(PBHelperClient.convert(existings)) .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs)) - .addAllExcludes(PBHelper.convert(excludes)) + .addAllExcludes(PBHelperClient.convert(excludes)) .setNumAdditionalNodes(numAdditionalNodes) .setClientName(clientName) .build(); @@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setClientName(clientName) .setFileId(fileId); if (last != null) - req.setLast(PBHelper.convert(last)); + req.setLast(PBHelperClient.convert(last)); try { return rpcProxy.complete(null, req.build()).getResult(); } catch (ServiceException e) { @@ -819,7 +819,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setNamespaceQuota(namespaceQuota) .setStoragespaceQuota(storagespaceQuota); if (type != null) { - builder.setStorageType(PBHelper.convertStorageType(type)); + builder.setStorageType(PBHelperClient.convertStorageType(type)); } final SetQuotaRequestProto req = builder.build(); try { @@ -897,7 +897,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String clientName) throws IOException { UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto .newBuilder() - .setBlock(PBHelper.convert(block)) + .setBlock(PBHelperClient.convert(block)) .setClientName(clientName) .build(); try { @@ -913,8 +913,8 @@ public class ClientNamenodeProtocolTranslatorPB implements ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException { UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() .setClientName(clientName) - .setOldBlock(PBHelper.convert(oldBlock)) - .setNewBlock(PBHelper.convert(newBlock)) + .setOldBlock(PBHelperClient.convert(oldBlock)) + .setNewBlock(PBHelperClient.convert(newBlock)) .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes))) .addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs)) .build(); @@ -945,7 +945,7 @@ public class ClientNamenodeProtocolTranslatorPB implements public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException { RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder(). - setToken(PBHelper.convert(token)). + setToken(PBHelperClient.convert(token)). build(); try { return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime(); @@ -959,7 +959,7 @@ public class ClientNamenodeProtocolTranslatorPB implements throws IOException { CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto .newBuilder() - .setToken(PBHelper.convert(token)) + .setToken(PBHelperClient.convert(token)) .build(); try { rpcProxy.cancelDelegationToken(null, req); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 94028a2..0b46927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements ) throws IOException { CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.newBuilder() - .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) + .setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp) .setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { - builder.addNewTaragets(PBHelper.convert(newtargets[i])); + builder.addNewTaragets(PBHelperClient.convert(newtargets[i])); builder.addNewTargetStorages(newtargetstorages[i]); } CommitBlockSynchronizationRequestProto req = builder.build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index fee62a4..17ba196 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements long recoveryId, long newBlockId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() - .setBlock(PBHelper.convert(oldBlock)) + .setBlock(PBHelperClient.convert(oldBlock)) .setNewLength(newLength).setNewBlockId(newBlockId) .setRecoveryId(recoveryId).build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 82c5c4c..bcb96ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() - .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size) + .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) .build(); try { return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 4ca5b26..887accf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -346,7 +346,7 @@ public class PBHelper { if (types == null || types.length == 0) { return null; } - List<StorageTypeProto> list = convertStorageTypes(types); + List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types); return StorageTypesProto.newBuilder().addAllStorageTypes(list).build(); } @@ -381,20 +381,6 @@ public class PBHelper { .getInfoSecurePort() : 0, dn.getIpcPort()); } - public static DatanodeIDProto convert(DatanodeID dn) { - // For wire compatibility with older versions we transmit the StorageID - // which is the same as the DatanodeUuid. Since StorageID is a required - // field we pass the empty string if the DatanodeUuid is not yet known. - return DatanodeIDProto.newBuilder() - .setIpAddr(dn.getIpAddr()) - .setHostName(dn.getHostName()) - .setXferPort(dn.getXferPort()) - .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") - .setInfoPort(dn.getInfoPort()) - .setInfoSecurePort(dn.getInfoSecurePort()) - .setIpcPort(dn.getIpcPort()).build(); - } - // Arrays of DatanodeId public static DatanodeIDProto[] convert(DatanodeID[] did) { if (did == null) @@ -402,7 +388,7 @@ public class PBHelper { final int len = did.length; DatanodeIDProto[] result = new DatanodeIDProto[len]; for (int i = 0; i < len; ++i) { - result[i] = convert(did[i]); + result[i] = PBHelperClient.convert(did[i]); } return result; } @@ -433,7 +419,7 @@ public class PBHelper { .setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())) + .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())) .build(); } @@ -595,16 +581,6 @@ public class PBHelper { eb.getGenerationStamp()); } - public static ExtendedBlockProto convert(final ExtendedBlock b) { - if (b == null) return null; - return ExtendedBlockProto.newBuilder(). - setPoolId(b.getBlockPoolId()). - setBlockId(b.getBlockId()). - setNumBytes(b.getNumBytes()). - setGenerationStamp(b.getGenerationStamp()). - build(); - } - public static RecoveringBlockProto convert(RecoveringBlock b) { if (b == null) { return null; @@ -625,17 +601,6 @@ public class PBHelper { new RecoveringBlock(block, locs, b.getNewGenStamp()); } - public static DatanodeInfoProto.AdminState convert( - final DatanodeInfo.AdminStates inAs) { - switch (inAs) { - case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; - case DECOMMISSION_INPROGRESS: - return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; - case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; - default: return DatanodeInfoProto.AdminState.NORMAL; - } - } - static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -647,12 +612,6 @@ public class PBHelper { di.getXceiverCount(), PBHelper.convert(di.getAdminState())); } - static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { - if (di == null) return null; - return convert(di); - } - - static public DatanodeInfo[] convert(DatanodeInfoProto di[]) { if (di == null) return null; DatanodeInfo[] result = new DatanodeInfo[di.length]; @@ -662,27 +621,6 @@ public class PBHelper { return result; } - public static List<? extends HdfsProtos.DatanodeInfoProto> convert( - DatanodeInfo[] dnInfos) { - return convert(dnInfos, 0); - } - - /** - * Copy from {@code dnInfos} to a target of list of same size starting at - * {@code startIdx}. - */ - public static List<? extends HdfsProtos.DatanodeInfoProto> convert( - DatanodeInfo[] dnInfos, int startIdx) { - if (dnInfos == null) - return null; - ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists - .newArrayListWithCapacity(dnInfos.length); - for (int i = startIdx; i < dnInfos.length; i++) { - protos.add(convert(dnInfos[i])); - } - return protos; - } - public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) { DatanodeInfo[] info = new DatanodeInfo[list.size()]; for (int i = 0; i < info.length; i++) { @@ -690,32 +628,11 @@ public class PBHelper { } return info; } - - public static DatanodeInfoProto convert(DatanodeInfo info) { - DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); - if (info.getNetworkLocation() != null) { - builder.setLocation(info.getNetworkLocation()); - } - builder - .setId(PBHelper.convert((DatanodeID)info)) - .setCapacity(info.getCapacity()) - .setDfsUsed(info.getDfsUsed()) - .setRemaining(info.getRemaining()) - .setBlockPoolUsed(info.getBlockPoolUsed()) - .setCacheCapacity(info.getCacheCapacity()) - .setCacheUsed(info.getCacheUsed()) - .setLastUpdate(info.getLastUpdate()) - .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) - .setXceiverCount(info.getXceiverCount()) - .setAdminState(PBHelper.convert(info.getAdminState())) - .build(); - return builder.build(); - } public static DatanodeStorageReportProto convertDatanodeStorageReport( DatanodeStorageReport report) { return DatanodeStorageReportProto.newBuilder() - .setDatanodeInfo(convert(report.getDatanodeInfo())) + .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo())) .addAllStorageReports(convertStorageReports(report.getStorageReports())) .build(); } @@ -767,7 +684,7 @@ public class PBHelper { Lists.newLinkedList(Arrays.asList(b.getCachedLocations())); for (int i = 0; i < locs.length; i++) { DatanodeInfo loc = locs[i]; - builder.addLocs(i, PBHelper.convert(loc)); + builder.addLocs(i, PBHelperClient.convert(loc)); boolean locIsCached = cachedLocs.contains(loc); builder.addIsCached(locIsCached); if (locIsCached) { @@ -781,7 +698,7 @@ public class PBHelper { StorageType[] storageTypes = b.getStorageTypes(); if (storageTypes != null) { for (int i = 0; i < storageTypes.length; ++i) { - builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); + builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i])); } } final String[] storageIDs = b.getStorageIDs(); @@ -789,8 +706,8 @@ public class PBHelper { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } - return builder.setB(PBHelper.convert(b.getBlock())) - .setBlockToken(PBHelper.convert(b.getBlockToken())) + return builder.setB(PBHelperClient.convert(b.getBlock())) + .setBlockToken(PBHelperClient.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } @@ -831,14 +748,6 @@ public class PBHelper { return lb; } - public static TokenProto convert(Token<?> tok) { - return TokenProto.newBuilder(). - setIdentifier(ByteString.copyFrom(tok.getIdentifier())). - setPassword(ByteString.copyFrom(tok.getPassword())). - setKind(tok.getKind().toString()). - setService(tok.getService().toString()).build(); - } - public static Token<BlockTokenIdentifier> convert( TokenProto blockToken) { return new Token<BlockTokenIdentifier>(blockToken.getIdentifier() @@ -890,7 +799,7 @@ public class PBHelper { DatanodeRegistration registration) { DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto .newBuilder(); - return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration)) + return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration)) .setStorageInfo(PBHelper.convert(registration.getStorageInfo())) .setKeys(PBHelper.convert(registration.getExportedKeys())) .setSoftwareVersion(registration.getSoftwareVersion()).build(); @@ -982,7 +891,7 @@ public class PBHelper { if (types != null) { for (StorageType[] ts : types) { StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); - builder.addAllStorageTypes(convertStorageTypes(ts)); + builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts)); list.add(builder.build()); } } @@ -1013,7 +922,7 @@ public class PBHelper { DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; for (int i = 0; i < targets.length; i++) { ret[i] = DatanodeInfosProto.newBuilder() - .addAllDatanodes(PBHelper.convert(targets[i])).build(); + .addAllDatanodes(PBHelperClient.convert(targets[i])).build(); } return Arrays.asList(ret); } @@ -1337,7 +1246,7 @@ public class PBHelper { fs.getFileBufferSize(), fs.getEncryptDataTransfer(), fs.getTrashInterval(), - PBHelper.convert(fs.getChecksumType())); + PBHelperClient.convert(fs.getChecksumType())); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -1350,7 +1259,7 @@ public class PBHelper { .setFileBufferSize(fs.getFileBufferSize()) .setEncryptDataTransfer(fs.getEncryptDataTransfer()) .setTrashInterval(fs.getTrashInterval()) - .setChecksumType(PBHelper.convert(fs.getChecksumType())) + .setChecksumType(PBHelperClient.convert(fs.getChecksumType())) .build(); } @@ -1738,7 +1647,7 @@ public class PBHelper { if (cs.hasTypeQuotaInfos()) { for (HdfsProtos.StorageTypeQuotaInfoProto info : cs.getTypeQuotaInfos().getTypeQuotaInfoList()) { - StorageType type = PBHelper.convertStorageType(info.getType()); + StorageType type = PBHelperClient.convertStorageType(info.getType()); builder.typeConsumed(type, info.getConsumed()); builder.typeQuota(type, info.getQuota()); } @@ -1762,7 +1671,7 @@ public class PBHelper { for (StorageType t: StorageType.getTypesSupportingQuota()) { HdfsProtos.StorageTypeQuotaInfoProto info = HdfsProtos.StorageTypeQuotaInfoProto.newBuilder(). - setType(convertStorageType(t)). + setType(PBHelperClient.convertStorageType(t)). setConsumed(cs.getTypeConsumed(t)). setQuota(cs.getTypeQuota(t)). build(); @@ -1807,7 +1716,7 @@ public class PBHelper { public static DatanodeStorageProto convert(DatanodeStorage s) { return DatanodeStorageProto.newBuilder() .setState(PBHelper.convertState(s.getState())) - .setStorageType(PBHelper.convertStorageType(s.getStorageType())) + .setStorageType(PBHelperClient.convertStorageType(s.getStorageType())) .setStorageUuid(s.getStorageID()).build(); } @@ -1821,44 +1730,10 @@ public class PBHelper { } } - public static List<StorageTypeProto> convertStorageTypes( - StorageType[] types) { - return convertStorageTypes(types, 0); - } - - public static List<StorageTypeProto> convertStorageTypes( - StorageType[] types, int startIdx) { - if (types == null) { - return null; - } - final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>( - types.length); - for (int i = startIdx; i < types.length; ++i) { - protos.add(convertStorageType(types[i])); - } - return protos; - } - - public static StorageTypeProto convertStorageType(StorageType type) { - switch(type) { - case DISK: - return StorageTypeProto.DISK; - case SSD: - return StorageTypeProto.SSD; - case ARCHIVE: - return StorageTypeProto.ARCHIVE; - case RAM_DISK: - return StorageTypeProto.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageType not found, type=" + type); - } - } - public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelper.convertStorageType(s.getStorageType())); + PBHelperClient.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1871,22 +1746,6 @@ public class PBHelper { } } - public static StorageType convertStorageType(StorageTypeProto type) { - switch(type) { - case DISK: - return StorageType.DISK; - case SSD: - return StorageType.SSD; - case ARCHIVE: - return StorageType.ARCHIVE; - case RAM_DISK: - return StorageType.RAM_DISK; - default: - throw new IllegalStateException( - "BUG: StorageTypeProto not found, type=" + type); - } - } - public static StorageType[] convertStorageTypes( List<StorageTypeProto> storageTypesList, int expectedSize) { final StorageType[] storageTypes = new StorageType[expectedSize]; @@ -1895,7 +1754,7 @@ public class PBHelper { Arrays.fill(storageTypes, StorageType.DEFAULT); } else { for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = convertStorageType(storageTypesList.get(i)); + storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i)); } } return storageTypes; @@ -2079,10 +1938,6 @@ public class PBHelper { return reportProto; } - public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { - return DataChecksum.Type.valueOf(type.getNumber()); - } - public static CacheDirectiveInfoProto convert (CacheDirectiveInfo info) { CacheDirectiveInfoProto.Builder builder = @@ -2255,9 +2110,6 @@ public class PBHelper { return new CachePoolEntry(info, stats); } - public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { - return HdfsProtos.ChecksumTypeProto.valueOf(type.id); - } public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) { DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder(); @@ -2272,17 +2124,6 @@ public class PBHelper { proto.getConfigVersion(), proto.getUptime()); } - public static InputStream vintPrefixed(final InputStream input) - throws IOException { - final int firstByte = input.read(); - if (firstByte == -1) { - throw new EOFException("Premature EOF: no length prefix available"); - } - - int size = CodedInputStream.readRawVarint32(firstByte, input); - assert size >= 0; - return new ExactSizeInputStream(input, size); - } private static AclEntryScopeProto convert(AclEntryScope v) { return AclEntryScopeProto.valueOf(v.ordinal()); @@ -2506,30 +2347,11 @@ public class PBHelper { proto.getKeyName()); } - public static ShortCircuitShmSlotProto convert(SlotId slotId) { - return ShortCircuitShmSlotProto.newBuilder(). - setShmId(convert(slotId.getShmId())). - setSlotIdx(slotId.getSlotIdx()). - build(); - } - - public static ShortCircuitShmIdProto convert(ShmId shmId) { - return ShortCircuitShmIdProto.newBuilder(). - setHi(shmId.getHi()). - setLo(shmId.getLo()). - build(); - - } - public static SlotId convert(ShortCircuitShmSlotProto slotId) { - return new SlotId(PBHelper.convert(slotId.getShmId()), + return new SlotId(PBHelperClient.convert(slotId.getShmId()), slotId.getSlotIdx()); } - public static ShmId convert(ShortCircuitShmIdProto shmId) { - return new ShmId(shmId.getHi(), shmId.getLo()); - } - private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType type) { switch (type) { @@ -3036,18 +2858,6 @@ public class PBHelper { ezKeyVersionName); } - public static List<Boolean> convert(boolean[] targetPinnings, int idx) { - List<Boolean> pinnings = new ArrayList<Boolean>(); - if (targetPinnings == null) { - pinnings.add(Boolean.FALSE); - } else { - for (; idx < targetPinnings.length; ++idx) { - pinnings.add(Boolean.valueOf(targetPinnings[idx])); - } - } - return pinnings; - } - public static boolean[] convertBooleanList( List<Boolean> targetPinningsList) { final boolean[] targetPinnings = new boolean[targetPinningsList.size()]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java deleted file mode 100644 index 2fa86fa..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.security.token.block; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Access token verification failed. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class InvalidBlockTokenException extends IOException { - private static final long serialVersionUID = 168L; - - public InvalidBlockTokenException() { - super(); - } - - public InvalidBlockTokenException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index a5e22ec..be1a9ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.balancer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java deleted file mode 100644 index 215df13..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -/** - * The caching strategy we should use for an HDFS read or write operation. - */ -public class CachingStrategy { - private final Boolean dropBehind; // null = use server defaults - private final Long readahead; // null = use server defaults - - public static CachingStrategy newDefaultStrategy() { - return new CachingStrategy(null, null); - } - - public static CachingStrategy newDropBehind() { - return new CachingStrategy(true, null); - } - - public static class Builder { - private Boolean dropBehind; - private Long readahead; - - public Builder(CachingStrategy prev) { - this.dropBehind = prev.dropBehind; - this.readahead = prev.readahead; - } - - public Builder setDropBehind(Boolean dropBehind) { - this.dropBehind = dropBehind; - return this; - } - - public Builder setReadahead(Long readahead) { - this.readahead = readahead; - return this; - } - - public CachingStrategy build() { - return new CachingStrategy(dropBehind, readahead); - } - } - - public CachingStrategy(Boolean dropBehind, Long readahead) { - this.dropBehind = dropBehind; - this.readahead = readahead; - } - - public Boolean getDropBehind() { - return dropBehind; - } - - public Long getReadahead() { - return readahead; - } - - public String toString() { - return "CachingStrategy(dropBehind=" + dropBehind + - ", readahead=" + readahead + ")"; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ecf139c..5bc50b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -135,7 +135,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode; @@ -2142,7 +2142,7 @@ public class DataNode extends ReconfigurableBase // read ack if (isClient) { DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index e9cf436..dfaa525 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; @@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable { throws IOException { DataNodeFaultInjector.get().sendShortCircuitShmResponse(); ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). - setId(PBHelper.convert(shmInfo.shmId)).build(). + setId(PBHelperClient.convert(shmInfo.shmId)).build(). writeDelimitedTo(socketOut); // Send the file descriptor for the shared memory segment. byte buf[] = new byte[] { (byte)0 }; @@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable { // to respond with a Status enum. try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + peer.getRemoteAddressString() + " did not send a valid status code after reading. " + @@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable { // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn)); mirrorInStatus = connectAck.getStatus(); firstBadLink = connectAck.getFirstBadLink(); if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { @@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable { .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelper.convert(checksum.getChecksumType()))) + .setCrcType(PBHelperClient.convert(checksum.getChecksumType()))) .build() .writeDelimitedTo(out); out.flush(); @@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable { // receive the response from the proxy BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( - PBHelper.vintPrefixed(proxyReply)); - + PBHelperClient.vintPrefixed(proxyReply)); + String logInfo = "copy block " + block + " from " + proxySock.getRemoteSocketAddress(); DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 25fd99d..3a9c64e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -155,7 +156,7 @@ public final class FSImageFormatPBINode { QuotaByStorageTypeFeatureProto proto) { ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder(); for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) { - StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType()); + StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType()); long quota = quotaEntry.getQuota(); b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type) .setQuota(quota).build()); @@ -462,7 +463,7 @@ public final class FSImageFormatPBINode { if (q.getTypeSpace(t) >= 0) { QuotaByStorageTypeEntryProto.Builder eb = QuotaByStorageTypeEntryProto.newBuilder(). - setStorageType(PBHelper.convertStorageType(t)). + setStorageType(PBHelperClient.convertStorageType(t)). setQuota(q.getTypeSpace(t)); b.addQuotas(eb); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java deleted file mode 100644 index 81cc68d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.DomainSocketWatcher; - -import com.google.common.base.Preconditions; - -/** - * DfsClientShm is a subclass of ShortCircuitShm which is used by the - * DfsClient. - * When the UNIX domain socket associated with this shared memory segment - * closes unexpectedly, we mark the slots inside this segment as disconnected. - * ShortCircuitReplica objects that contain disconnected slots are stale, - * and will not be used to service new reads or mmap operations. - * However, in-progress read or mmap operations will continue to proceed. - * Once the last slot is deallocated, the segment can be safely munmapped. - * - * Slots may also become stale because the associated replica has been deleted - * on the DataNode. In this case, the DataNode will clear the 'valid' bit. - * The client will then see these slots as stale (see - * #{ShortCircuitReplica#isStale}). - */ -public class DfsClientShm extends ShortCircuitShm - implements DomainSocketWatcher.Handler { - /** - * The EndpointShmManager associated with this shared memory segment. - */ - private final EndpointShmManager manager; - - /** - * The UNIX domain socket associated with this DfsClientShm. - * We rely on the DomainSocketWatcher to close the socket associated with - * this DomainPeer when necessary. - */ - private final DomainPeer peer; - - /** - * True if this shared memory segment has lost its connection to the - * DataNode. - * - * {@link DfsClientShm#handle} sets this to true. - */ - private boolean disconnected = false; - - DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager, - DomainPeer peer) throws IOException { - super(shmId, stream); - this.manager = manager; - this.peer = peer; - } - - public EndpointShmManager getEndpointShmManager() { - return manager; - } - - public DomainPeer getPeer() { - return peer; - } - - /** - * Determine if the shared memory segment is disconnected from the DataNode. - * - * This must be called with the DfsClientShmManager lock held. - * - * @return True if the shared memory segment is stale. - */ - public synchronized boolean isDisconnected() { - return disconnected; - } - - /** - * Handle the closure of the UNIX domain socket associated with this shared - * memory segment by marking this segment as stale. - * - * If there are no slots associated with this shared memory segment, it will - * be freed immediately in this function. - */ - @Override - public boolean handle(DomainSocket sock) { - manager.unregisterShm(getShmId()); - synchronized (this) { - Preconditions.checkState(!disconnected); - disconnected = true; - boolean hadSlots = false; - for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) { - Slot slot = iter.next(); - slot.makeInvalid(); - hadSlots = true; - } - if (!hadSlots) { - free(); - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java deleted file mode 100644 index 062539a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ /dev/null @@ -1,514 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.DomainSocketWatcher; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -/** - * Manages short-circuit memory segments for an HDFS client. - * - * Clients are responsible for requesting and releasing shared memory segments used - * for communicating with the DataNode. The client will try to allocate new slots - * in the set of existing segments, falling back to getting a new segment from the - * DataNode via {@link DataTransferProtocol#requestShortCircuitFds}. - * - * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}. - * See {@link ShortCircuitRegistry} for more information on the communication protocol. - */ -@InterfaceAudience.Private -public class DfsClientShmManager implements Closeable { - private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class); - - /** - * Manages short-circuit memory segments that pertain to a given DataNode. - */ - class EndpointShmManager { - /** - * The datanode we're managing. - */ - private final DatanodeInfo datanode; - - /** - * Shared memory segments which have no empty slots. - * - * Protected by the manager lock. - */ - private final TreeMap<ShmId, DfsClientShm> full = - new TreeMap<ShmId, DfsClientShm>(); - - /** - * Shared memory segments which have at least one empty slot. - * - * Protected by the manager lock. - */ - private final TreeMap<ShmId, DfsClientShm> notFull = - new TreeMap<ShmId, DfsClientShm>(); - - /** - * True if this datanode doesn't support short-circuit shared memory - * segments. - * - * Protected by the manager lock. - */ - private boolean disabled = false; - - /** - * True if we're in the process of loading a shared memory segment from - * this DataNode. - * - * Protected by the manager lock. - */ - private boolean loading = false; - - EndpointShmManager (DatanodeInfo datanode) { - this.datanode = datanode; - } - - /** - * Pull a slot out of a preexisting shared memory segment. - * - * Must be called with the manager lock held. - * - * @param blockId The blockId to put inside the Slot object. - * - * @return null if none of our shared memory segments contain a - * free slot; the slot object otherwise. - */ - private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) { - if (notFull.isEmpty()) { - return null; - } - Entry<ShmId, DfsClientShm> entry = notFull.firstEntry(); - DfsClientShm shm = entry.getValue(); - ShmId shmId = shm.getShmId(); - Slot slot = shm.allocAndRegisterSlot(blockId); - if (shm.isFull()) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() + - " out of " + shm); - } - DfsClientShm removedShm = notFull.remove(shmId); - Preconditions.checkState(removedShm == shm); - full.put(shmId, shm); - } else { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": pulled slot " + slot.getSlotIdx() + - " out of " + shm); - } - } - return slot; - } - - /** - * Ask the DataNode for a new shared memory segment. This function must be - * called with the manager lock held. We will release the lock while - * communicating with the DataNode. - * - * @param clientName The current client name. - * @param peer The peer to use to talk to the DataNode. - * - * @return Null if the DataNode does not support shared memory - * segments, or experienced an error creating the - * shm. The shared memory segment itself on success. - * @throws IOException If there was an error communicating over the socket. - * We will not throw an IOException unless the socket - * itself (or the network) is the problem. - */ - private DfsClientShm requestNewShm(String clientName, DomainPeer peer) - throws IOException { - final DataOutputStream out = - new DataOutputStream( - new BufferedOutputStream(peer.getOutputStream())); - new Sender(out).requestShortCircuitShm(clientName); - ShortCircuitShmResponseProto resp = - ShortCircuitShmResponseProto.parseFrom( - PBHelper.vintPrefixed(peer.getInputStream())); - String error = resp.hasError() ? resp.getError() : "(unknown)"; - switch (resp.getStatus()) { - case SUCCESS: - DomainSocket sock = peer.getDomainSocket(); - byte buf[] = new byte[1]; - FileInputStream fis[] = new FileInputStream[1]; - if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) { - throw new EOFException("got EOF while trying to transfer the " + - "file descriptor for the shared memory segment."); - } - if (fis[0] == null) { - throw new IOException("the datanode " + datanode + " failed to " + - "pass a file descriptor for the shared memory segment."); - } - try { - DfsClientShm shm = - new DfsClientShm(PBHelper.convert(resp.getId()), - fis[0], this, peer); - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": createNewShm: created " + shm); - } - return shm; - } finally { - IOUtils.cleanup(LOG, fis[0]); - } - case ERROR_UNSUPPORTED: - // The DataNode just does not support short-circuit shared memory - // access, and we should stop asking. - LOG.info(this + ": datanode does not support short-circuit " + - "shared memory access: " + error); - disabled = true; - return null; - default: - // The datanode experienced some kind of unexpected error when trying to - // create the short-circuit shared memory segment. - LOG.warn(this + ": error requesting short-circuit shared memory " + - "access: " + error); - return null; - } - } - - /** - * Allocate a new shared memory slot connected to this datanode. - * - * Must be called with the EndpointShmManager lock held. - * - * @param peer The peer to use to talk to the DataNode. - * @param usedPeer (out param) Will be set to true if we used the peer. - * When a peer is used - * - * @param clientName The client name. - * @param blockId The block ID to use. - * @return null if the DataNode does not support shared memory - * segments, or experienced an error creating the - * shm. The shared memory segment itself on success. - * @throws IOException If there was an error communicating over the socket. - */ - Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, - String clientName, ExtendedBlockId blockId) throws IOException { - while (true) { - if (closed) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": the DfsClientShmManager has been closed."); - } - return null; - } - if (disabled) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": shared memory segment access is disabled."); - } - return null; - } - // Try to use an existing slot. - Slot slot = allocSlotFromExistingShm(blockId); - if (slot != null) { - return slot; - } - // There are no free slots. If someone is loading more slots, wait - // for that to finish. - if (loading) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": waiting for loading to finish..."); - } - finishedLoading.awaitUninterruptibly(); - } else { - // Otherwise, load the slot ourselves. - loading = true; - lock.unlock(); - DfsClientShm shm; - try { - shm = requestNewShm(clientName, peer); - if (shm == null) continue; - // See #{DfsClientShmManager#domainSocketWatcher} for details - // about why we do this before retaking the manager lock. - domainSocketWatcher.add(peer.getDomainSocket(), shm); - // The DomainPeer is now our responsibility, and should not be - // closed by the caller. - usedPeer.setValue(true); - } finally { - lock.lock(); - loading = false; - finishedLoading.signalAll(); - } - if (shm.isDisconnected()) { - // If the peer closed immediately after the shared memory segment - // was created, the DomainSocketWatcher callback might already have - // fired and marked the shm as disconnected. In this case, we - // obviously don't want to add the SharedMemorySegment to our list - // of valid not-full segments. - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": the UNIX domain socket associated with " + - "this short-circuit memory closed before we could make " + - "use of the shm."); - } - } else { - notFull.put(shm.getShmId(), shm); - } - } - } - } - - /** - * Stop tracking a slot. - * - * Must be called with the EndpointShmManager lock held. - * - * @param slot The slot to release. - */ - void freeSlot(Slot slot) { - DfsClientShm shm = (DfsClientShm)slot.getShm(); - shm.unregisterSlot(slot.getSlotIdx()); - if (shm.isDisconnected()) { - // Stale shared memory segments should not be tracked here. - Preconditions.checkState(!full.containsKey(shm.getShmId())); - Preconditions.checkState(!notFull.containsKey(shm.getShmId())); - if (shm.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": freeing empty stale " + shm); - } - shm.free(); - } - } else { - ShmId shmId = shm.getShmId(); - full.remove(shmId); // The shm can't be full if we just freed a slot. - if (shm.isEmpty()) { - notFull.remove(shmId); - - // If the shared memory segment is now empty, we call shutdown(2) on - // the UNIX domain socket associated with it. The DomainSocketWatcher, - // which is watching this socket, will call DfsClientShm#handle, - // cleaning up this shared memory segment. - // - // See #{DfsClientShmManager#domainSocketWatcher} for details about why - // we don't want to call DomainSocketWatcher#remove directly here. - // - // Note that we could experience 'fragmentation' here, where the - // DFSClient allocates a bunch of slots in different shared memory - // segments, and then frees most of them, but never fully empties out - // any segment. We make some attempt to avoid this fragmentation by - // always allocating new slots out of the shared memory segment with the - // lowest ID, but it could still occur. In most workloads, - // fragmentation should not be a major concern, since it doesn't impact - // peak file descriptor usage or the speed of allocation. - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": shutting down UNIX domain socket for " + - "empty " + shm); - } - shutdown(shm); - } else { - notFull.put(shmId, shm); - } - } - } - - /** - * Unregister a shared memory segment. - * - * Once a segment is unregistered, we will not allocate any more slots - * inside that segment. - * - * The DomainSocketWatcher calls this while holding the DomainSocketWatcher - * lock. - * - * @param shmId The ID of the shared memory segment to unregister. - */ - void unregisterShm(ShmId shmId) { - lock.lock(); - try { - full.remove(shmId); - notFull.remove(shmId); - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - return String.format("EndpointShmManager(%s, parent=%s)", - datanode, DfsClientShmManager.this); - } - - PerDatanodeVisitorInfo getVisitorInfo() { - return new PerDatanodeVisitorInfo(full, notFull, disabled); - } - - final void shutdown(DfsClientShm shm) { - try { - shm.getPeer().getDomainSocket().shutdown(); - } catch (IOException e) { - LOG.warn(this + ": error shutting down shm: got IOException calling " + - "shutdown(SHUT_RDWR)", e); - } - } - } - - private boolean closed = false; - - private final ReentrantLock lock = new ReentrantLock(); - - /** - * A condition variable which is signalled when we finish loading a segment - * from the Datanode. - */ - private final Condition finishedLoading = lock.newCondition(); - - /** - * Information about each Datanode. - */ - private final HashMap<DatanodeInfo, EndpointShmManager> datanodes = - new HashMap<DatanodeInfo, EndpointShmManager>(1); - - /** - * The DomainSocketWatcher which keeps track of the UNIX domain socket - * associated with each shared memory segment. - * - * Note: because the DomainSocketWatcher makes callbacks into this - * DfsClientShmManager object, you must MUST NOT attempt to take the - * DomainSocketWatcher lock while holding the DfsClientShmManager lock, - * or else deadlock might result. This means that most DomainSocketWatcher - * methods are off-limits unless you release the manager lock first. - */ - private final DomainSocketWatcher domainSocketWatcher; - - DfsClientShmManager(int interruptCheckPeriodMs) throws IOException { - this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs, - "client"); - } - - public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer, - MutableBoolean usedPeer, ExtendedBlockId blockId, - String clientName) throws IOException { - lock.lock(); - try { - if (closed) { - LOG.trace(this + ": the DfsClientShmManager isclosed."); - return null; - } - EndpointShmManager shmManager = datanodes.get(datanode); - if (shmManager == null) { - shmManager = new EndpointShmManager(datanode); - datanodes.put(datanode, shmManager); - } - return shmManager.allocSlot(peer, usedPeer, clientName, blockId); - } finally { - lock.unlock(); - } - } - - public void freeSlot(Slot slot) { - lock.lock(); - try { - DfsClientShm shm = (DfsClientShm)slot.getShm(); - shm.getEndpointShmManager().freeSlot(slot); - } finally { - lock.unlock(); - } - } - - @VisibleForTesting - public static class PerDatanodeVisitorInfo { - public final TreeMap<ShmId, DfsClientShm> full; - public final TreeMap<ShmId, DfsClientShm> notFull; - public final boolean disabled; - - PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full, - TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) { - this.full = full; - this.notFull = notFull; - this.disabled = disabled; - } - } - - @VisibleForTesting - public interface Visitor { - void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) - throws IOException; - } - - @VisibleForTesting - public void visit(Visitor visitor) throws IOException { - lock.lock(); - try { - HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = - new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>(); - for (Entry<DatanodeInfo, EndpointShmManager> entry : - datanodes.entrySet()) { - info.put(entry.getKey(), entry.getValue().getVisitorInfo()); - } - visitor.visit(info); - } finally { - lock.unlock(); - } - } - - /** - * Close the DfsClientShmManager. - */ - @Override - public void close() throws IOException { - lock.lock(); - try { - if (closed) return; - closed = true; - } finally { - lock.unlock(); - } - // When closed, the domainSocketWatcher will issue callbacks that mark - // all the outstanding DfsClientShm segments as stale. - IOUtils.cleanup(LOG, domainSocketWatcher); - } - - - @Override - public String toString() { - return String.format("ShortCircuitShmManager(%08x)", - System.identityHashCode(this)); - } - - @VisibleForTesting - public DomainSocketWatcher getDomainSocketWatcher() { - return domainSocketWatcher; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index db4cbe2..15b8dea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; @@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable { DataInputStream in = new DataInputStream(sock.getInputStream()); ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto.parseFrom( - PBHelper.vintPrefixed(in)); + PBHelperClient.vintPrefixed(in)); if (resp.getStatus() != Status.SUCCESS) { String error = resp.hasError() ? resp.getError() : "(unknown)"; throw new IOException(resp.getStatus().toString() + ": " + error); http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java deleted file mode 100644 index 7b89d0a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java +++ /dev/null @@ -1,646 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.FileInputStream; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.BitSet; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Random; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.InvalidRequestException; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIO.POSIX; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.StringUtils; - -import sun.misc.Unsafe; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ComparisonChain; -import com.google.common.primitives.Ints; - -/** - * A shared memory segment used to implement short-circuit reads. - */ -public class ShortCircuitShm { - private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class); - - protected static final int BYTES_PER_SLOT = 64; - - private static final Unsafe unsafe = safetyDance(); - - private static Unsafe safetyDance() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe)f.get(null); - } catch (Throwable e) { - LOG.error("failed to load misc.Unsafe", e); - } - return null; - } - - /** - * Calculate the usable size of a shared memory segment. - * We round down to a multiple of the slot size and do some validation. - * - * @param stream The stream we're using. - * @return The usable size of the shared memory segment. - */ - private static int getUsableLength(FileInputStream stream) - throws IOException { - int intSize = Ints.checkedCast(stream.getChannel().size()); - int slots = intSize / BYTES_PER_SLOT; - if (slots == 0) { - throw new IOException("size of shared memory segment was " + - intSize + ", but that is not enough to hold even one slot."); - } - return slots * BYTES_PER_SLOT; - } - - /** - * Identifies a DfsClientShm. - */ - public static class ShmId implements Comparable<ShmId> { - private static final Random random = new Random(); - private final long hi; - private final long lo; - - /** - * Generate a random ShmId. - * - * We generate ShmIds randomly to prevent a malicious client from - * successfully guessing one and using that to interfere with another - * client. - */ - public static ShmId createRandom() { - return new ShmId(random.nextLong(), random.nextLong()); - } - - public ShmId(long hi, long lo) { - this.hi = hi; - this.lo = lo; - } - - public long getHi() { - return hi; - } - - public long getLo() { - return lo; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - ShmId other = (ShmId)o; - return new EqualsBuilder(). - append(hi, other.hi). - append(lo, other.lo). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.hi). - append(this.lo). - toHashCode(); - } - - @Override - public String toString() { - return String.format("%016x%016x", hi, lo); - } - - @Override - public int compareTo(ShmId other) { - return ComparisonChain.start(). - compare(hi, other.hi). - compare(lo, other.lo). - result(); - } - }; - - /** - * Uniquely identifies a slot. - */ - public static class SlotId { - private final ShmId shmId; - private final int slotIdx; - - public SlotId(ShmId shmId, int slotIdx) { - this.shmId = shmId; - this.slotIdx = slotIdx; - } - - public ShmId getShmId() { - return shmId; - } - - public int getSlotIdx() { - return slotIdx; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - SlotId other = (SlotId)o; - return new EqualsBuilder(). - append(shmId, other.shmId). - append(slotIdx, other.slotIdx). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.shmId). - append(this.slotIdx). - toHashCode(); - } - - @Override - public String toString() { - return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx); - } - } - - public class SlotIterator implements Iterator<Slot> { - int slotIdx = -1; - - @Override - public boolean hasNext() { - synchronized (ShortCircuitShm.this) { - return allocatedSlots.nextSetBit(slotIdx + 1) != -1; - } - } - - @Override - public Slot next() { - synchronized (ShortCircuitShm.this) { - int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1); - if (nextSlotIdx == -1) { - throw new NoSuchElementException(); - } - slotIdx = nextSlotIdx; - return slots[nextSlotIdx]; - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("SlotIterator " + - "doesn't support removal"); - } - } - - /** - * A slot containing information about a replica. - * - * The format is: - * word 0 - * bit 0:32 Slot flags (see below). - * bit 33:63 Anchor count. - * word 1:7 - * Reserved for future use, such as statistics. - * Padding is also useful for avoiding false sharing. - * - * Little-endian versus big-endian is not relevant here since both the client - * and the server reside on the same computer and use the same orientation. - */ - public class Slot { - /** - * Flag indicating that the slot is valid. - * - * The DFSClient sets this flag when it allocates a new slot within one of - * its shared memory regions. - * - * The DataNode clears this flag when the replica associated with this slot - * is no longer valid. The client itself also clears this flag when it - * believes that the DataNode is no longer using this slot to communicate. - */ - private static final long VALID_FLAG = 1L<<63; - - /** - * Flag indicating that the slot can be anchored. - */ - private static final long ANCHORABLE_FLAG = 1L<<62; - - /** - * The slot address in memory. - */ - private final long slotAddress; - - /** - * BlockId of the block this slot is used for. - */ - private final ExtendedBlockId blockId; - - Slot(long slotAddress, ExtendedBlockId blockId) { - this.slotAddress = slotAddress; - this.blockId = blockId; - } - - /** - * Get the short-circuit memory segment associated with this Slot. - * - * @return The enclosing short-circuit memory segment. - */ - public ShortCircuitShm getShm() { - return ShortCircuitShm.this; - } - - /** - * Get the ExtendedBlockId associated with this slot. - * - * @return The ExtendedBlockId of this slot. - */ - public ExtendedBlockId getBlockId() { - return blockId; - } - - /** - * Get the SlotId of this slot, containing both shmId and slotIdx. - * - * @return The SlotId of this slot. - */ - public SlotId getSlotId() { - return new SlotId(getShmId(), getSlotIdx()); - } - - /** - * Get the Slot index. - * - * @return The index of this slot. - */ - public int getSlotIdx() { - return Ints.checkedCast( - (slotAddress - baseAddress) / BYTES_PER_SLOT); - } - - /** - * Clear the slot. - */ - void clear() { - unsafe.putLongVolatile(null, this.slotAddress, 0); - } - - private boolean isSet(long flag) { - long prev = unsafe.getLongVolatile(null, this.slotAddress); - return (prev & flag) != 0; - } - - private void setFlag(long flag) { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & flag) != 0) { - return; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev | flag)); - } - - private void clearFlag(long flag) { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & flag) == 0) { - return; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev & (~flag))); - } - - public boolean isValid() { - return isSet(VALID_FLAG); - } - - public void makeValid() { - setFlag(VALID_FLAG); - } - - public void makeInvalid() { - clearFlag(VALID_FLAG); - } - - public boolean isAnchorable() { - return isSet(ANCHORABLE_FLAG); - } - - public void makeAnchorable() { - setFlag(ANCHORABLE_FLAG); - } - - public void makeUnanchorable() { - clearFlag(ANCHORABLE_FLAG); - } - - public boolean isAnchored() { - long prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & VALID_FLAG) == 0) { - // Slot is no longer valid. - return false; - } - return ((prev & 0x7fffffff) != 0); - } - - /** - * Try to add an anchor for a given slot. - * - * When a slot is anchored, we know that the block it refers to is resident - * in memory. - * - * @return True if the slot is anchored. - */ - public boolean addAnchor() { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - if ((prev & VALID_FLAG) == 0) { - // Slot is no longer valid. - return false; - } - if ((prev & ANCHORABLE_FLAG) == 0) { - // Slot can't be anchored right now. - return false; - } - if ((prev & 0x7fffffff) == 0x7fffffff) { - // Too many other threads have anchored the slot (2 billion?) - return false; - } - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev + 1)); - return true; - } - - /** - * Remove an anchor for a given slot. - */ - public void removeAnchor() { - long prev; - do { - prev = unsafe.getLongVolatile(null, this.slotAddress); - Preconditions.checkState((prev & 0x7fffffff) != 0, - "Tried to remove anchor for slot " + slotAddress +", which was " + - "not anchored."); - } while (!unsafe.compareAndSwapLong(null, this.slotAddress, - prev, prev - 1)); - } - - @Override - public String toString() { - return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")"; - } - } - - /** - * ID for this SharedMemorySegment. - */ - private final ShmId shmId; - - /** - * The base address of the memory-mapped file. - */ - private final long baseAddress; - - /** - * The mmapped length of the shared memory segment - */ - private final int mmappedLength; - - /** - * The slots associated with this shared memory segment. - * slot[i] contains the slot at offset i * BYTES_PER_SLOT, - * or null if that slot is not allocated. - */ - private final Slot slots[]; - - /** - * A bitset where each bit represents a slot which is in use. - */ - private final BitSet allocatedSlots; - - /** - * Create the ShortCircuitShm. - * - * @param shmId The ID to use. - * @param stream The stream that we're going to use to create this - * shared memory segment. - * - * Although this is a FileInputStream, we are going to - * assume that the underlying file descriptor is writable - * as well as readable. It would be more appropriate to use - * a RandomAccessFile here, but that class does not have - * any public accessor which returns a FileDescriptor, - * unlike FileInputStream. - */ - public ShortCircuitShm(ShmId shmId, FileInputStream stream) - throws IOException { - if (!NativeIO.isAvailable()) { - throw new UnsupportedOperationException("NativeIO is not available."); - } - if (Shell.WINDOWS) { - throw new UnsupportedOperationException( - "DfsClientShm is not yet implemented for Windows."); - } - if (unsafe == null) { - throw new UnsupportedOperationException( - "can't use DfsClientShm because we failed to " + - "load misc.Unsafe."); - } - this.shmId = shmId; - this.mmappedLength = getUsableLength(stream); - this.baseAddress = POSIX.mmap(stream.getFD(), - POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); - this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; - this.allocatedSlots = new BitSet(slots.length); - if (LOG.isTraceEnabled()) { - LOG.trace("creating " + this.getClass().getSimpleName() + - "(shmId=" + shmId + - ", mmappedLength=" + mmappedLength + - ", baseAddress=" + String.format("%x", baseAddress) + - ", slots.length=" + slots.length + ")"); - } - } - - public final ShmId getShmId() { - return shmId; - } - - /** - * Determine if this shared memory object is empty. - * - * @return True if the shared memory object is empty. - */ - synchronized final public boolean isEmpty() { - return allocatedSlots.nextSetBit(0) == -1; - } - - /** - * Determine if this shared memory object is full. - * - * @return True if the shared memory object is full. - */ - synchronized final public boolean isFull() { - return allocatedSlots.nextClearBit(0) >= slots.length; - } - - /** - * Calculate the base address of a slot. - * - * @param slotIdx Index of the slot. - * @return The base address of the slot. - */ - private final long calculateSlotAddress(int slotIdx) { - long offset = slotIdx; - offset *= BYTES_PER_SLOT; - return this.baseAddress + offset; - } - - /** - * Allocate a new slot and register it. - * - * This function chooses an empty slot, initializes it, and then returns - * the relevant Slot object. - * - * @return The new slot. - */ - synchronized public final Slot allocAndRegisterSlot( - ExtendedBlockId blockId) { - int idx = allocatedSlots.nextClearBit(0); - if (idx >= slots.length) { - throw new RuntimeException(this + ": no more slots are available."); - } - allocatedSlots.set(idx, true); - Slot slot = new Slot(calculateSlotAddress(idx), blockId); - slot.clear(); - slot.makeValid(); - slots[idx] = slot; - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots + - StringUtils.getStackTrace(Thread.currentThread())); - } - return slot; - } - - synchronized public final Slot getSlot(int slotIdx) - throws InvalidRequestException { - if (!allocatedSlots.get(slotIdx)) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " does not exist."); - } - return slots[slotIdx]; - } - - /** - * Register a slot. - * - * This function looks at a slot which has already been initialized (by - * another process), and registers it with us. Then, it returns the - * relevant Slot object. - * - * @return The slot. - * - * @throws InvalidRequestException - * If the slot index we're trying to allocate has not been - * initialized, or is already in use. - */ - synchronized public final Slot registerSlot(int slotIdx, - ExtendedBlockId blockId) throws InvalidRequestException { - if (slotIdx < 0) { - throw new InvalidRequestException(this + ": invalid negative slot " + - "index " + slotIdx); - } - if (slotIdx >= slots.length) { - throw new InvalidRequestException(this + ": invalid slot " + - "index " + slotIdx); - } - if (allocatedSlots.get(slotIdx)) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " is already in use."); - } - Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); - if (!slot.isValid()) { - throw new InvalidRequestException(this + ": slot " + slotIdx + - " is not marked as valid."); - } - slots[slotIdx] = slot; - allocatedSlots.set(slotIdx, true); - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots + - StringUtils.getStackTrace(Thread.currentThread())); - } - return slot; - } - - /** - * Unregisters a slot. - * - * This doesn't alter the contents of the slot. It just means - * - * @param slotIdx Index of the slot to unregister. - */ - synchronized public final void unregisterSlot(int slotIdx) { - Preconditions.checkState(allocatedSlots.get(slotIdx), - "tried to unregister slot " + slotIdx + ", which was not registered."); - allocatedSlots.set(slotIdx, false); - slots[slotIdx] = null; - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": unregisterSlot " + slotIdx); - } - } - - /** - * Iterate over all allocated slots. - * - * Note that this method isn't safe if - * - * @return The slot iterator. - */ - public SlotIterator slotIterator() { - return new SlotIterator(); - } - - public void free() { - try { - POSIX.munmap(baseAddress, mmappedLength); - } catch (IOException e) { - LOG.warn(this + ": failed to munmap", e); - } - LOG.trace(this + ": freed"); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "(" + shmId + ")"; - } -}