Author: szetszwo
Date: Mon Mar 19 22:09:14 2012
New Revision: 1302683
URL: http://svn.apache.org/viewvc?rev=1302683&view=rev
Log:
HDFS-3105. Add DatanodeStorage information to block recovery.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar 19 22:09:14 2012
@@ -241,6 +241,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
+ HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo)
+
OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
Mon Mar 19 22:09:14 2012
@@ -293,7 +293,8 @@ public class DatanodeProtocolClientSideT
@Override
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
- boolean deleteblock, DatanodeID[] newtargets) throws IOException {
+ boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages
+ ) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
@@ -301,6 +302,7 @@ public class DatanodeProtocolClientSideT
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
+ builder.addNewTargetStorages(newtargetstorages[i]);
}
CommitBlockSynchronizationRequestProto req = builder.build();
try {
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
Mon Mar 19 22:09:14 2012
@@ -259,10 +259,12 @@ public class DatanodeProtocolServerSideT
for (int i = 0; i < dnprotos.size(); i++) {
dns[i] = PBHelper.convert(dnprotos.get(i));
}
+ final List<String> sidprotos = request.getNewTargetStoragesList();
+ final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
try {
impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
request.getNewGenStamp(), request.getNewLength(),
- request.getCloseFile(), request.getDeleteBlock(), dns);
+ request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
} catch (IOException e) {
throw new ServiceException(e);
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
Mon Mar 19 22:09:14 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocolP
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import
org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
import
org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
import
org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
@@ -66,14 +65,15 @@ public class InterDatanodeProtocolServer
public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
RpcController unused, UpdateReplicaUnderRecoveryRequestProto request)
throws ServiceException {
- ExtendedBlock b;
+ final String storageID;
try {
- b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()),
+ storageID = impl.updateReplicaUnderRecovery(
+ PBHelper.convert(request.getBlock()),
request.getRecoveryId(), request.getNewLength());
} catch (IOException e) {
throw new ServiceException(e);
}
return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
- .setBlock(PBHelper.convert(b)).build();
+ .setStorageID(storageID).build();
}
}
\ No newline at end of file
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
Mon Mar 19 22:09:14 2012
@@ -91,15 +91,15 @@ public class InterDatanodeProtocolTransl
}
@Override
- public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setRecoveryId(recoveryId).build();
try {
- return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery(
- NULL_CONTROLLER, req).getBlock());
+ return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
+ ).getStorageID();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Mon Mar 19 22:09:14 2012
@@ -1765,10 +1765,9 @@ public class DataNode extends Configured
* Update replica with the new generation stamp and length.
*/
@Override // InterDatanodeProtocol
- public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
- long recoveryId,
- long newLength) throws IOException {
- ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
+ public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
+ final long recoveryId, final long newLength) throws IOException {
+ final String storageID = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
@@ -1777,7 +1776,7 @@ public class DataNode extends Configured
newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "");
- return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
+ return storageID;
}
/** A convenient class used in block recovery */
@@ -1786,6 +1785,8 @@ public class DataNode extends Configured
final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo;
+ private String storageID;
+
BlockRecord(DatanodeID id,
InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
@@ -1794,6 +1795,12 @@ public class DataNode extends Configured
this.rInfo = rInfo;
}
+ void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
+ ) throws IOException {
+ final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
+ storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
+ }
+
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
@@ -1870,6 +1877,7 @@ public class DataNode extends Configured
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
+ final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId());
if (nn == null) {
@@ -1889,7 +1897,7 @@ public class DataNode extends Configured
// The block can be deleted.
if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0,
- true, true, DatanodeID.EMPTY_ARRAY);
+ true, true, DatanodeID.EMPTY_ARRAY, null);
return;
}
@@ -1912,8 +1920,8 @@ public class DataNode extends Configured
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
- final ExtendedBlock newBlock = new ExtendedBlock(block.getBlockPoolId(), block
- .getBlockId(), -1, recoveryId);
+ final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
+ -1, recoveryId);
switch(bestState) {
case FINALIZED:
assert finalizedLength > 0 : "finalizedLength is not positive";
@@ -1944,16 +1952,11 @@ public class DataNode extends Configured
}
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
- List<DatanodeID> successList = new ArrayList<DatanodeID>();
+ final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
- ExtendedBlock reply = r.datanode.updateReplicaUnderRecovery(
- new ExtendedBlock(newBlock.getBlockPoolId(), r.rInfo), recoveryId,
- newBlock.getNumBytes());
- assert reply.equals(newBlock) &&
- reply.getNumBytes() == newBlock.getNumBytes() :
- "Updated replica must be the same as the new block.";
- successList.add(r.id);
+ r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
+ successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
@@ -1974,10 +1977,16 @@ public class DataNode extends Configured
}
// Notify the name-node about successfully recovered replicas.
- DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+ final DatanodeID[] datanodes = new DatanodeID[successList.size()];
+ final String[] storages = new String[datanodes.length];
+ for(int i = 0; i < datanodes.length; i++) {
+ final BlockRecord r = successList.get(i);
+ datanodes[i] = r.id;
+ storages[i] = r.storageID;
+ }
nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
- nlist);
+ datanodes, storages);
}
private static void logRecoverBlock(String who,
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Mon Mar 19 22:09:14 2012
@@ -553,14 +553,16 @@ class FSDataset implements FSDatasetInte
*/
static class FSVolume implements FsVolumeSpi {
private final FSDataset dataset;
+ private final String storageID;
private final Map<String, BlockPoolSlice> map = new HashMap<String,
BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
- FSVolume(FSDataset dataset, File currentDir, Configuration conf
- ) throws IOException {
+ FSVolume(FSDataset dataset, String storageID, File currentDir,
+ Configuration conf) throws IOException {
this.dataset = dataset;
+ this.storageID = storageID;
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.currentDir = currentDir;
@@ -808,6 +810,10 @@ class FSDataset implements FSDatasetInte
}
}
}
+
+ String getStorageID() {
+ return storageID;
+ }
}
static class FSVolumeSet {
@@ -1017,6 +1023,12 @@ class FSDataset implements FSDatasetInte
return volumes.volumes;
}
+ @Override
+ public synchronized FSVolume getVolume(final ExtendedBlock b) {
+ final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+ return r != null? (FSVolume)r.getVolume(): null;
+ }
+
@Override // FSDatasetInterface
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
@@ -1107,7 +1119,7 @@ class FSDataset implements FSDatasetInte
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
- volArray.add(new FSVolume(this, dir, conf));
+ volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf));
DataNode.LOG.info("FSDataset added volume - " + dir);
}
volumeMap = new ReplicasMap(this);
@@ -1758,19 +1770,6 @@ class FSDataset implements FSDatasetInte
channel.position(newPos);
}
- synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException {
- if ( vol == null ) {
- ReplicaInfo replica = volumeMap.get(bpid, blk);
- if (replica != null) {
- vol = (FSVolume)volumeMap.get(bpid, blk).getVolume();
- }
- if ( vol == null ) {
- throw new IOException("Could not find volume for block " + blk);
- }
- }
- return vol.createTmpFile(bpid, blk);
- }
-
//
// REMIND - mjc - eventually we should have a timeout system
// in place to clean up block files left by abandoned clients.
@@ -2421,13 +2420,13 @@ class FSDataset implements FSDatasetInte
}
@Override // FSDatasetInterface
- public synchronized ReplicaInfo updateReplicaUnderRecovery(
+ public synchronized String updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newlength) throws IOException {
//get replica
- final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockPoolId(),
- oldBlock.getBlockId());
+ final String bpid = oldBlock.getBlockPoolId();
+ final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
DataNode.LOG.info("updateReplica: block=" + oldBlock
+ ", recoveryId=" + recoveryId
+ ", length=" + newlength
@@ -2457,10 +2456,18 @@ class FSDataset implements FSDatasetInte
//update replica
final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
.getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
newlength);
+ assert finalized.getBlockId() == oldBlock.getBlockId()
+ && finalized.getGenerationStamp() == recoveryId
+ && finalized.getNumBytes() == newlength
+ : "Replica information mismatched: oldBlock=" + oldBlock
+ + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ + ", finalized=" + finalized;
//check replica files after update
checkReplicaFiles(finalized);
- return finalized;
+
+ //return storage ID
+ return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
}
private FinalizedReplica updateReplicaUnderRecovery(
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
Mon Mar 19 22:09:14 2012
@@ -87,6 +87,9 @@ public interface FSDatasetInterface<V ex
/** @return a list of volumes. */
public List<V> getVolumes();
+ /** @return the volume that contains a replica of the block. */
+ public V getVolume(ExtendedBlock b);
+
/** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap();
@@ -336,11 +339,11 @@ public interface FSDatasetInterface<V ex
/**
* Update replica's generation stamp and length and finalize it.
+ * @return the ID of storage that stores the block
*/
- public ReplicaInfo updateReplicaUnderRecovery(
- ExtendedBlock oldBlock,
- long recoveryId,
- long newLength) throws IOException;
+ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+ long recoveryId, long newLength) throws IOException;
+
/**
* add new block pool ID
* @param bpid Block pool Id
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Mon Mar 19 22:09:14 2012
@@ -2830,7 +2830,8 @@ public class FSNamesystem implements Nam
void commitBlockSynchronization(ExtendedBlock lastblock,
long newgenerationstamp, long newlength,
- boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
+ String[] newtargetstorages)
throws IOException, UnresolvedLinkException {
String src = "";
writeLock();
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
Mon Mar 19 22:09:14 2012
@@ -546,10 +546,11 @@ class NameNodeRpcServer implements Namen
@Override // DatanodeProtocol
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
- boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
+ String[] newtargetstorages)
throws IOException {
- namesystem.commitBlockSynchronization(block,
- newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+ namesystem.commitBlockSynchronization(block, newgenerationstamp,
+ newlength, closeFile, deleteblock, newtargets, newtargetstorages);
}
@Override // ClientProtocol
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
Mon Mar 19 22:09:14 2012
@@ -176,6 +176,6 @@ public interface DatanodeProtocol {
*/
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
- boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
- ) throws IOException;
+ boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
+ String[] newtargetstorages) throws IOException;
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
Mon Mar 19 22:09:14 2012
@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
/** An inter-datanode protocol for updating generation stamp
@@ -55,9 +54,6 @@ public interface InterDatanodeProtocol {
*
* For more details on protocol buffer wire protocol, please see
* .../org/apache/hadoop/hdfs/protocolPB/overview.html
- *
- * The log of historical changes can be retrieved from the svn).
- * 6: Add block pool ID to Block
*/
public static final long versionID = 6L;
@@ -73,7 +69,6 @@ public interface InterDatanodeProtocol {
/**
* Update replica with the new generation stamp and length.
*/
- ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
- long recoveryId,
- long newLength) throws IOException;
+ String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId,
+ long newLength) throws IOException;
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
Mon Mar 19 22:09:14 2012
@@ -339,6 +339,7 @@ message CommitBlockSynchronizationReques
required bool closeFile = 4;
required bool deleteBlock = 5;
repeated DatanodeIDProto newTaragets = 6;
+ repeated string newTargetStorages = 7;
}
/**
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto
Mon Mar 19 22:09:14 2012
@@ -55,7 +55,7 @@ message UpdateReplicaUnderRecoveryReques
* Response returns updated block information
*/
message UpdateReplicaUnderRecoveryResponseProto {
- required ExtendedBlockProto block = 1; // Updated block information
+ required string storageID = 1; // ID of the storage that stores replica
}
/**
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Mon Mar 19 22:09:14 2012
@@ -903,11 +903,10 @@ public class SimulatedFSDataset implemen
}
@Override // FSDatasetInterface
- public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) {
- return new FinalizedReplica(
- oldBlock.getBlockId(), newlength, recoveryId, null, null);
+ return storageId;
}
@Override // FSDatasetInterface
@@ -985,4 +984,9 @@ public class SimulatedFSDataset implemen
public RollingLogs createRollingLogs(String bpid, String prefix) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FsVolumeSpi getVolume(ExtendedBlock b) {
+ throw new UnsupportedOperationException();
+ }
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Mon Mar 19 22:09:14 2012
@@ -18,6 +18,27 @@
package org.apache.hadoop.hdfs.server.datanode;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -34,10 +55,10 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -45,10 +66,9 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
@@ -62,16 +82,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
/**
* This tests if sync all replicas in block recovery works correctly
*/
@@ -196,11 +206,9 @@ public class TestBlockRecovery {
syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
- anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(),
- block.getBlockId(), expectLen, block.getGenerationStamp()));
+ anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
- anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(),
- block.getBlockId(), expectLen, block.getGenerationStamp()));
+ anyLong())).thenReturn("storage2");
dn.syncBlock(rBlock, syncList);
}
@@ -463,7 +471,7 @@ public class TestBlockRecovery {
d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization(
- block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
+ block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
private List<BlockRecord> initBlockRecords(DataNode spyDN) throws
IOException {
@@ -521,7 +529,7 @@ public class TestBlockRecovery {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
- anyBoolean(), any(DatanodeID[].class));
+ anyBoolean(), any(DatanodeID[].class), any(String[].class));
}
/**
@@ -550,7 +558,7 @@ public class TestBlockRecovery {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
- anyBoolean(), any(DatanodeID[].class));
+ anyBoolean(), any(DatanodeID[].class), any(String[].class));
} finally {
streams.close();
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Mon Mar 19 22:09:14 2012
@@ -329,14 +329,9 @@ public class TestInterDatanodeProtocol {
}
//update
- final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery(
+ final String storageID = fsdataset.updateReplicaUnderRecovery(
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
-
- //check meta data after update
- FSDataset.checkReplicaFiles(finalized);
- Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
- Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
- Assert.assertEquals(newlength, finalized.getNumBytes());
+ assertTrue(storageID != null);
} finally {
if (cluster != null) cluster.shutdown();
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1302683&r1=1302682&r2=1302683&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
Mon Mar 19 22:09:14 2012
@@ -307,7 +307,8 @@ public class TestPipelinesFailover {
Mockito.anyLong(), // new length
Mockito.eq(true), // close file
Mockito.eq(false), // delete block
- (DatanodeID[]) Mockito.anyObject()); // new targets
+ (DatanodeID[]) Mockito.anyObject(), // new targets
+ (String[]) Mockito.anyObject()); // new target storages
DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
assertFalse(fsOtherUser.recoverLease(TEST_PATH));