http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --cc 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index da09b0e,29bcd79..c93a362
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@@ -2816,30 -2633,14 +2831,30 @@@ public class DataNode extends Reconfigu
    }
  
    /**
-    * Convenience method, which unwraps RemoteException.
-    * @throws IOException not a RemoteException.
-    */
 -   * Update replica with the new generation stamp and length.  
++  * Convenience method, which unwraps RemoteException.
++  * @throws IOException not a RemoteException.
++  */
 +  private static ReplicaRecoveryInfo callInitReplicaRecovery(
 +      InterDatanodeProtocol datanode,
 +      RecoveringBlock rBlock) throws IOException {
 +    try {
 +      return datanode.initReplicaRecovery(rBlock);
-     } catch(RemoteException re) {
++    } catch (RemoteException re) {
 +      throw re.unwrapRemoteException();
 +    }
 +  }
 +
 +  /**
-    * Update replica with the new generation stamp and length.  
++   * Update replica with the new generation stamp and length.
     */
    @Override // InterDatanodeProtocol
    public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
--      final long recoveryId, final long newBlockId, final long newLength)
++                                           final long recoveryId, final long 
newBlockId, final long newLength)
        throws IOException {
 -    final String storageID = data.updateReplicaUnderRecovery(oldBlock,
 -        recoveryId, newBlockId, newLength);
 +    final FsDatasetSpi<?> dataset =
 +        (FsDatasetSpi<?>) getDataset(oldBlock.getBlockPoolId());
 +    final String storageID = dataset.updateReplicaUnderRecovery(
 +        oldBlock, recoveryId, newBlockId, newLength);
      // Notify the namenode of the updated block info. This is important
      // for HA, since otherwise the standby node may lose track of the
      // block locations until the next block report.
@@@ -2851,234 -2652,6 +2866,244 @@@
      return storageID;
    }
  
-   /** A convenient class used in block recovery */
-   static class BlockRecord { 
++  /**
++   * A convenient class used in block recovery
++   */
++  static class BlockRecord {
 +    final DatanodeID id;
 +    final InterDatanodeProtocol datanode;
 +    final ReplicaRecoveryInfo rInfo;
-     
 +    private String storageID;
 +
 +    BlockRecord(DatanodeID id,
 +                InterDatanodeProtocol datanode,
 +                ReplicaRecoveryInfo rInfo) {
 +      this.id = id;
 +      this.datanode = datanode;
 +      this.rInfo = rInfo;
 +    }
 +
 +    void updateReplicaUnderRecovery(String bpid, long recoveryId,
 +                                    long newBlockId, long newLength)
 +        throws IOException {
 +      final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
 +      storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, 
newBlockId,
 +          newLength);
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return "block:" + rInfo + " node:" + id;
 +    }
 +  }
 +
-   /** Recover a block */
++
++  /**
++   * Recover a block
++   */
 +  private void recoverBlock(RecoveringBlock rBlock) throws IOException {
 +    ExtendedBlock block = rBlock.getBlock();
 +    String blookPoolId = block.getBlockPoolId();
 +    DatanodeID[] datanodeids = rBlock.getLocations();
 +    List<BlockRecord> syncList = new 
ArrayList<BlockRecord>(datanodeids.length);
 +    int errorCount = 0;
 +
 +    //check generation stamps
-     for(DatanodeID id : datanodeids) {
++    for (DatanodeID id : datanodeids) {
 +      try {
 +        BPOfferService bpos = blockPoolManager.get(blookPoolId);
 +        DatanodeRegistration bpReg = bpos.bpRegistration;
-         InterDatanodeProtocol datanode = bpReg.equals(id)?
-             this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
-                 dnConf.socketTimeout, dnConf.connectToDnViaHostname);
++        InterDatanodeProtocol datanode = bpReg.equals(id) ?
++            this : DataNode.createInterDataNodeProtocolProxy(id, getConf(),
++            dnConf.socketTimeout, dnConf.connectToDnViaHostname);
 +        ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
 +        if (info != null &&
 +            info.getGenerationStamp() >= block.getGenerationStamp() &&
 +            info.getNumBytes() > 0) {
 +          syncList.add(new BlockRecord(id, datanode, info));
 +        }
 +      } catch (RecoveryInProgressException ripE) {
 +        InterDatanodeProtocol.LOG.warn(
 +            "Recovery for replica " + block + " on data-node " + id
-             + " is already in progress. Recovery id = "
-             + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
++                + " is already in progress. Recovery id = "
++                + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
 +        return;
 +      } catch (IOException e) {
 +        ++errorCount;
 +        InterDatanodeProtocol.LOG.warn(
-             "Failed to obtain replica info for block (=" + block 
-             + ") from datanode (=" + id + ")", e);
++            "Failed to obtain replica info for block (=" + block
++                + ") from datanode (=" + id + ")", e);
 +      }
 +    }
 +
 +    if (errorCount == datanodeids.length) {
 +      throw new IOException("All datanodes failed: block=" + block
 +          + ", datanodeids=" + Arrays.asList(datanodeids));
 +    }
 +
 +    syncBlock(rBlock, syncList);
 +  }
 +
 +  /**
 +   * Get the NameNode corresponding to the given block pool.
 +   *
 +   * @param bpid Block pool Id
 +   * @return Namenode corresponding to the bpid
-    * @throws IOException if unable to get the corresponding NameNode
++   * @throws IOException if unable to get the corresponding NameNode Block
++   *                     synchronization
 +   */
 +  public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String 
bpid)
 +      throws IOException {
 +    BPOfferService bpos = blockPoolManager.get(bpid);
 +    if (bpos == null) {
 +      throw new IOException("No block pool offer service for bpid=" + bpid);
 +    }
-     
++
 +    DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN();
 +    if (activeNN == null) {
 +      throw new IOException(
 +          "Block pool " + bpid + " has not recognized an active NN");
 +    }
 +    return activeNN;
 +  }
 +
-   /** Block synchronization */
++  /**
++   * Block synchronization
++   */
 +  void syncBlock(RecoveringBlock rBlock,
-                          List<BlockRecord> syncList) throws IOException {
++                 List<BlockRecord> syncList) throws IOException {
 +    ExtendedBlock block = rBlock.getBlock();
 +    final String bpid = block.getBlockPoolId();
 +    DatanodeProtocolClientSideTranslatorPB nn =
-       getActiveNamenodeForBP(block.getBlockPoolId());
++        getActiveNamenodeForBP(block.getBlockPoolId());
 +
 +    long recoveryId = rBlock.getNewGenerationStamp();
 +    boolean isTruncateRecovery = rBlock.getNewBlock() != null;
 +    long blockId = (isTruncateRecovery) ?
 +        rBlock.getNewBlock().getBlockId() : block.getBlockId();
 +
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
 +          + "), syncList=" + syncList);
 +    }
 +
 +    // syncList.isEmpty() means that all data-nodes do not have the block
 +    // or their replicas have 0 length.
 +    // The block can be deleted.
 +    if (syncList.isEmpty()) {
 +      nn.commitBlockSynchronization(block, recoveryId, 0,
 +          true, true, DatanodeID.EMPTY_ARRAY, null);
 +      return;
 +    }
 +
 +    // Calculate the best available replica state.
 +    ReplicaState bestState = ReplicaState.RWR;
 +    long finalizedLength = -1;
-     for(BlockRecord r : syncList) {
++    for (BlockRecord r : syncList) {
 +      assert r.rInfo.getNumBytes() > 0 : "zero length replica";
-       ReplicaState rState = r.rInfo.getOriginalReplicaState(); 
-       if(rState.getValue() < bestState.getValue())
++      ReplicaState rState = r.rInfo.getOriginalReplicaState();
++      if (rState.getValue() < bestState.getValue()) {
 +        bestState = rState;
-       if(rState == ReplicaState.FINALIZED) {
-         if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
++      }
++      if (rState == ReplicaState.FINALIZED) {
++        if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) {
 +          throw new IOException("Inconsistent size of finalized replicas. " +
 +              "Replica " + r.rInfo + " expected size: " + finalizedLength);
++        }
 +        finalizedLength = r.rInfo.getNumBytes();
 +      }
 +    }
 +
 +    // Calculate list of nodes that will participate in the recovery
 +    // and the new block size
 +    List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
 +    final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
 +        -1, recoveryId);
-     switch(bestState) {
++    switch (bestState) {
 +    case FINALIZED:
 +      assert finalizedLength > 0 : "finalizedLength is not positive";
-       for(BlockRecord r : syncList) {
++      for (BlockRecord r : syncList) {
 +        ReplicaState rState = r.rInfo.getOriginalReplicaState();
-         if(rState == ReplicaState.FINALIZED ||
-            rState == ReplicaState.RBW &&
-                       r.rInfo.getNumBytes() == finalizedLength)
++        if (rState == ReplicaState.FINALIZED ||
++            rState == ReplicaState.RBW &&
++                r.rInfo.getNumBytes() == finalizedLength) {
 +          participatingList.add(r);
++        }
 +      }
 +      newBlock.setNumBytes(finalizedLength);
 +      break;
 +    case RBW:
 +    case RWR:
 +      long minLength = Long.MAX_VALUE;
-       for(BlockRecord r : syncList) {
++      for (BlockRecord r : syncList) {
 +        ReplicaState rState = r.rInfo.getOriginalReplicaState();
-         if(rState == bestState) {
++        if (rState == bestState) {
 +          minLength = Math.min(minLength, r.rInfo.getNumBytes());
 +          participatingList.add(r);
 +        }
 +      }
 +      newBlock.setNumBytes(minLength);
 +      break;
 +    case RUR:
 +    case TEMPORARY:
 +      assert false : "bad replica state: " + bestState;
 +    }
-     if(isTruncateRecovery)
++    if (isTruncateRecovery) {
 +      newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
++    }
 +
 +    List<DatanodeID> failedList = new ArrayList<DatanodeID>();
 +    final List<BlockRecord> successList = new ArrayList<BlockRecord>();
-     for(BlockRecord r : participatingList) {
++    for (BlockRecord r : participatingList) {
 +      try {
 +        r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
 +            newBlock.getNumBytes());
 +        successList.add(r);
 +      } catch (IOException e) {
 +        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
 +            + newBlock + ", datanode=" + r.id + ")", e);
 +        failedList.add(r.id);
 +      }
 +    }
 +
 +    // If any of the data-nodes failed, the recovery fails, because
 +    // we never know the actual state of the replica on failed data-nodes.
 +    // The recovery should be started over.
-     if(!failedList.isEmpty()) {
++    if (!failedList.isEmpty()) {
 +      StringBuilder b = new StringBuilder();
-       for(DatanodeID id : failedList) {
++      for (DatanodeID id : failedList) {
 +        b.append("\n  " + id);
 +      }
 +      throw new IOException("Cannot recover " + block + ", the following "
 +          + failedList.size() + " data-nodes failed {" + b + "\n}");
 +    }
 +
 +    // Notify the name-node about successfully recovered replicas.
 +    final DatanodeID[] datanodes = new DatanodeID[successList.size()];
 +    final String[] storages = new String[datanodes.length];
-     for(int i = 0; i < datanodes.length; i++) {
++    for (int i = 0; i < datanodes.length; i++) {
 +      final BlockRecord r = successList.get(i);
 +      datanodes[i] = r.id;
 +      storages[i] = r.storageID;
 +    }
 +    nn.commitBlockSynchronization(block,
 +        newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
 +        datanodes, storages);
 +  }
-   
++
 +  private static void logRecoverBlock(String who, RecoveringBlock rb) {
 +    ExtendedBlock block = rb.getBlock();
 +    DatanodeInfo[] targets = rb.getLocations();
-     
 +    LOG.info(who + " calls recoverBlock(" + block
 +        + ", targets=[" + Joiner.on(", ").join(targets) + "]"
 +        + ((rb.getNewBlock() == null) ? ", newGenerationStamp="
-             + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock())
++        + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock())
 +        + ")");
 +  }
 +
 +  /**
 +   * Only valid for blocks stored by FsDatasetSpi instances.
 +   */
    @Override // ClientDataNodeProtocol
    public long getReplicaVisibleLength(final ExtendedBlock block) throws 
IOException {
      checkReadAccess(block);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------

Reply via email to