http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index a95f397..02b20d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -698,7 +698,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetDatanodeReportRequestProto req) throws ServiceException { try { - List<? extends DatanodeInfoProto> result = PBHelperClient.convert(server + List<? extends DatanodeInfoProto> result = PBHelper.convert(server .getDatanodeReport(PBHelper.convert(req.getType()))); return GetDatanodeReportResponseProto.newBuilder() .addAllDi(result).build(); @@ -892,7 +892,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements server.setQuota(req.getPath(), req.getNamespaceQuota(), req.getStoragespaceQuota(), req.hasStorageType() ? 
- PBHelperClient.convertStorageType(req.getStorageType()): null); + PBHelper.convertStorageType(req.getStorageType()): null); return VOID_SETQUOTA_RESPONSE; } catch (IOException e) { throw new ServiceException(e); @@ -992,7 +992,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements GetDelegationTokenResponseProto.Builder rspBuilder = GetDelegationTokenResponseProto.newBuilder(); if (token != null) { - rspBuilder.setToken(PBHelperClient.convert(token)); + rspBuilder.setToken(PBHelper.convert(token)); } return rspBuilder.build(); } catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index a0431b1..7e57b97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -390,7 +390,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String holder) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder() - .setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder) + .setB(PBHelper.convert(b)).setSrc(src).setHolder(holder) .setFileId(fileId).build(); try { rpcProxy.abandonBlock(null, req); @@ -409,9 +409,9 @@ public class ClientNamenodeProtocolTranslatorPB implements AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) - req.setPrevious(PBHelperClient.convert(previous)); - if (excludeNodes != null) - req.addAllExcludeNodes(PBHelperClient.convert(excludeNodes)); + req.setPrevious(PBHelper.convert(previous)); + if (excludeNodes != null) + req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } @@ -433,10 +433,10 @@ public class ClientNamenodeProtocolTranslatorPB implements .newBuilder() .setSrc(src) .setFileId(fileId) - .setBlk(PBHelperClient.convert(blk)) - 
.addAllExistings(PBHelperClient.convert(existings)) + .setBlk(PBHelper.convert(blk)) + .addAllExistings(PBHelper.convert(existings)) .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs)) - .addAllExcludes(PBHelperClient.convert(excludes)) + .addAllExcludes(PBHelper.convert(excludes)) .setNumAdditionalNodes(numAdditionalNodes) .setClientName(clientName) .build(); @@ -458,7 +458,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setClientName(clientName) .setFileId(fileId); if (last != null) - req.setLast(PBHelperClient.convert(last)); + req.setLast(PBHelper.convert(last)); try { return rpcProxy.complete(null, req.build()).getResult(); } catch (ServiceException e) { @@ -817,7 +817,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setNamespaceQuota(namespaceQuota) .setStoragespaceQuota(storagespaceQuota); if (type != null) { - builder.setStorageType(PBHelperClient.convertStorageType(type)); + builder.setStorageType(PBHelper.convertStorageType(type)); } final SetQuotaRequestProto req = builder.build(); try { @@ -895,7 +895,7 @@ public class ClientNamenodeProtocolTranslatorPB implements String clientName) throws IOException { UpdateBlockForPipelineRequestProto req = UpdateBlockForPipelineRequestProto .newBuilder() - .setBlock(PBHelperClient.convert(block)) + .setBlock(PBHelper.convert(block)) .setClientName(clientName) .build(); try { @@ -911,8 +911,8 @@ public class ClientNamenodeProtocolTranslatorPB implements ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException { UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() .setClientName(clientName) - .setOldBlock(PBHelperClient.convert(oldBlock)) - .setNewBlock(PBHelperClient.convert(newBlock)) + .setOldBlock(PBHelper.convert(oldBlock)) + .setNewBlock(PBHelper.convert(newBlock)) .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes))) .addAllStorageIDs(storageIDs == null ? 
null : Arrays.asList(storageIDs)) .build(); @@ -943,7 +943,7 @@ public class ClientNamenodeProtocolTranslatorPB implements public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException { RenewDelegationTokenRequestProto req = RenewDelegationTokenRequestProto.newBuilder(). - setToken(PBHelperClient.convert(token)). + setToken(PBHelper.convert(token)). build(); try { return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime(); @@ -957,7 +957,7 @@ public class ClientNamenodeProtocolTranslatorPB implements throws IOException { CancelDelegationTokenRequestProto req = CancelDelegationTokenRequestProto .newBuilder() - .setToken(PBHelperClient.convert(token)) + .setToken(PBHelper.convert(token)) .build(); try { rpcProxy.cancelDelegationToken(null, req); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 0b46927..94028a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -298,11 +298,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements ) throws IOException { CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(block)).setNewGenStamp(newgenerationstamp) + .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) 
.setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { - builder.addNewTaragets(PBHelperClient.convert(newtargets[i])); + builder.addNewTaragets(PBHelper.convert(newtargets[i])); builder.addNewTargetStorages(newtargetstorages[i]); } CommitBlockSynchronizationRequestProto req = builder.build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 17ba196..fee62a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -105,7 +105,7 @@ public class InterDatanodeProtocolTranslatorPB implements long recoveryId, long newBlockId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() - .setBlock(PBHelperClient.convert(oldBlock)) + .setBlock(PBHelper.convert(oldBlock)) .setNewLength(newLength).setNewBlockId(newBlockId) .setRecoveryId(recoveryId).build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index bcb96ba..82c5c4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -101,7 +101,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() - .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) + .setDatanode(PBHelper.convert((DatanodeID)datanode)).setSize(size) .build(); try { return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index dbb1861..a252262 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -347,7 +347,7 @@ public class PBHelper { if (types == null || types.length == 0) { return null; } - List<StorageTypeProto> list = PBHelperClient.convertStorageTypes(types); + List<StorageTypeProto> list = convertStorageTypes(types); return StorageTypesProto.newBuilder().addAllStorageTypes(list).build(); } @@ -382,6 +382,20 @@ public class PBHelper { .getInfoSecurePort() : 0, dn.getIpcPort()); } + public static DatanodeIDProto convert(DatanodeID dn) { + // For wire 
compatibility with older versions we transmit the StorageID + // which is the same as the DatanodeUuid. Since StorageID is a required + // field we pass the empty string if the DatanodeUuid is not yet known. + return DatanodeIDProto.newBuilder() + .setIpAddr(dn.getIpAddr()) + .setHostName(dn.getHostName()) + .setXferPort(dn.getXferPort()) + .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") + .setInfoPort(dn.getInfoPort()) + .setInfoSecurePort(dn.getInfoSecurePort()) + .setIpcPort(dn.getIpcPort()).build(); + } + // Arrays of DatanodeId public static DatanodeIDProto[] convert(DatanodeID[] did) { if (did == null) @@ -389,7 +403,7 @@ public class PBHelper { final int len = did.length; DatanodeIDProto[] result = new DatanodeIDProto[len]; for (int i = 0; i < len; ++i) { - result[i] = PBHelperClient.convert(did[i]); + result[i] = convert(did[i]); } return result; } @@ -420,7 +434,7 @@ public class PBHelper { .setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())) + .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())) .build(); } @@ -582,6 +596,16 @@ public class PBHelper { eb.getGenerationStamp()); } + public static ExtendedBlockProto convert(final ExtendedBlock b) { + if (b == null) return null; + return ExtendedBlockProto.newBuilder(). + setPoolId(b.getBlockPoolId()). + setBlockId(b.getBlockId()). + setNumBytes(b.getNumBytes()). + setGenerationStamp(b.getGenerationStamp()). 
+ build(); + } + public static RecoveringBlockProto convert(RecoveringBlock b) { if (b == null) { return null; @@ -602,6 +626,17 @@ public class PBHelper { new RecoveringBlock(block, locs, b.getNewGenStamp()); } + public static DatanodeInfoProto.AdminState convert( + final DatanodeInfo.AdminStates inAs) { + switch (inAs) { + case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; + case DECOMMISSION_INPROGRESS: + return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; + default: return DatanodeInfoProto.AdminState.NORMAL; + } + } + static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -613,6 +648,12 @@ public class PBHelper { di.getXceiverCount(), PBHelper.convert(di.getAdminState())); } + static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { + if (di == null) return null; + return convert(di); + } + + static public DatanodeInfo[] convert(DatanodeInfoProto di[]) { if (di == null) return null; DatanodeInfo[] result = new DatanodeInfo[di.length]; @@ -622,6 +663,27 @@ public class PBHelper { return result; } + public static List<? extends HdfsProtos.DatanodeInfoProto> convert( + DatanodeInfo[] dnInfos) { + return convert(dnInfos, 0); + } + + /** + * Copy from {@code dnInfos} to a target of list of same size starting at + * {@code startIdx}. + */ + public static List<? 
extends HdfsProtos.DatanodeInfoProto> convert( + DatanodeInfo[] dnInfos, int startIdx) { + if (dnInfos == null) + return null; + ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists + .newArrayListWithCapacity(dnInfos.length); + for (int i = startIdx; i < dnInfos.length; i++) { + protos.add(convert(dnInfos[i])); + } + return protos; + } + public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) { DatanodeInfo[] info = new DatanodeInfo[list.size()]; for (int i = 0; i < info.length; i++) { @@ -629,11 +691,32 @@ public class PBHelper { } return info; } + + public static DatanodeInfoProto convert(DatanodeInfo info) { + DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); + if (info.getNetworkLocation() != null) { + builder.setLocation(info.getNetworkLocation()); + } + builder + .setId(PBHelper.convert((DatanodeID)info)) + .setCapacity(info.getCapacity()) + .setDfsUsed(info.getDfsUsed()) + .setRemaining(info.getRemaining()) + .setBlockPoolUsed(info.getBlockPoolUsed()) + .setCacheCapacity(info.getCacheCapacity()) + .setCacheUsed(info.getCacheUsed()) + .setLastUpdate(info.getLastUpdate()) + .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) + .setXceiverCount(info.getXceiverCount()) + .setAdminState(PBHelper.convert(info.getAdminState())) + .build(); + return builder.build(); + } public static DatanodeStorageReportProto convertDatanodeStorageReport( DatanodeStorageReport report) { return DatanodeStorageReportProto.newBuilder() - .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo())) + .setDatanodeInfo(convert(report.getDatanodeInfo())) .addAllStorageReports(convertStorageReports(report.getStorageReports())) .build(); } @@ -685,7 +768,7 @@ public class PBHelper { Lists.newLinkedList(Arrays.asList(b.getCachedLocations())); for (int i = 0; i < locs.length; i++) { DatanodeInfo loc = locs[i]; - builder.addLocs(i, PBHelperClient.convert(loc)); + builder.addLocs(i, PBHelper.convert(loc)); boolean locIsCached = cachedLocs.contains(loc); 
builder.addIsCached(locIsCached); if (locIsCached) { @@ -699,7 +782,7 @@ public class PBHelper { StorageType[] storageTypes = b.getStorageTypes(); if (storageTypes != null) { for (int i = 0; i < storageTypes.length; ++i) { - builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i])); + builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); } } final String[] storageIDs = b.getStorageIDs(); @@ -707,8 +790,8 @@ public class PBHelper { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } - return builder.setB(PBHelperClient.convert(b.getBlock())) - .setBlockToken(PBHelperClient.convert(b.getBlockToken())) + return builder.setB(PBHelper.convert(b.getBlock())) + .setBlockToken(PBHelper.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } @@ -749,6 +832,14 @@ public class PBHelper { return lb; } + public static TokenProto convert(Token<?> tok) { + return TokenProto.newBuilder(). + setIdentifier(ByteString.copyFrom(tok.getIdentifier())). + setPassword(ByteString.copyFrom(tok.getPassword())). + setKind(tok.getKind().toString()). 
+ setService(tok.getService().toString()).build(); + } + public static Token<BlockTokenIdentifier> convert( TokenProto blockToken) { return new Token<BlockTokenIdentifier>(blockToken.getIdentifier() @@ -800,7 +891,7 @@ public class PBHelper { DatanodeRegistration registration) { DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto .newBuilder(); - return builder.setDatanodeID(PBHelperClient.convert((DatanodeID) registration)) + return builder.setDatanodeID(PBHelper.convert((DatanodeID) registration)) .setStorageInfo(PBHelper.convert(registration.getStorageInfo())) .setKeys(PBHelper.convert(registration.getExportedKeys())) .setSoftwareVersion(registration.getSoftwareVersion()).build(); @@ -892,7 +983,7 @@ public class PBHelper { if (types != null) { for (StorageType[] ts : types) { StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); - builder.addAllStorageTypes(PBHelperClient.convertStorageTypes(ts)); + builder.addAllStorageTypes(convertStorageTypes(ts)); list.add(builder.build()); } } @@ -923,7 +1014,7 @@ public class PBHelper { DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; for (int i = 0; i < targets.length; i++) { ret[i] = DatanodeInfosProto.newBuilder() - .addAllDatanodes(PBHelperClient.convert(targets[i])).build(); + .addAllDatanodes(PBHelper.convert(targets[i])).build(); } return Arrays.asList(ret); } @@ -1247,7 +1338,7 @@ public class PBHelper { fs.getFileBufferSize(), fs.getEncryptDataTransfer(), fs.getTrashInterval(), - PBHelperClient.convert(fs.getChecksumType())); + PBHelper.convert(fs.getChecksumType())); } public static FsServerDefaultsProto convert(FsServerDefaults fs) { @@ -1260,7 +1351,7 @@ public class PBHelper { .setFileBufferSize(fs.getFileBufferSize()) .setEncryptDataTransfer(fs.getEncryptDataTransfer()) .setTrashInterval(fs.getTrashInterval()) - .setChecksumType(PBHelperClient.convert(fs.getChecksumType())) + .setChecksumType(PBHelper.convert(fs.getChecksumType())) .build(); } @@ -1648,7 
+1739,7 @@ public class PBHelper { if (cs.hasTypeQuotaInfos()) { for (HdfsProtos.StorageTypeQuotaInfoProto info : cs.getTypeQuotaInfos().getTypeQuotaInfoList()) { - StorageType type = PBHelperClient.convertStorageType(info.getType()); + StorageType type = PBHelper.convertStorageType(info.getType()); builder.typeConsumed(type, info.getConsumed()); builder.typeQuota(type, info.getQuota()); } @@ -1672,7 +1763,7 @@ public class PBHelper { for (StorageType t: StorageType.getTypesSupportingQuota()) { HdfsProtos.StorageTypeQuotaInfoProto info = HdfsProtos.StorageTypeQuotaInfoProto.newBuilder(). - setType(PBHelperClient.convertStorageType(t)). + setType(convertStorageType(t)). setConsumed(cs.getTypeConsumed(t)). setQuota(cs.getTypeQuota(t)). build(); @@ -1717,7 +1808,7 @@ public class PBHelper { public static DatanodeStorageProto convert(DatanodeStorage s) { return DatanodeStorageProto.newBuilder() .setState(PBHelper.convertState(s.getState())) - .setStorageType(PBHelperClient.convertStorageType(s.getStorageType())) + .setStorageType(PBHelper.convertStorageType(s.getStorageType())) .setStorageUuid(s.getStorageID()).build(); } @@ -1731,10 +1822,44 @@ public class PBHelper { } } + public static List<StorageTypeProto> convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List<StorageTypeProto> convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { + switch(type) { + case DISK: + return StorageTypeProto.DISK; + case SSD: + return StorageTypeProto.SSD; + case ARCHIVE: + return StorageTypeProto.ARCHIVE; + case RAM_DISK: + return StorageTypeProto.RAM_DISK; + default: + throw new IllegalStateException( + 
"BUG: StorageType not found, type=" + type); + } + } + public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelperClient.convertStorageType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1747,6 +1872,22 @@ public class PBHelper { } } + public static StorageType convertStorageType(StorageTypeProto type) { + switch(type) { + case DISK: + return StorageType.DISK; + case SSD: + return StorageType.SSD; + case ARCHIVE: + return StorageType.ARCHIVE; + case RAM_DISK: + return StorageType.RAM_DISK; + default: + throw new IllegalStateException( + "BUG: StorageTypeProto not found, type=" + type); + } + } + public static StorageType[] convertStorageTypes( List<StorageTypeProto> storageTypesList, int expectedSize) { final StorageType[] storageTypes = new StorageType[expectedSize]; @@ -1755,7 +1896,7 @@ public class PBHelper { Arrays.fill(storageTypes, StorageType.DEFAULT); } else { for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelperClient.convertStorageType(storageTypesList.get(i)); + storageTypes[i] = convertStorageType(storageTypesList.get(i)); } } return storageTypes; @@ -1939,6 +2080,10 @@ public class PBHelper { return reportProto; } + public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { + return DataChecksum.Type.valueOf(type.getNumber()); + } + public static CacheDirectiveInfoProto convert (CacheDirectiveInfo info) { CacheDirectiveInfoProto.Builder builder = @@ -2111,6 +2256,9 @@ public class PBHelper { return new CachePoolEntry(info, stats); } + public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { + return HdfsProtos.ChecksumTypeProto.valueOf(type.id); + } public static DatanodeLocalInfoProto convert(DatanodeLocalInfo info) { DatanodeLocalInfoProto.Builder builder = DatanodeLocalInfoProto.newBuilder(); @@ -2125,6 
+2273,17 @@ public class PBHelper { proto.getConfigVersion(), proto.getUptime()); } + public static InputStream vintPrefixed(final InputStream input) + throws IOException { + final int firstByte = input.read(); + if (firstByte == -1) { + throw new EOFException("Premature EOF: no length prefix available"); + } + + int size = CodedInputStream.readRawVarint32(firstByte, input); + assert size >= 0; + return new ExactSizeInputStream(input, size); + } private static AclEntryScopeProto convert(AclEntryScope v) { return AclEntryScopeProto.valueOf(v.ordinal()); @@ -2348,11 +2507,30 @@ public class PBHelper { proto.getKeyName()); } + public static ShortCircuitShmSlotProto convert(SlotId slotId) { + return ShortCircuitShmSlotProto.newBuilder(). + setShmId(convert(slotId.getShmId())). + setSlotIdx(slotId.getSlotIdx()). + build(); + } + + public static ShortCircuitShmIdProto convert(ShmId shmId) { + return ShortCircuitShmIdProto.newBuilder(). + setHi(shmId.getHi()). + setLo(shmId.getLo()). + build(); + + } + public static SlotId convert(ShortCircuitShmSlotProto slotId) { - return new SlotId(PBHelperClient.convert(slotId.getShmId()), + return new SlotId(PBHelper.convert(slotId.getShmId()), slotId.getSlotIdx()); } + public static ShmId convert(ShortCircuitShmIdProto shmId) { + return new ShmId(shmId.getHi(), shmId.getLo()); + } + private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType type) { switch (type) { @@ -2859,6 +3037,18 @@ public class PBHelper { ezKeyVersionName); } + public static List<Boolean> convert(boolean[] targetPinnings, int idx) { + List<Boolean> pinnings = new ArrayList<Boolean>(); + if (targetPinnings == null) { + pinnings.add(Boolean.FALSE); + } else { + for (; idx < targetPinnings.length; ++idx) { + pinnings.add(Boolean.valueOf(targetPinnings[idx])); + } + } + return pinnings; + } + public static boolean[] convertBooleanList( List<Boolean> targetPinningsList) { final boolean[] targetPinnings = new 
boolean[targetPinningsList.size()]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java new file mode 100644 index 0000000..2fa86fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.security.token.block; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Access token verification failed. 
+ */ [email protected] [email protected] +public class InvalidBlockTokenException extends IOException { + private static final long serialVersionUID = 168L; + + public InvalidBlockTokenException() { + super(); + } + + public InvalidBlockTokenException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index dc967ff..40564df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.balancer; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java new file mode 100644 index 0000000..215df13 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +/** + * The caching strategy we should use for an HDFS read or write operation. + */ +public class CachingStrategy { + private final Boolean dropBehind; // null = use server defaults + private final Long readahead; // null = use server defaults + + public static CachingStrategy newDefaultStrategy() { + return new CachingStrategy(null, null); + } + + public static CachingStrategy newDropBehind() { + return new CachingStrategy(true, null); + } + + public static class Builder { + private Boolean dropBehind; + private Long readahead; + + public Builder(CachingStrategy prev) { + this.dropBehind = prev.dropBehind; + this.readahead = prev.readahead; + } + + public Builder setDropBehind(Boolean dropBehind) { + this.dropBehind = dropBehind; + return this; + } + + public Builder setReadahead(Long readahead) { + this.readahead = readahead; + return this; + } + + public CachingStrategy build() { + return new CachingStrategy(dropBehind, readahead); + } + } + + public CachingStrategy(Boolean dropBehind, Long readahead) { + this.dropBehind = dropBehind; + this.readahead = readahead; + } + + public Boolean getDropBehind() { + return dropBehind; + } + + public Long getReadahead() { + return readahead; + 
} + + public String toString() { + return "CachingStrategy(dropBehind=" + dropBehind + + ", readahead=" + readahead + ")"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index df03808..fa3b78c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -137,7 +137,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode; @@ -2172,7 +2172,7 @@ public class DataNode extends ReconfigurableBase // read ack if (isClient) { DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index dfaa525..e9cf436 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; @@ -427,7 +427,7 @@ class DataXceiver extends Receiver implements Runnable { throws IOException { DataNodeFaultInjector.get().sendShortCircuitShmResponse(); ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). - setId(PBHelperClient.convert(shmInfo.shmId)).build(). + setId(PBHelper.convert(shmInfo.shmId)).build(). writeDelimitedTo(socketOut); // Send the file descriptor for the shared memory segment. byte buf[] = new byte[] { (byte)0 }; @@ -559,7 +559,7 @@ class DataXceiver extends Receiver implements Runnable { // to respond with a Status enum. 
try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + peer.getRemoteAddressString() + " did not send a valid status code after reading. " + @@ -745,7 +745,7 @@ class DataXceiver extends Receiver implements Runnable { // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); mirrorInStatus = connectAck.getStatus(); firstBadLink = connectAck.getFirstBadLink(); if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { @@ -962,7 +962,7 @@ class DataXceiver extends Receiver implements Runnable { .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelperClient.convert(checksum.getChecksumType()))) + .setCrcType(PBHelper.convert(checksum.getChecksumType()))) .build() .writeDelimitedTo(out); out.flush(); @@ -1147,8 +1147,8 @@ class DataXceiver extends Receiver implements Runnable { // receive the response from the proxy BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(proxyReply)); - + PBHelper.vintPrefixed(proxyReply)); + String logInfo = "copy block " + block + " from " + proxySock.getRemoteSocketAddress(); DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java 
index 0afb06c..9df1713 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -155,7 +154,7 @@ public final class FSImageFormatPBINode { QuotaByStorageTypeFeatureProto proto) { ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder(); for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) { - StorageType type = PBHelperClient.convertStorageType(quotaEntry.getStorageType()); + StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType()); long quota = quotaEntry.getQuota(); b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type) .setQuota(quota).build()); @@ -460,7 +459,7 @@ public final class FSImageFormatPBINode { if (q.getTypeSpace(t) >= 0) { QuotaByStorageTypeEntryProto.Builder eb = QuotaByStorageTypeEntryProto.newBuilder(). - setStorageType(PBHelperClient.convertStorageType(t)). + setStorageType(PBHelper.convertStorageType(t)). 
setQuota(q.getTypeSpace(t)); b.addQuotas(eb); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java new file mode 100644 index 0000000..81cc68d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DfsClientShm is a subclass of ShortCircuitShm which is used by the
+ * DfsClient.
+ *
+ * When the UNIX domain socket associated with this shared memory segment
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
+ * and will not be used to service new reads or mmap operations.
+ * However, in-progress read or mmap operations will continue to proceed.
+ * Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode. In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * {@link ShortCircuitReplica#isStale}).
+ */
+public class DfsClientShm extends ShortCircuitShm
+    implements DomainSocketWatcher.Handler {
+  /**
+   * The EndpointShmManager associated with this shared memory segment.
+   */
+  private final EndpointShmManager manager;
+
+  /**
+   * The UNIX domain socket associated with this DfsClientShm.
+   * We rely on the DomainSocketWatcher to close the socket associated with
+   * this DomainPeer when necessary.
+   */
+  private final DomainPeer peer;
+
+  /**
+   * True if this shared memory segment has lost its connection to the
+   * DataNode.
+   *
+   * {@link DfsClientShm#handle} sets this to true (inside synchronized (this));
+   * it is read through the synchronized {@link #isDisconnected}.
+   */
+  private boolean disconnected = false;
+
+  /**
+   * Create a client-side shared memory segment.
+   *
+   * @param shmId   The ID of this segment; forwarded to ShortCircuitShm.
+   * @param stream  The stream for the segment's file descriptor; forwarded
+   *                to ShortCircuitShm.
+   * @param manager The EndpointShmManager that tracks this segment.
+   * @param peer    The DomainPeer whose UNIX domain socket is associated
+   *                with this segment.
+   * @throws IOException If the ShortCircuitShm constructor fails.
+   */
+  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
+      DomainPeer peer) throws IOException {
+    super(shmId, stream);
+    this.manager = manager;
+    this.peer = peer;
+  }
+
+  /**
+   * @return The EndpointShmManager that tracks this segment.
+   */
+  public EndpointShmManager getEndpointShmManager() {
+    return manager;
+  }
+
+  /**
+   * @return The DomainPeer associated with this segment.
+   */
+  public DomainPeer getPeer() {
+    return peer;
+  }
+
+  /**
+   * Determine if the shared memory segment is disconnected from the DataNode.
+   *
+   * This must be called with the DfsClientShmManager lock held.
+   *
+   * @return True if {@link #handle} has already marked this segment as
+   *         disconnected (i.e. the segment is stale).
+   */
+  public synchronized boolean isDisconnected() {
+    return disconnected;
+  }
+
+  /**
+   * Handle the closure of the UNIX domain socket associated with this shared
+   * memory segment by marking this segment as stale.
+   *
+   * The segment is first unregistered from its EndpointShmManager, then
+   * every currently allocated slot is marked invalid. If there are no slots
+   * associated with this shared memory segment, it will be freed immediately
+   * in this function.
+   *
+   * @param sock The socket that closed (not used by this implementation).
+   * @return Always true. NOTE(review): the meaning of the return value is
+   *         defined by DomainSocketWatcher.Handler, which is not shown here;
+   *         presumably it tells the watcher it is done with this socket.
+   * @throws IllegalStateException If this segment was already marked
+   *         disconnected (handle invoked twice).
+   */
+  @Override
+  public boolean handle(DomainSocket sock) {
+    // unregisterShm acquires the DfsClientShmManager lock. It runs before
+    // synchronized (this), so this segment's monitor is not held while that
+    // lock is taken. NOTE(review): presumably intentional to avoid a
+    // lock-ordering deadlock -- keep this order.
+    manager.unregisterShm(getShmId());
+    synchronized (this) {
+      Preconditions.checkState(!disconnected);
+      disconnected = true;
+      // Invalidate every allocated slot, remembering whether any existed.
+      boolean hadSlots = false;
+      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
+        Slot slot = iter.next();
+        slot.makeInvalid();
+        hadSlots = true;
+      }
+      // With no outstanding slots nothing can still reference the segment,
+      // so it is released right away; otherwise the last freeSlot frees it.
+      if (!hadSlots) {
+        free();
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java new file mode 100644 index 0000000..062539a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java @@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages short-circuit memory segments for an HDFS client.
+ *
+ * Clients are responsible for requesting and releasing shared memory segments used
+ * for communicating with the DataNode. The client will try to allocate new slots
+ * in the set of existing segments, falling back to getting a new segment from the
+ * DataNode via {@link DataTransferProtocol#requestShortCircuitShm}.
+ *
+ * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}.
+ * See {@link ShortCircuitRegistry} for more information on the communication protocol.
+ *
+ * All per-DataNode state is guarded by a single manager lock ({@code lock}).
+ */
+@InterfaceAudience.Private
+public class DfsClientShmManager implements Closeable {
+  private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
+
+  /**
+   * Manages short-circuit memory segments that pertain to a given DataNode.
+   */
+  class EndpointShmManager {
+    /**
+     * The datanode we're managing.
+     */
+    private final DatanodeInfo datanode;
+
+    /**
+     * Shared memory segments which have no empty slots.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> full =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * Shared memory segments which have at least one empty slot.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> notFull =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * True if this datanode doesn't support short-circuit shared memory
+     * segments.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean disabled = false;
+
+    /**
+     * True if we're in the process of loading a shared memory segment from
+     * this DataNode.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean loading = false;
+
+    EndpointShmManager (DatanodeInfo datanode) {
+      this.datanode = datanode;
+    }
+
+    /**
+     * Pull a slot out of a preexisting shared memory segment.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param blockId     The blockId to put inside the Slot object.
+     *
+     * @return            null if none of our shared memory segments contain a
+     *                      free slot; the slot object otherwise.
+     */
+    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
+      if (notFull.isEmpty()) {
+        return null;
+      }
+      // Always allocate from the lowest ShmId to limit fragmentation.
+      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
+      DfsClientShm shm = entry.getValue();
+      ShmId shmId = shm.getShmId();
+      Slot slot = shm.allocAndRegisterSlot(blockId);
+      if (shm.isFull()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+        DfsClientShm removedShm = notFull.remove(shmId);
+        Preconditions.checkState(removedShm == shm);
+        full.put(shmId, shm);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+      }
+      return slot;
+    }
+
+    /**
+     * Ask the DataNode for a new shared memory segment.
+     *
+     * {@link #allocSlot} calls this with the manager lock RELEASED so that the
+     * lock is not held during network I/O. This method must therefore not
+     * touch any state protected by the manager lock; an ERROR_UNSUPPORTED
+     * answer is reported through {@code unsupported} and published by the
+     * caller once it has retaken the lock.
+     *
+     * @param clientName    The current client name.
+     * @param peer          The peer to use to talk to the DataNode.
+     * @param unsupported   (out param) Set to true if the DataNode replied
+     *                        that it does not support shared memory segments.
+     *
+     * @return              Null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm. The shared memory segment itself on success.
+     * @throws IOException  If there was an error communicating over the socket.
+     *                        We will not throw an IOException unless the socket
+     *                        itself (or the network) is the problem.
+     */
+    private DfsClientShm requestNewShm(String clientName, DomainPeer peer,
+        MutableBoolean unsupported) throws IOException {
+      final DataOutputStream out =
+          new DataOutputStream(
+              new BufferedOutputStream(peer.getOutputStream()));
+      new Sender(out).requestShortCircuitShm(clientName);
+      ShortCircuitShmResponseProto resp =
+          ShortCircuitShmResponseProto.parseFrom(
+              PBHelper.vintPrefixed(peer.getInputStream()));
+      String error = resp.hasError() ? resp.getError() : "(unknown)";
+      switch (resp.getStatus()) {
+      case SUCCESS:
+        DomainSocket sock = peer.getDomainSocket();
+        byte buf[] = new byte[1];
+        FileInputStream fis[] = new FileInputStream[1];
+        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
+          throw new EOFException("got EOF while trying to transfer the " +
+              "file descriptor for the shared memory segment.");
+        }
+        if (fis[0] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor for the shared memory segment.");
+        }
+        try {
+          DfsClientShm shm =
+              new DfsClientShm(PBHelper.convert(resp.getId()),
+                  fis[0], this, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": createNewShm: created " + shm);
+          }
+          return shm;
+        } finally {
+          // The stream is closed once the DfsClientShm has been constructed
+          // (or construction failed).
+          IOUtils.cleanup(LOG, fis[0]);
+        }
+      case ERROR_UNSUPPORTED:
+        // The DataNode just does not support short-circuit shared memory
+        // access, and we should stop asking.
+        LOG.info(this + ": datanode does not support short-circuit " +
+            "shared memory access: " + error);
+        unsupported.setValue(true);
+        return null;
+      default:
+        // The datanode experienced some kind of unexpected error when trying to
+        // create the short-circuit shared memory segment.
+        LOG.warn(this + ": error requesting short-circuit shared memory " +
+            "access: " + error);
+        return null;
+      }
+    }
+
+    /**
+     * Allocate a new shared memory slot connected to this datanode.
+     *
+     * Must be called with the manager lock held. The lock is temporarily
+     * released while a new segment is requested from the DataNode.
+     *
+     * @param peer          The peer to use to talk to the DataNode.
+     * @param usedPeer      (out param) Will be set to true if we used the peer.
+     *                        When a peer is used, it has been handed to the
+     *                        DomainSocketWatcher and must not be closed by the
+     *                        caller.
+     *
+     * @param clientName    The client name.
+     * @param blockId       The block ID to use.
+     * @return              The allocated slot, or null if the manager has been
+     *                        closed or shared memory access to this DataNode is
+     *                        disabled.
+     * @throws IOException  If there was an error communicating over the socket.
+     */
+    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
+        String clientName, ExtendedBlockId blockId) throws IOException {
+      while (true) {
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": the DfsClientShmManager has been closed.");
+          }
+          return null;
+        }
+        if (disabled) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shared memory segment access is disabled.");
+          }
+          return null;
+        }
+        // Try to use an existing slot.
+        Slot slot = allocSlotFromExistingShm(blockId);
+        if (slot != null) {
+          return slot;
+        }
+        // There are no free slots.  If someone is loading more slots, wait
+        // for that to finish.
+        if (loading) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": waiting for loading to finish...");
+          }
+          finishedLoading.awaitUninterruptibly();
+        } else {
+          // Otherwise, load the slot ourselves.
+          MutableBoolean unsupported = new MutableBoolean(false);
+          loading = true;
+          lock.unlock();
+          DfsClientShm shm;
+          try {
+            shm = requestNewShm(clientName, peer, unsupported);
+            if (shm == null) {
+              // NOTE(review): for errors other than ERROR_UNSUPPORTED the loop
+              // retries requestNewShm on the same peer.
+              continue;
+            }
+            // See the comment on DfsClientShmManager#domainSocketWatcher for
+            // why we do this before retaking the manager lock.
+            domainSocketWatcher.add(peer.getDomainSocket(), shm);
+            // The DomainPeer is now our responsibility, and should not be
+            // closed by the caller.
+            usedPeer.setValue(true);
+          } finally {
+            lock.lock();
+            if (unsupported.booleanValue()) {
+              // 'disabled' is guarded by the manager lock, so it is only
+              // written here, before waiters are woken up.
+              disabled = true;
+            }
+            loading = false;
+            finishedLoading.signalAll();
+          }
+          if (shm.isDisconnected()) {
+            // If the peer closed immediately after the shared memory segment
+            // was created, the DomainSocketWatcher callback might already have
+            // fired and marked the shm as disconnected.  In this case, we
+            // obviously don't want to add the SharedMemorySegment to our list
+            // of valid not-full segments.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(this + ": the UNIX domain socket associated with " +
+                  "this short-circuit memory closed before we could make " +
+                  "use of the shm.");
+            }
+          } else {
+            notFull.put(shm.getShmId(), shm);
+          }
+        }
+      }
+    }
+
+    /**
+     * Stop tracking a slot.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param slot          The slot to release.
+     */
+    void freeSlot(Slot slot) {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.unregisterSlot(slot.getSlotIdx());
+      if (shm.isDisconnected()) {
+        // Stale shared memory segments should not be tracked here.
+        Preconditions.checkState(!full.containsKey(shm.getShmId()));
+        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
+        if (shm.isEmpty()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": freeing empty stale " + shm);
+          }
+          shm.free();
+        }
+      } else {
+        ShmId shmId = shm.getShmId();
+        full.remove(shmId); // The shm can't be full if we just freed a slot.
+        if (shm.isEmpty()) {
+          notFull.remove(shmId);
+
+          // If the shared memory segment is now empty, we call shutdown(2) on
+          // the UNIX domain socket associated with it.  The DomainSocketWatcher,
+          // which is watching this socket, will call DfsClientShm#handle,
+          // cleaning up this shared memory segment.
+          //
+          // See the comment on DfsClientShmManager#domainSocketWatcher for
+          // why we don't want to call DomainSocketWatcher#remove directly here.
+          //
+          // Note that we could experience 'fragmentation' here, where the
+          // DFSClient allocates a bunch of slots in different shared memory
+          // segments, and then frees most of them, but never fully empties out
+          // any segment.  We make some attempt to avoid this fragmentation by
+          // always allocating new slots out of the shared memory segment with the
+          // lowest ID, but it could still occur.  In most workloads,
+          // fragmentation should not be a major concern, since it doesn't impact
+          // peak file descriptor usage or the speed of allocation.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shutting down UNIX domain socket for " +
+                "empty " + shm);
+          }
+          shutdown(shm);
+        } else {
+          notFull.put(shmId, shm);
+        }
+      }
+    }
+
+    /**
+     * Unregister a shared memory segment.
+     *
+     * Once a segment is unregistered, we will not allocate any more slots
+     * inside that segment.
+     *
+     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
+     * lock.
+     *
+     * @param shmId         The ID of the shared memory segment to unregister.
+     */
+    void unregisterShm(ShmId shmId) {
+      lock.lock();
+      try {
+        full.remove(shmId);
+        notFull.remove(shmId);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EndpointShmManager(%s, parent=%s)",
+          datanode, DfsClientShmManager.this);
+    }
+
+    /**
+     * Snapshot for tests. Must be called with the manager lock held; the
+     * returned maps are the live internal maps, not copies.
+     */
+    PerDatanodeVisitorInfo getVisitorInfo() {
+      return new PerDatanodeVisitorInfo(full, notFull, disabled);
+    }
+
+    /**
+     * Call shutdown(SHUT_RDWR) on the segment's UNIX domain socket so the
+     * DomainSocketWatcher notices and runs DfsClientShm#handle. Errors are
+     * logged, not propagated.
+     */
+    final void shutdown(DfsClientShm shm) {
+      try {
+        shm.getPeer().getDomainSocket().shutdown();
+      } catch (IOException e) {
+        LOG.warn(this + ": error shutting down shm: got IOException calling " +
+            "shutdown(SHUT_RDWR)", e);
+      }
+    }
+  }
+
+  /** True once {@link #close} has run. Protected by the manager lock. */
+  private boolean closed = false;
+
+  /** The manager lock; guards all EndpointShmManager state. */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * A condition variable which is signalled when we finish loading a segment
+   * from the Datanode.
+   */
+  private final Condition finishedLoading = lock.newCondition();
+
+  /**
+   * Information about each Datanode.
+   */
+  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
+      new HashMap<DatanodeInfo, EndpointShmManager>(1);
+
+  /**
+   * The DomainSocketWatcher which keeps track of the UNIX domain socket
+   * associated with each shared memory segment.
+   *
+   * Note: because the DomainSocketWatcher makes callbacks into this
+   * DfsClientShmManager object, you MUST NOT attempt to take the
+   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
+   * or else deadlock might result.   This means that most DomainSocketWatcher
+   * methods are off-limits unless you release the manager lock first.
+   */
+  private final DomainSocketWatcher domainSocketWatcher;
+
+  /**
+   * @param interruptCheckPeriodMs Passed to the DomainSocketWatcher
+   *                               constructor.
+   * @throws IOException If the DomainSocketWatcher cannot be created.
+   */
+  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
+    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
+        "client");
+  }
+
+  /**
+   * Allocate a shared memory slot for a block served by the given DataNode.
+   *
+   * @param datanode   The DataNode serving the block.
+   * @param peer       A peer connected to that DataNode; it may be used to
+   *                   request a new shared memory segment.
+   * @param usedPeer   (out param) Set to true if the peer was handed to the
+   *                   DomainSocketWatcher; the caller must then not close it.
+   * @param blockId    The block ID to store in the slot.
+   * @param clientName The client name sent to the DataNode.
+   * @return The allocated slot, or null if this manager is closed or shared
+   *         memory access to the DataNode is disabled.
+   * @throws IOException If there was an error communicating over the socket.
+   */
+  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
+      MutableBoolean usedPeer, ExtendedBlockId blockId,
+      String clientName) throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": the DfsClientShmManager is closed.");
+        }
+        return null;
+      }
+      EndpointShmManager shmManager = datanodes.get(datanode);
+      if (shmManager == null) {
+        shmManager = new EndpointShmManager(datanode);
+        datanodes.put(datanode, shmManager);
+      }
+      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Release a slot previously returned by {@link #allocSlot}.
+   *
+   * @param slot The slot to release; its segment must be a DfsClientShm.
+   */
+  public void freeSlot(Slot slot) {
+    lock.lock();
+    try {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.getEndpointShmManager().freeSlot(slot);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public static class PerDatanodeVisitorInfo {
+    public final TreeMap<ShmId, DfsClientShm> full;
+    public final TreeMap<ShmId, DfsClientShm> notFull;
+    public final boolean disabled;
+
+    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
+        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
+      this.full = full;
+      this.notFull = notFull;
+      this.disabled = disabled;
+    }
+  }
+
+  @VisibleForTesting
+  public interface Visitor {
+    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+        throws IOException;
+  }
+
+  /**
+   * Run a test visitor over every DataNode's state while holding the
+   * manager lock.
+   */
+  @VisibleForTesting
+  public void visit(Visitor visitor) throws IOException {
+    lock.lock();
+    try {
+      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info =
+          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
+      for (Entry<DatanodeInfo, EndpointShmManager> entry :
+            datanodes.entrySet()) {
+        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
+      }
+      visitor.visit(info);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the DfsClientShmManager.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    // This runs after the manager lock is released (see the comment on
+    // domainSocketWatcher).
+    IOUtils.cleanup(LOG, domainSocketWatcher);
+  }
+
+
+  @Override
+  public String toString() {
+    return String.format("ShortCircuitShmManager(%08x)",
+        System.identityHashCode(this));
+  }
+
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return domainSocketWatcher;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index 15b8dea..db4cbe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import
org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; @@ -201,7 +201,7 @@ public class ShortCircuitCache implements Closeable { DataInputStream in = new DataInputStream(sock.getInputStream()); ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (resp.getStatus() != Status.SUCCESS) { String error = resp.hasError() ? resp.getError() : "(unknown)"; throw new IOException(resp.getStatus().toString() + ": " + error); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java new file mode 100644 index 0000000..7b89d0a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java @@ -0,0 +1,646 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.shortcircuit;

import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.SecureRandom;
import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import sun.misc.Unsafe;

import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.primitives.Ints;

/**
 * A shared memory segment used to implement short-circuit reads.
 *
 * The segment is an mmapped file divided into fixed-size slots of
 * {@link #BYTES_PER_SLOT} bytes. The first word of each slot is read and
 * updated with volatile loads and compare-and-swap operations through
 * sun.misc.Unsafe.
 */
public class ShortCircuitShm {
  private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);

  protected static final int BYTES_PER_SLOT = 64;

  // Initialized after LOG (declaration order), so safetyDance() may log.
  private static final Unsafe unsafe = safetyDance();

  /**
   * Obtain the sun.misc.Unsafe singleton via reflection.
   *
   * @return The Unsafe instance, or null if it could not be loaded (the
   *         failure is logged, and the constructor later refuses to create
   *         a segment).
   */
  private static Unsafe safetyDance() {
    try {
      Field f = Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      return (Unsafe)f.get(null);
    } catch (Throwable e) {
      LOG.error("failed to load misc.Unsafe", e);
    }
    return null;
  }

  /**
   * Calculate the usable size of a shared memory segment.
   * We round down to a multiple of the slot size and do some validation.
   *
   * @param stream The stream we're using.
   * @return The usable size of the shared memory segment.
   * @throws IOException If the segment cannot hold even one slot, or its
   *                     size cannot be determined.
   */
  private static int getUsableLength(FileInputStream stream)
      throws IOException {
    int intSize = Ints.checkedCast(stream.getChannel().size());
    int slots = intSize / BYTES_PER_SLOT;
    if (slots == 0) {
      throw new IOException("size of shared memory segment was " +
          intSize + ", but that is not enough to hold even one slot.");
    }
    return slots * BYTES_PER_SLOT;
  }

  /**
   * Identifies a DfsClientShm.
   */
  public static class ShmId implements Comparable<ShmId> {
    /**
     * Source of randomness for {@link #createRandom()}.
     *
     * This must be a cryptographically strong generator: the outputs of
     * java.util.Random are predictable from previously observed outputs,
     * which would let a client guess other clients' ShmIds.
     */
    private static final Random random = new SecureRandom();
    private final long hi;
    private final long lo;

    /**
     * Generate a random ShmId.
     *
     * We generate ShmIds randomly to prevent a malicious client from
     * successfully guessing one and using that to interfere with another
     * client.
     */
    public static ShmId createRandom() {
      return new ShmId(random.nextLong(), random.nextLong());
    }

    public ShmId(long hi, long lo) {
      this.hi = hi;
      this.lo = lo;
    }

    public long getHi() {
      return hi;
    }

    public long getLo() {
      return lo;
    }

    @Override
    public boolean equals(Object o) {
      if ((o == null) || (o.getClass() != this.getClass())) {
        return false;
      }
      ShmId other = (ShmId)o;
      return new EqualsBuilder().
          append(hi, other.hi).
          append(lo, other.lo).
          isEquals();
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().
          append(this.hi).
          append(this.lo).
          toHashCode();
    }

    /**
     * @return The ID as 32 hex digits: hi followed by lo.
     */
    @Override
    public String toString() {
      return String.format("%016x%016x", hi, lo);
    }

    @Override
    public int compareTo(ShmId other) {
      return ComparisonChain.start().
          compare(hi, other.hi).
          compare(lo, other.lo).
          result();
    }
  }

  /**
   * Uniquely identifies a slot.
   */
  public static class SlotId {
    private final ShmId shmId;
    private final int slotIdx;

    public SlotId(ShmId shmId, int slotIdx) {
      this.shmId = shmId;
      this.slotIdx = slotIdx;
    }

    public ShmId getShmId() {
      return shmId;
    }

    public int getSlotIdx() {
      return slotIdx;
    }

    @Override
    public boolean equals(Object o) {
      if ((o == null) || (o.getClass() != this.getClass())) {
        return false;
      }
      SlotId other = (SlotId)o;
      return new EqualsBuilder().
          append(shmId, other.shmId).
          append(slotIdx, other.slotIdx).
          isEquals();
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().
          append(this.shmId).
          append(this.slotIdx).
          toHashCode();
    }

    @Override
    public String toString() {
      return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx);
    }
  }

  /**
   * Iterates over the allocated slots of the enclosing segment in index
   * order.
   *
   * Each hasNext()/next() call locks the segment individually; the lock is
   * not held between calls.
   */
  public class SlotIterator implements Iterator<Slot> {
    int slotIdx = -1;

    @Override
    public boolean hasNext() {
      synchronized (ShortCircuitShm.this) {
        return allocatedSlots.nextSetBit(slotIdx + 1) != -1;
      }
    }

    @Override
    public Slot next() {
      synchronized (ShortCircuitShm.this) {
        int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1);
        if (nextSlotIdx == -1) {
          throw new NoSuchElementException();
        }
        slotIdx = nextSlotIdx;
        return slots[nextSlotIdx];
      }
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("SlotIterator " +
          "doesn't support removal");
    }
  }

  /**
   * A slot containing information about a replica.
   *
   * The format is:
   * word 0
   *   bit 63       VALID_FLAG (see below).
   *   bit 62       ANCHORABLE_FLAG (see below).
   *   bits 0:30    Anchor count (masked with 0x7fffffff).
   *   other bits   Currently unused.
   * word 1:7
   *   Reserved for future use, such as statistics.
   *   Padding is also useful for avoiding false sharing.
   *
   * Little-endian versus big-endian is not relevant here since both the client
   * and the server reside on the same computer and use the same orientation.
   */
  public class Slot {
    /**
     * Flag indicating that the slot is valid.
     *
     * The DFSClient sets this flag when it allocates a new slot within one of
     * its shared memory regions.
     *
     * The DataNode clears this flag when the replica associated with this slot
     * is no longer valid.  The client itself also clears this flag when it
     * believes that the DataNode is no longer using this slot to communicate.
     */
    private static final long VALID_FLAG =          1L<<63;

    /**
     * Flag indicating that the slot can be anchored.
     */
    private static final long ANCHORABLE_FLAG =     1L<<62;

    /**
     * The slot address in memory.
     */
    private final long slotAddress;

    /**
     * BlockId of the block this slot is used for.
     */
    private final ExtendedBlockId blockId;

    Slot(long slotAddress, ExtendedBlockId blockId) {
      this.slotAddress = slotAddress;
      this.blockId = blockId;
    }

    /**
     * Get the short-circuit memory segment associated with this Slot.
     *
     * @return      The enclosing short-circuit memory segment.
     */
    public ShortCircuitShm getShm() {
      return ShortCircuitShm.this;
    }

    /**
     * Get the ExtendedBlockId associated with this slot.
     *
     * @return      The ExtendedBlockId of this slot.
     */
    public ExtendedBlockId getBlockId() {
      return blockId;
    }

    /**
     * Get the SlotId of this slot, containing both shmId and slotIdx.
     *
     * @return      The SlotId of this slot.
     */
    public SlotId getSlotId() {
      return new SlotId(getShmId(), getSlotIdx());
    }

    /**
     * Get the Slot index.
     *
     * @return      The index of this slot.
     */
    public int getSlotIdx() {
      return Ints.checkedCast(
          (slotAddress - baseAddress) / BYTES_PER_SLOT);
    }

    /**
     * Clear the slot: zero the first word (all flags and the anchor count).
     */
    void clear() {
      unsafe.putLongVolatile(null, this.slotAddress, 0);
    }

    private boolean isSet(long flag) {
      long prev = unsafe.getLongVolatile(null, this.slotAddress);
      return (prev & flag) != 0;
    }

    /** Atomically set a flag bit (CAS loop; no-op if already set). */
    private void setFlag(long flag) {
      long prev;
      do {
        prev = unsafe.getLongVolatile(null, this.slotAddress);
        if ((prev & flag) != 0) {
          return;
        }
      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
                  prev, prev | flag));
    }

    /** Atomically clear a flag bit (CAS loop; no-op if already clear). */
    private void clearFlag(long flag) {
      long prev;
      do {
        prev = unsafe.getLongVolatile(null, this.slotAddress);
        if ((prev & flag) == 0) {
          return;
        }
      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
                  prev, prev & (~flag)));
    }

    public boolean isValid() {
      return isSet(VALID_FLAG);
    }

    public void makeValid() {
      setFlag(VALID_FLAG);
    }

    public void makeInvalid() {
      clearFlag(VALID_FLAG);
    }

    public boolean isAnchorable() {
      return isSet(ANCHORABLE_FLAG);
    }

    public void makeAnchorable() {
      setFlag(ANCHORABLE_FLAG);
    }

    public void makeUnanchorable() {
      clearFlag(ANCHORABLE_FLAG);
    }

    /**
     * @return True if the slot is valid and its anchor count is non-zero.
     */
    public boolean isAnchored() {
      long prev = unsafe.getLongVolatile(null, this.slotAddress);
      if ((prev & VALID_FLAG) == 0) {
        // Slot is no longer valid.
        return false;
      }
      return ((prev & 0x7fffffff) != 0);
    }

    /**
     * Try to add an anchor for a given slot.
     *
     * When a slot is anchored, we know that the block it refers to is resident
     * in memory.
     *
     * @return          True if the slot is anchored.
     */
    public boolean addAnchor() {
      long prev;
      do {
        prev = unsafe.getLongVolatile(null, this.slotAddress);
        if ((prev & VALID_FLAG) == 0) {
          // Slot is no longer valid.
          return false;
        }
        if ((prev & ANCHORABLE_FLAG) == 0) {
          // Slot can't be anchored right now.
          return false;
        }
        if ((prev & 0x7fffffff) == 0x7fffffff) {
          // Too many other threads have anchored the slot (2 billion?)
          // Refusing here also keeps prev + 1 from carrying out of the
          // anchor-count bits.
          return false;
        }
      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
                  prev, prev + 1));
      return true;
    }

    /**
     * Remove an anchor for a given slot.
     *
     * @throws IllegalStateException If the slot's anchor count is zero.
     */
    public void removeAnchor() {
      long prev;
      do {
        prev = unsafe.getLongVolatile(null, this.slotAddress);
        Preconditions.checkState((prev & 0x7fffffff) != 0,
            "Tried to remove anchor for slot " + slotAddress +", which was " +
            "not anchored.");
      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
                  prev, prev - 1));
    }

    @Override
    public String toString() {
      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
    }
  }

  /**
   * ID for this SharedMemorySegment.
   */
  private final ShmId shmId;

  /**
   * The base address of the memory-mapped file.
   */
  private final long baseAddress;

  /**
   * The mmapped length of the shared memory segment
   */
  private final int mmappedLength;

  /**
   * The slots associated with this shared memory segment.
   * slot[i] contains the slot at offset i * BYTES_PER_SLOT,
   * or null if that slot is not allocated.
   */
  private final Slot slots[];

  /**
   * A bitset where each bit represents a slot which is in use.
   */
  private final BitSet allocatedSlots;

  /**
   * Create the ShortCircuitShm.
   *
   * @param shmId       The ID to use.
   * @param stream      The stream that we're going to use to create this
   *                    shared memory segment.
   *
   *                    Although this is a FileInputStream, we are going to
   *                    assume that the underlying file descriptor is writable
   *                    as well as readable. It would be more appropriate to use
   *                    a RandomAccessFile here, but that class does not have
   *                    any public accessor which returns a FileDescriptor,
   *                    unlike FileInputStream.
   * @throws IOException If the segment is too small or cannot be mapped.
   * @throws UnsupportedOperationException If NativeIO or Unsafe is
   *                    unavailable, or when running on Windows.
   */
  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
        throws IOException {
    if (!NativeIO.isAvailable()) {
      throw new UnsupportedOperationException("NativeIO is not available.");
    }
    if (Shell.WINDOWS) {
      throw new UnsupportedOperationException(
          "DfsClientShm is not yet implemented for Windows.");
    }
    if (unsafe == null) {
      throw new UnsupportedOperationException(
          "can't use DfsClientShm because we failed to " +
          "load misc.Unsafe.");
    }
    this.shmId = shmId;
    this.mmappedLength = getUsableLength(stream);
    this.baseAddress = POSIX.mmap(stream.getFD(),
        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
    this.allocatedSlots = new BitSet(slots.length);
    if (LOG.isTraceEnabled()) {
      LOG.trace("creating " + this.getClass().getSimpleName() +
          "(shmId=" + shmId +
          ", mmappedLength=" + mmappedLength +
          ", baseAddress=" + String.format("%x", baseAddress) +
          ", slots.length=" + slots.length + ")");
    }
  }

  public final ShmId getShmId() {
    return shmId;
  }

  /**
   * Determine if this shared memory object is empty.
   *
   * @return    True if the shared memory object is empty.
   */
  synchronized final public boolean isEmpty() {
    return allocatedSlots.nextSetBit(0) == -1;
  }

  /**
   * Determine if this shared memory object is full.
   *
   * @return    True if the shared memory object is full.
   */
  synchronized final public boolean isFull() {
    return allocatedSlots.nextClearBit(0) >= slots.length;
  }

  /**
   * Calculate the base address of a slot.
   *
   * @param slotIdx   Index of the slot.
   * @return          The base address of the slot.
   */
  private final long calculateSlotAddress(int slotIdx) {
    // Widen to long before multiplying so the offset cannot overflow int.
    long offset = slotIdx;
    offset *= BYTES_PER_SLOT;
    return this.baseAddress + offset;
  }

  /**
   * Allocate a new slot and register it.
   *
   * This function chooses an empty slot, initializes it, and then returns
   * the relevant Slot object.
   *
   * @param blockId   The block the new slot will describe.
   * @return          The new slot.
   * @throws RuntimeException If no free slot is available.
   */
  synchronized public final Slot allocAndRegisterSlot(
      ExtendedBlockId blockId) {
    int idx = allocatedSlots.nextClearBit(0);
    if (idx >= slots.length) {
      throw new RuntimeException(this + ": no more slots are available.");
    }
    allocatedSlots.set(idx, true);
    Slot slot = new Slot(calculateSlotAddress(idx), blockId);
    slot.clear();
    slot.makeValid();
    slots[idx] = slot;
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
                  StringUtils.getStackTrace(Thread.currentThread()));
    }
    return slot;
  }

  /**
   * Look up a registered slot.
   *
   * @param slotIdx   Index of the slot.
   * @return          The registered Slot object.
   * @throws InvalidRequestException If the index is negative or no slot is
   *                                 registered at that index.
   */
  synchronized public final Slot getSlot(int slotIdx)
      throws InvalidRequestException {
    // Validate before touching the BitSet: BitSet.get() throws an unchecked
    // IndexOutOfBoundsException for negative indices. This mirrors the check
    // in registerSlot().
    if (slotIdx < 0) {
      throw new InvalidRequestException(this + ": invalid negative slot " +
          "index " + slotIdx);
    }
    if (!allocatedSlots.get(slotIdx)) {
      throw new InvalidRequestException(this + ": slot " + slotIdx +
          " does not exist.");
    }
    return slots[slotIdx];
  }

  /**
   * Register a slot.
   *
   * This function looks at a slot which has already been initialized (by
   * another process), and registers it with us.  Then, it returns the
   * relevant Slot object.
   *
   * @param slotIdx   Index of the slot to register.
   * @param blockId   The block the slot describes.
   * @return          The slot.
   *
   * @throws InvalidRequestException
   *                  If the slot index we're trying to allocate has not been
   *                  initialized, or is already in use.
   */
  synchronized public final Slot registerSlot(int slotIdx,
      ExtendedBlockId blockId) throws InvalidRequestException {
    if (slotIdx < 0) {
      throw new InvalidRequestException(this + ": invalid negative slot " +
          "index " + slotIdx);
    }
    if (slotIdx >= slots.length) {
      throw new InvalidRequestException(this + ": invalid slot " +
          "index " + slotIdx);
    }
    if (allocatedSlots.get(slotIdx)) {
      throw new InvalidRequestException(this + ": slot " + slotIdx +
          " is already in use.");
    }
    Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
    if (!slot.isValid()) {
      throw new InvalidRequestException(this + ": slot " + slotIdx +
          " is not marked as valid.");
    }
    slots[slotIdx] = slot;
    allocatedSlots.set(slotIdx, true);
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
                  StringUtils.getStackTrace(Thread.currentThread()));
    }
    return slot;
  }

  /**
   * Unregisters a slot.
   *
   * This doesn't alter the contents of the slot in shared memory. It only
   * clears the slot's bit in the allocation bitmap and drops the Slot object,
   * so that the index can be handed out again.
   *
   * @param slotIdx  Index of the slot to unregister.
   * @throws IllegalStateException If no slot is registered at that index.
   */
  synchronized public final void unregisterSlot(int slotIdx) {
    Preconditions.checkState(allocatedSlots.get(slotIdx),
        "tried to unregister slot " + slotIdx + ", which was not registered.");
    allocatedSlots.set(slotIdx, false);
    slots[slotIdx] = null;
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": unregisterSlot " + slotIdx);
    }
  }

  /**
   * Iterate over all allocated slots.
   *
   * Note that the returned iterator only locks this segment inside each
   * individual hasNext()/next() call. If slots are registered or
   * unregistered concurrently, iteration may or may not observe those
   * changes, and hasNext() returning true does not guarantee that the
   * following next() will succeed.
   *
   * @return        The slot iterator.
   */
  public SlotIterator slotIterator() {
    return new SlotIterator();
  }

  /**
   * Unmap the shared memory segment. A munmap failure is logged, not thrown.
   */
  public void free() {
    try {
      POSIX.munmap(baseAddress, mmappedLength);
    } catch (IOException e) {
      LOG.warn(this + ": failed to munmap", e);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": freed");
    }
  }

  @Override
  public String toString() {
    return this.getClass().getSimpleName() + "(" + shmId + ")";
  }
}
