Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Dec 13 17:28:14 2013 @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; @@ -57,12 +58,12 @@ import org.apache.hadoop.hdfs.protocol.D import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import 
org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; @@ -133,6 +134,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -264,17 +267,20 @@ public class PBHelper { // DatanodeId public static DatanodeID convert(DatanodeIDProto dn) { - return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(), + return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(), dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn .getInfoSecurePort() : 0, dn.getIpcPort()); } public static DatanodeIDProto convert(DatanodeID dn) { + // For wire compatibility with older versions we transmit the StorageID + // which is the same as the DatanodeUuid. Since StorageID is a required + // field we pass the empty string if the DatanodeUuid is not yet known. return DatanodeIDProto.newBuilder() .setIpAddr(dn.getIpAddr()) .setHostName(dn.getHostName()) - .setStorageID(dn.getStorageID()) .setXferPort(dn.getXferPort()) + .setDatanodeUuid(dn.getDatanodeUuid() != null ? 
dn.getDatanodeUuid() : "") .setInfoPort(dn.getInfoPort()) .setInfoSecurePort(dn.getInfoSecurePort()) .setIpcPort(dn.getIpcPort()).build(); @@ -316,12 +322,16 @@ public class PBHelper { public static BlockWithLocationsProto convert(BlockWithLocations blk) { return BlockWithLocationsProto.newBuilder() .setBlock(convert(blk.getBlock())) - .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build(); + .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) + .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { - return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList() - .toArray(new String[0])); + final List<String> datanodeUuids = b.getDatanodeUuidsList(); + final List<String> storageUuids = b.getStorageUuidsList(); + return new BlockWithLocations(convert(b.getBlock()), + datanodeUuids.toArray(new String[datanodeUuids.size()]), + storageUuids.toArray(new String[storageUuids.size()])); } public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { @@ -623,6 +633,17 @@ public class PBHelper { "Found additional cached replica locations that are not in the set of" + " storage-backed locations!"); + StorageType[] storageTypes = b.getStorageTypes(); + if (storageTypes != null) { + for (int i = 0; i < storageTypes.length; ++i) { + builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); + } + } + final String[] storageIDs = b.getStorageIDs(); + if (storageIDs != null) { + builder.addAllStorageIDs(Arrays.asList(storageIDs)); + } + return builder.setB(PBHelper.convert(b.getBlock())) .setBlockToken(PBHelper.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); @@ -635,6 +656,25 @@ public class PBHelper { for (int i = 0; i < locs.size(); i++) { targets[i] = PBHelper.convert(locs.get(i)); } + + final int storageTypesCount = proto.getStorageTypesCount(); + final StorageType[] storageTypes; + if 
(storageTypesCount == 0) { + storageTypes = null; + } else { + Preconditions.checkState(storageTypesCount == locs.size()); + storageTypes = convertStorageTypeProtos(proto.getStorageTypesList()); + } + + final int storageIDsCount = proto.getStorageIDsCount(); + final String[] storageIDs; + if (storageIDsCount == 0) { + storageIDs = null; + } else { + Preconditions.checkState(storageIDsCount == locs.size()); + storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]); + } + // Set values from the isCached list, re-using references from loc List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size()); List<Boolean> isCachedList = proto.getIsCachedList(); @@ -645,7 +685,7 @@ public class PBHelper { } LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, - proto.getOffset(), proto.getCorrupt(), + storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), cachedLocs.toArray(new DatanodeInfo[0])); lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); @@ -789,7 +829,8 @@ public class PBHelper { for (int i = 0; i < blocks.length; i++) { builder.addBlocks(PBHelper.convert(blocks[i])); } - builder.addAllTargets(PBHelper.convert(cmd.getTargets())); + builder.addAllTargets(convert(cmd.getTargets())) + .addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs())); return builder.build(); } @@ -822,6 +863,15 @@ public class PBHelper { return Arrays.asList(ret); } + private static List<StorageUuidsProto> convert(String[][] targetStorageUuids) { + StorageUuidsProto[] ret = new StorageUuidsProto[targetStorageUuids.length]; + for (int i = 0; i < targetStorageUuids.length; i++) { + ret[i] = StorageUuidsProto.newBuilder() + .addAllStorageUuids(Arrays.asList(targetStorageUuids[i])).build(); + } + return Arrays.asList(ret); + } + public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) { DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder(); if (datanodeCommand == null) { @@ -901,6 
+951,14 @@ public class PBHelper { for (int i = 0; i < targetList.size(); i++) { targets[i] = PBHelper.convert(targetList.get(i)); } + + List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList(); + String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][]; + for(int i = 0; i < targetStorageIDs.length; i++) { + List<String> storageIDs = targetStorageUuidsList.get(i).getStorageUuidsList(); + targetStorageIDs[i] = storageIDs.toArray(new String[storageIDs.size()]); + } + int action = DatanodeProtocol.DNA_UNKNOWN; switch (blkCmd.getAction()) { case TRANSFER: @@ -915,7 +973,8 @@ public class PBHelper { default: throw new AssertionError("Unknown action type: " + blkCmd.getAction()); } - return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets); + return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets, + targetStorageIDs); } public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { @@ -1445,11 +1504,12 @@ public class PBHelper { public static DatanodeStorageProto convert(DatanodeStorage s) { return DatanodeStorageProto.newBuilder() - .setState(PBHelper.convert(s.getState())) - .setStorageID(s.getStorageID()).build(); + .setState(PBHelper.convertState(s.getState())) + .setStorageType(PBHelper.convertStorageType(s.getStorageType())) + .setStorageUuid(s.getStorageID()).build(); } - private static StorageState convert(State state) { + private static StorageState convertState(State state) { switch(state) { case READ_ONLY: return StorageState.READ_ONLY; @@ -1459,11 +1519,26 @@ public class PBHelper { } } + private static StorageTypeProto convertStorageType( + StorageType type) { + switch(type) { + case DISK: + return StorageTypeProto.DISK; + case SSD: + return StorageTypeProto.SSD; + default: + throw new IllegalStateException( + "BUG: StorageType not found, type=" + type); + } + } + public static DatanodeStorage convert(DatanodeStorageProto s) { - return new DatanodeStorage(s.getStorageID(), 
PBHelper.convert(s.getState())); + return new DatanodeStorage(s.getStorageUuid(), + PBHelper.convertState(s.getState()), + PBHelper.convertType(s.getStorageType())); } - private static State convert(StorageState state) { + private static State convertState(StorageState state) { switch(state) { case READ_ONLY: return DatanodeStorage.State.READ_ONLY; @@ -1473,14 +1548,50 @@ public class PBHelper { } } + private static StorageType convertType(StorageTypeProto type) { + switch(type) { + case DISK: + return StorageType.DISK; + case SSD: + return StorageType.SSD; + default: + throw new IllegalStateException( + "BUG: StorageTypeProto not found, type=" + type); + } + } + + private static StorageType[] convertStorageTypeProtos( + List<StorageTypeProto> storageTypesList) { + final StorageType[] storageTypes = new StorageType[storageTypesList.size()]; + for (int i = 0; i < storageTypes.length; ++i) { + storageTypes[i] = PBHelper.convertType(storageTypesList.get(i)); + } + return storageTypes; + } + public static StorageReportProto convert(StorageReport r) { StorageReportProto.Builder builder = StorageReportProto.newBuilder() .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity()) .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining()) - .setStorageID(r.getStorageID()); + .setStorageUuid(r.getStorageID()); return builder.build(); } + public static StorageReport convert(StorageReportProto p) { + return new StorageReport(p.getStorageUuid(), p.getFailed(), + p.getCapacity(), p.getDfsUsed(), p.getRemaining(), + p.getBlockPoolUsed()); + } + + public static StorageReport[] convertStorageReports( + List<StorageReportProto> list) { + final StorageReport[] report = new StorageReport[list.size()]; + for (int i = 0; i < report.length; i++) { + report[i] = convert(list.get(i)); + } + return report; + } + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? 
info.getNamespaceID() : 0; @@ -1838,3 +1949,4 @@ public class PBHelper { return GetAclStatusResponseProto.newBuilder().setResult(r).build(); } } +
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Fri Dec 13 17:28:14 2013
@@ -109,7 +109,7 @@ interface AsyncLogger {
    * Fetch the list of edit logs available on the remote node.
    */
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
-      long fromTxnId, boolean forReading, boolean inProgressOk);
+      long fromTxnId, boolean inProgressOk);
 
   /**
    * Prepare recovery. See the HDFS-3077 design document for details.
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Fri Dec 13 17:28:14 2013 @@ -261,13 +261,13 @@ class AsyncLoggerSet { } public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest( - long fromTxnId, boolean forReading, boolean inProgressOk) { + long fromTxnId, boolean inProgressOk) { Map<AsyncLogger, ListenableFuture<RemoteEditLogManifest>> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { ListenableFuture<RemoteEditLogManifest> future = - logger.getEditLogManifest(fromTxnId, forReading, inProgressOk); + logger.getEditLogManifest(fromTxnId, inProgressOk); calls.put(logger, future); } return QuorumCall.create(calls); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original) +++ 
hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Fri Dec 13 17:28:14 2013 @@ -181,6 +181,7 @@ public class IPCLoggerChannel implements @Override public void close() { + QuorumJournalManager.LOG.info("Closing", new Exception()); // No more tasks may be submitted after this point. executor.shutdown(); if (proxy != null) { @@ -520,13 +521,12 @@ public class IPCLoggerChannel implements @Override public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( - final long fromTxnId, final boolean forReading, - final boolean inProgressOk) { + final long fromTxnId, final boolean inProgressOk) { return executor.submit(new Callable<RemoteEditLogManifest>() { @Override public RemoteEditLogManifest call() throws IOException { GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( - journalId, fromTxnId, forReading, inProgressOk); + journalId, fromTxnId, inProgressOk); // Update the http port, since we need this to build URLs to any of the // returned logs. 
constructHttpServerURI(ret); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Fri Dec 13 17:28:14 2013 @@ -449,18 +449,13 @@ public class QuorumJournalManager implem public void close() throws IOException { loggers.close(); } - - public void selectInputStreams(Collection<EditLogInputStream> streams, - long fromTxnId, boolean inProgressOk) throws IOException { - selectInputStreams(streams, fromTxnId, inProgressOk, true); - } @Override public void selectInputStreams(Collection<EditLogInputStream> streams, - long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException { + long fromTxnId, boolean inProgressOk) throws IOException { QuorumCall<AsyncLogger, RemoteEditLogManifest> q = - loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk); + loggers.getEditLogManifest(fromTxnId, inProgressOk); Map<AsyncLogger, RemoteEditLogManifest> resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, "selectInputStreams"); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Fri Dec 13 17:28:14 2013 @@ -123,14 +123,12 @@ public interface QJournalProtocol { /** * @param jid the journal from which to enumerate edits * @param sinceTxId the first transaction which the client cares about - * @param forReading whether or not the caller intends to read from the edit - * logs * @param inProgressOk whether or not to check the in-progress edit log * segment * @return a list of edit log segments since the given transaction ID. 
*/ public GetEditLogManifestResponseProto getEditLogManifest(String jid, - long sinceTxId, boolean forReading, boolean inProgressOk) + long sinceTxId, boolean inProgressOk) throws IOException; /** Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Fri Dec 13 17:28:14 2013 @@ -203,7 +203,6 @@ public class QJournalProtocolServerSideT return impl.getEditLogManifest( request.getJid().getIdentifier(), request.getSinceTxId(), - request.getForReading(), request.getInProgressOk()); } catch (IOException e) { throw new ServiceException(e); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original) +++ 
hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Fri Dec 13 17:28:14 2013 @@ -228,14 +228,13 @@ public class QJournalProtocolTranslatorP @Override public GetEditLogManifestResponseProto getEditLogManifest(String jid, - long sinceTxId, boolean forReading, boolean inProgressOk) + long sinceTxId, boolean inProgressOk) throws IOException { try { return rpcProxy.getEditLogManifest(NULL_CONTROLLER, GetEditLogManifestRequestProto.newBuilder() .setJid(convertJournalId(jid)) .setSinceTxId(sinceTxId) - .setForReading(forReading) .setInProgressOk(inProgressOk) .build()); } catch (ServiceException e) { Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Fri Dec 13 17:28:14 2013 @@ -630,15 +630,12 @@ class Journal implements Closeable { * @see QJournalProtocol#getEditLogManifest(String, long) */ public RemoteEditLogManifest getEditLogManifest(long sinceTxId, - boolean forReading, boolean inProgressOk) throws IOException { + boolean inProgressOk) throws IOException { // No need to checkRequest() here - anyone may ask for the list // of segments. checkFormatted(); - // if this is for reading, ignore the in-progress editlog segment - inProgressOk = forReading ? 
false : inProgressOk; - List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading, - inProgressOk); + List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk); if (inProgressOk) { RemoteEditLog log = null; Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Fri Dec 13 17:28:14 2013 @@ -178,11 +178,11 @@ class JournalNodeRpcServer implements QJ @SuppressWarnings("deprecation") @Override public GetEditLogManifestResponseProto getEditLogManifest(String jid, - long sinceTxId, boolean forReading, boolean inProgressOk) + long sinceTxId, boolean inProgressOk) throws IOException { RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) - .getEditLogManifest(sinceTxId, forReading, inProgressOk); + .getEditLogManifest(sinceTxId, inProgressOk); return GetEditLogManifestResponseProto.newBuilder() .setManifest(PBHelper.convert(manifest)) Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1550774&r1=1550773&r2=1550774&view=diff 
============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Dec 13 17:28:14 2013 @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; - import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; @@ -221,9 +220,9 @@ public class Balancer { private Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>(); private MovedBlocks movedBlocks = new MovedBlocks(); - // Map storage IDs to BalancerDatanodes - private Map<String, BalancerDatanode> datanodes - = new HashMap<String, BalancerDatanode>(); + /** Map (datanodeUuid -> BalancerDatanodes) */ + private final Map<String, BalancerDatanode> datanodeMap + = new HashMap<String, BalancerDatanode>(); private NetworkTopology cluster; @@ -241,6 +240,14 @@ public class Balancer { private PendingBlockMove() { } + @Override + public String toString() { + final Block b = block.getBlock(); + return b + " with size=" + b.getNumBytes() + " from " + + source.getDisplayName() + " to " + target.getDisplayName() + + " through " + proxySource.getDisplayName(); + } + /* choose a block & a proxy source for this pendingMove * whose source & target have already been chosen. 
* @@ -272,11 +279,7 @@ public class Balancer { if ( chooseProxySource() ) { movedBlocks.add(block); if (LOG.isDebugEnabled()) { - LOG.debug("Decided to move block "+ block.getBlockId() - +" with a length of "+StringUtils.byteDesc(block.getNumBytes()) - + " bytes from " + source.getDisplayName() - + " to " + target.getDisplayName() - + " using proxy source " + proxySource.getDisplayName() ); + LOG.debug("Decided to move " + this); } return true; } @@ -292,26 +295,27 @@ public class Balancer { */ private boolean chooseProxySource() { final DatanodeInfo targetDN = target.getDatanode(); - boolean find = false; - for (BalancerDatanode loc : block.getLocations()) { - // check if there is replica which is on the same rack with the target - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { - find = true; - // if cluster is not nodegroup aware or the proxy is on the same - // nodegroup with target, then we already find the nearest proxy - if (!cluster.isNodeGroupAware() - || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) { + // if node group is supported, first try add nodes in the same node group + if (cluster.isNodeGroupAware()) { + for (BalancerDatanode loc : block.getLocations()) { + if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { return true; } } - - if (!find) { - // find out a non-busy replica out of rack of target - find = addTo(loc); + } + // check if there is replica which is on the same rack with the target + for (BalancerDatanode loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { + return true; } } - - return find; + // find out a non-busy replica + for (BalancerDatanode loc : block.getLocations()) { + if (addTo(loc)) { + return true; + } + } + return false; } // add a BalancerDatanode as proxy source for specific block movement @@ -352,17 +356,9 @@ public class Balancer { sendRequest(out); receiveResponse(in); bytesMoved.inc(block.getNumBytes()); - 
LOG.info( "Moving block " + block.getBlock().getBlockId() + - " from "+ source.getDisplayName() + " to " + - target.getDisplayName() + " through " + - proxySource.getDisplayName() + - " is succeeded." ); + LOG.info("Successfully moved " + this); } catch (IOException e) { - LOG.warn("Error moving block "+block.getBlockId()+ - " from " + source.getDisplayName() + " to " + - target.getDisplayName() + " through " + - proxySource.getDisplayName() + - ": "+e.getMessage()); + LOG.warn("Failed to move " + this + ": " + e.getMessage()); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); @@ -414,9 +410,7 @@ public class Balancer { @Override public void run() { if (LOG.isDebugEnabled()) { - LOG.debug("Starting moving "+ block.getBlockId() + - " from " + proxySource.getDisplayName() + " to " + - target.getDisplayName()); + LOG.debug("Start moving " + PendingBlockMove.this); } dispatch(); } @@ -463,11 +457,6 @@ public class Balancer { return block; } - /* Return the block id */ - private long getBlockId() { - return block.getBlockId(); - } - /* Return the length of the block */ private long getNumBytes() { return block.getNumBytes(); @@ -551,7 +540,7 @@ public class Balancer { /* Get the storage id of the datanode */ protected String getStorageID() { - return datanode.getStorageID(); + return datanode.getDatanodeUuid(); } /** Decide if still need to move more bytes */ @@ -674,10 +663,10 @@ public class Balancer { synchronized (block) { // update locations - for ( String storageID : blk.getStorageIDs() ) { - BalancerDatanode datanode = datanodes.get(storageID); + for (String datanodeUuid : blk.getDatanodeUuids()) { + final BalancerDatanode d = datanodeMap.get(datanodeUuid); if (datanode != null) { // not an unknown datanode - block.addLocation(datanode); + block.addLocation(d); } } } @@ -851,16 +840,6 @@ public class Balancer { DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); } - /* Shuffle datanode array */ - static private void shuffleArray(DatanodeInfo[] 
datanodes) { - for (int i=datanodes.length; i>1; i--) { - int randomIndex = DFSUtil.getRandom().nextInt(i); - DatanodeInfo tmp = datanodes[randomIndex]; - datanodes[randomIndex] = datanodes[i-1]; - datanodes[i-1] = tmp; - } - } - /* Given a data node set, build a network topology and decide * over-utilized datanodes, above average utilized datanodes, * below average utilized datanodes, and underutilized datanodes. @@ -890,8 +869,7 @@ public class Balancer { * an increasing order or a decreasing order. */ long overLoadedBytes = 0L, underLoadedBytes = 0L; - shuffleArray(datanodes); - for (DatanodeInfo datanode : datanodes) { + for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) { if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { continue; // ignore decommissioning or decommissioned nodes } @@ -922,13 +900,13 @@ public class Balancer { datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); } } - this.datanodes.put(datanode.getStorageID(), datanodeS); + datanodeMap.put(datanode.getDatanodeUuid(), datanodeS); } //logging logNodes(); - assert (this.datanodes.size() == + assert (this.datanodeMap.size() == overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) : "Mismatched number of datanodes"; @@ -1000,9 +978,9 @@ public class Balancer { // At last, match all remaining nodes chooseNodes(ANY_OTHER); - assert (datanodes.size() >= sources.size()+targets.size()) + assert (datanodeMap.size() >= sources.size()+targets.size()) : "Mismatched number of datanodes (" + - datanodes.size() + " total, " + + datanodeMap.size() + " total, " + sources.size() + " sources, " + targets.size() + " targets)"; @@ -1303,7 +1281,7 @@ public class Balancer { this.aboveAvgUtilizedDatanodes.clear(); this.belowAvgUtilizedDatanodes.clear(); this.underUtilizedDatanodes.clear(); - this.datanodes.clear(); + this.datanodeMap.clear(); this.sources.clear(); this.targets.clear(); 
this.policy.reset(); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Fri Dec 13 17:28:14 2013 @@ -75,7 +75,7 @@ public interface BlockCollection { * and set the locations. */ public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock, - DatanodeDescriptor[] locations) throws IOException; + DatanodeStorageInfo[] targets) throws IOException; /** * @return whether the block collection is under construction. 
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Fri Dec 13 17:28:14 2013 @@ -21,6 +21,7 @@ import java.util.LinkedList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.util.LightWeightGSet; @@ -39,11 +40,11 @@ public class BlockInfo extends Block imp private LightWeightGSet.LinkedElement nextLinkedElement; /** - * This array contains triplets of references. For each i-th datanode the - * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor - * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and - * the next blocks, respectively, in the list of blocks belonging to this - * data-node. + * This array contains triplets of references. For each i-th storage, the + * block belongs to triplets[3*i] is the reference to the + * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are + * references to the previous and the next blocks, respectively, in the list + * of blocks belonging to this storage. * * Using previous and next in Object triplets is done instead of a * {@link LinkedList} list to efficiently use memory. 
With LinkedList the cost @@ -86,9 +87,14 @@ public class BlockInfo extends Block imp } public DatanodeDescriptor getDatanode(int index) { + DatanodeStorageInfo storage = getStorageInfo(index); + return storage == null ? null : storage.getDatanodeDescriptor(); + } + + DatanodeStorageInfo getStorageInfo(int index) { assert this.triplets != null : "BlockInfo is not initialized"; assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - return (DatanodeDescriptor)triplets[index*3]; + return (DatanodeStorageInfo)triplets[index*3]; } private BlockInfo getPrevious(int index) { @@ -111,14 +117,10 @@ public class BlockInfo extends Block imp return info; } - private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous, - BlockInfo next) { + private void setStorageInfo(int index, DatanodeStorageInfo storage) { assert this.triplets != null : "BlockInfo is not initialized"; - int i = index * 3; - assert index >= 0 && i+2 < triplets.length : "Index is out of bound"; - triplets[i] = node; - triplets[i+1] = previous; - triplets[i+2] = next; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + triplets[index*3] = storage; } /** @@ -190,22 +192,34 @@ public class BlockInfo extends Block imp } /** - * Add data-node this block belongs to. + * Add a {@link DatanodeStorageInfo} location for a block */ - public boolean addNode(DatanodeDescriptor node) { - if(findDatanode(node) >= 0) // the node is already there - return false; + boolean addStorage(DatanodeStorageInfo storage) { + boolean added = true; + int idx = findDatanode(storage.getDatanodeDescriptor()); + if(idx >= 0) { + if (getStorageInfo(idx) == storage) { // the storage is already there + return false; + } else { + // The block is on the DN but belongs to a different storage. + // Update our state. + removeStorage(storage); + added = false; // Just updating storage. Return false. 
+ } + } // find the last null node int lastNode = ensureCapacity(1); - setDatanode(lastNode, node, null, null); - return true; + setStorageInfo(lastNode, storage); + setNext(lastNode, null); + setPrevious(lastNode, null); + return added; } /** - * Remove data-node from the block. + * Remove {@link DatanodeStorageInfo} location for a block */ - public boolean removeNode(DatanodeDescriptor node) { - int dnIndex = findDatanode(node); + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfo(storage); if(dnIndex < 0) // the node is not found return false; assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : @@ -213,10 +227,13 @@ public class BlockInfo extends Block imp // find the last not null node int lastNode = numNodes()-1; // replace current node triplet by the lastNode one - setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode), - getNext(lastNode)); + setStorageInfo(dnIndex, getStorageInfo(lastNode)); + setNext(dnIndex, getNext(lastNode)); + setPrevious(dnIndex, getPrevious(lastNode)); // set the last triplet to null - setDatanode(lastNode, null, null, null); + setStorageInfo(lastNode, null); + setNext(lastNode, null); + setPrevious(lastNode, null); return true; } @@ -236,37 +253,70 @@ public class BlockInfo extends Block imp } return -1; } + /** + * Find specified DatanodeStorageInfo. + * @param dn + * @return index or -1 if not found. + */ + int findStorageInfo(DatanodeInfo dn) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur == null) + break; + if(cur.getDatanodeDescriptor() == dn) + return idx; + } + return -1; + } + + /** + * Find specified DatanodeStorageInfo. + * @param storageInfo + * @return index or -1 if not found. 
+ */ + int findStorageInfo(DatanodeStorageInfo storageInfo) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur == storageInfo) + return idx; + if(cur == null) + break; + } + return -1; + } /** * Insert this block into the head of the list of blocks - * related to the specified DatanodeDescriptor. + * related to the specified DatanodeStorageInfo. * If the head is null then form a new list. * @return current block as the new head of the list. */ - public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) { - int dnIndex = this.findDatanode(dn); + BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { + int dnIndex = this.findStorageInfo(storage); assert dnIndex >= 0 : "Data node is not found: current"; assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : "Block is already in the list and cannot be inserted."; this.setPrevious(dnIndex, null); this.setNext(dnIndex, head); if(head != null) - head.setPrevious(head.findDatanode(dn), this); + head.setPrevious(head.findStorageInfo(storage), this); return this; } /** * Remove this block from the list of blocks - * related to the specified DatanodeDescriptor. + * related to the specified DatanodeStorageInfo. * If this block is the head of the list then return the next block as * the new head. * @return the new head of the list or null if the list becomes - * empty after deletion. + * empty after deletion.
*/ - public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) { + BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { if(head == null) return null; - int dnIndex = this.findDatanode(dn); + int dnIndex = this.findStorageInfo(storage); if(dnIndex < 0) // this block is not on the data-node list return head; @@ -275,9 +325,9 @@ public class BlockInfo extends Block imp this.setNext(dnIndex, null); this.setPrevious(dnIndex, null); if(prev != null) - prev.setNext(prev.findDatanode(dn), next); + prev.setNext(prev.findStorageInfo(storage), next); if(next != null) - next.setPrevious(next.findDatanode(dn), prev); + next.setPrevious(next.findStorageInfo(storage), prev); if(this == head) // removing the head head = next; return head; @@ -289,7 +339,7 @@ public class BlockInfo extends Block imp * * @return the new head of the list. */ - public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn, + public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage, int curIndex, int headIndex) { if (head == this) { return this; @@ -298,9 +348,9 @@ public class BlockInfo extends Block imp BlockInfo prev = this.setPrevious(curIndex, null); head.setPrevious(headIndex, this); - prev.setNext(prev.findDatanode(dn), next); + prev.setNext(prev.findStorageInfo(storage), next); if (next != null) - next.setPrevious(next.findDatanode(dn), prev); + next.setPrevious(next.findStorageInfo(storage), prev); return this; } @@ -328,10 +378,10 @@ public class BlockInfo extends Block imp * @return BlockInfoUnderConstruction - an under construction block. 
*/ public BlockInfoUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeDescriptor[] targets) { + BlockUCState s, DatanodeStorageInfo[] targets) { if(isComplete()) { - return new BlockInfoUnderConstruction( - this, getBlockCollection().getBlockReplication(), s, targets); + return new BlockInfoUnderConstruction(this, + getBlockCollection().getBlockReplication(), s, targets); } // the block is already under construction BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this; Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1550774&r1=1550773&r2=1550774&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Fri Dec 13 17:28:14 2013 @@ -63,12 +63,12 @@ public class BlockInfoUnderConstruction * corresponding replicas. */ static class ReplicaUnderConstruction extends Block { - private DatanodeDescriptor expectedLocation; + private final DatanodeStorageInfo expectedLocation; private ReplicaState state; private boolean chosenAsPrimary; ReplicaUnderConstruction(Block block, - DatanodeDescriptor target, + DatanodeStorageInfo target, ReplicaState state) { super(block); this.expectedLocation = target; @@ -82,7 +82,7 @@ public class BlockInfoUnderConstruction * It is not guaranteed, but expected, that the data-node actually has * the replica. 
*/ - DatanodeDescriptor getExpectedLocation() { + private DatanodeStorageInfo getExpectedStorageLocation() { return expectedLocation; } @@ -118,7 +118,7 @@ public class BlockInfoUnderConstruction * Is data-node the replica belongs to alive. */ boolean isAlive() { - return expectedLocation.isAlive; + return expectedLocation.getDatanodeDescriptor().isAlive; } @Override // Block @@ -162,7 +162,7 @@ public class BlockInfoUnderConstruction */ public BlockInfoUnderConstruction(Block blk, int replication, BlockUCState state, - DatanodeDescriptor[] targets) { + DatanodeStorageInfo[] targets) { super(blk, replication); assert getBlockUCState() != BlockUCState.COMPLETE : "BlockInfoUnderConstruction cannot be in COMPLETE state"; @@ -186,7 +186,7 @@ public class BlockInfoUnderConstruction } /** Set expected locations */ - public void setExpectedLocations(DatanodeDescriptor[] targets) { + public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations); for(int i = 0; i < numLocations; i++) @@ -198,12 +198,12 @@ public class BlockInfoUnderConstruction * Create array of expected replica locations * (as has been assigned by chooseTargets()). */ - public DatanodeDescriptor[] getExpectedLocations() { + public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 0 : replicas.size(); - DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations]; + DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; for(int i = 0; i < numLocations; i++) - locations[i] = replicas.get(i).getExpectedLocation(); - return locations; + storages[i] = replicas.get(i).getExpectedStorageLocation(); + return storages; } /** Get the number of expected locations */ @@ -244,9 +244,9 @@ public class BlockInfoUnderConstruction // The replica list is unchanged. 
for (ReplicaUnderConstruction r : replicas) { if (genStamp != r.getGenerationStamp()) { - r.getExpectedLocation().removeBlock(this); + r.getExpectedStorageLocation().removeBlock(this); NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " - + "from location: " + r.getExpectedLocation()); + + "from location: " + r.getExpectedStorageLocation()); } } } @@ -302,31 +302,44 @@ public class BlockInfoUnderConstruction if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { continue; } - if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) { - primary = replicas.get(i); + final ReplicaUnderConstruction ruc = replicas.get(i); + final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate(); + if (lastUpdate > mostRecentLastUpdate) { primaryNodeIndex = i; - mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate(); + primary = ruc; + mostRecentLastUpdate = lastUpdate; } } if (primary != null) { - primary.getExpectedLocation().addBlockToBeRecovered(this); + primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); primary.setChosenAsPrimary(true); NameNode.blockStateChangeLog.info("BLOCK* " + this + " recovery started, primary=" + primary); } } - void addReplicaIfNotPresent(DatanodeDescriptor dn, + void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block, ReplicaState rState) { - for (ReplicaUnderConstruction r : replicas) { - if (r.getExpectedLocation() == dn) { + Iterator<ReplicaUnderConstruction> it = replicas.iterator(); + while (it.hasNext()) { + ReplicaUnderConstruction r = it.next(); + if(r.getExpectedStorageLocation() == storage) { // Record the gen stamp from the report r.setGenerationStamp(block.getGenerationStamp()); return; + } else if (r.getExpectedStorageLocation().getDatanodeDescriptor() == + storage.getDatanodeDescriptor()) { + + // The Datanode reported that the block is on a different storage + // than the one 
chosen by BlockPlacementPolicy. This can occur as + // we allow Datanodes to choose the target storage. Update our + // state by removing the stale entry and adding a new one. + it.remove(); + break; } } - replicas.add(new ReplicaUnderConstruction(block, dn, rState)); + replicas.add(new ReplicaUnderConstruction(block, storage, rState)); } @Override // BlockInfo