Repository: hadoop
Updated Branches:
  refs/heads/trunk 8d29e2451 -> 3c18a53cb


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
new file mode 100644
index 0000000..a0a5f83
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+
+/**
+ * StripedReconstructor reconstructs one or more missing striped blocks in a
+ * striped block group. The number of live striped blocks must be no less
+ * than the number of data blocks.
+ *
+ * | <- Striped Block Group -> |
+ *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
+ *    |          |           |          |
+ *    v          v           v          v
+ * +------+   +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...
+ * +------+   +------+   +------+   +------+
+ * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
+ * +------+   +------+   +------+   +------+
+ * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
+ * +------+   +------+   +------+   +------+
+ *  ...         ...       ...         ...
+ *
+ *
+ * We reconstruct the striped block group in rounds. In each round we
+ * reconstruct <code>bufferSize</code> bytes of data until the group is
+ * finished; <code>bufferSize</code> is configurable and may be smaller or
+ * larger than the cell size:
+ * step1: read <code>bufferSize</code> bytes from the minimum number of
+ *        sources required for reconstruction.
+ * step2: decode the data for the targets.
+ * step3: transfer the data to the targets.
+ *
+ * In step1, we try to read <code>bufferSize</code> bytes from the minimum
+ * number of sources; if a source is corrupt or stale, a read from a new
+ * source is scheduled. The best sources are remembered for the next round
+ * and may be updated each round.
+ *
+ * In step2, if the source blocks we read are all data blocks we typically
+ * need to call encode; if at least one of them is a parity block we need
+ * to call decode. Note that we read only once and reconstruct all of the
+ * missing striped blocks, even if there is more than one.
+ *
+ * In step3, we send the reconstructed data to the targets by constructing
+ * packets and sending them directly. As with continuous block replication,
+ * we don't check the packet acks. Since the datanode doing the
+ * reconstruction is one of the source datanodes, the reconstructed data is
+ * sent to the targets remotely.
+ *
+ * There are some points we can improve further in the next phase:
+ * 1. We can read the block file directly on the local datanode; currently
+ *    we use a remote block reader. (Note that short-circuit reads are not
+ *    a good choice, see the inline comments.)
+ * 2. Should we check the packet acks for EC reconstruction? Since EC
+ *    reconstruction is more expensive than continuous block replication and
+ *    needs to read from several other datanodes, should we make sure the
+ *    reconstructed result is received by the targets?
+ */
+@InterfaceAudience.Private
+class StripedReconstructor implements Runnable {
+  private static final Logger LOG = DataNode.LOG;
+
+  private final ErasureCodingWorker worker;
+  private final DataNode datanode;
+  private final Configuration conf;
+
+  private final ErasureCodingPolicy ecPolicy;
+
+  private RawErasureDecoder decoder;
+
+  private final ExtendedBlock blockGroup;
+  private final BitSet liveBitSet;
+
+  // position in striped internal block
+  private long positionInBlock;
+
+  private StripedReader stripedReader;
+
+  private StripedWriter stripedWriter;
+
+  private final CachingStrategy cachingStrategy;
+
+  StripedReconstructor(ErasureCodingWorker worker,
+                       BlockECReconstructionInfo reconstructionInfo) {
+    this.worker = worker;
+    this.datanode = worker.getDatanode();
+    this.conf = worker.getConf();
+
+    ecPolicy = reconstructionInfo.getErasureCodingPolicy();
+
+    blockGroup = reconstructionInfo.getExtendedBlock();
+    byte[] liveIndices = reconstructionInfo.getLiveBlockIndices();
+    liveBitSet = new BitSet(ecPolicy.getNumDataUnits() +
+        ecPolicy.getNumParityUnits());
+    for (int i = 0; i < liveIndices.length; i++) {
+      liveBitSet.set(liveIndices[i]);
+    }
+
+    stripedReader = new StripedReader(this, datanode,
+        conf, reconstructionInfo);
+    stripedWriter = new StripedWriter(this, datanode,
+        conf, reconstructionInfo);
+
+    cachingStrategy = CachingStrategy.newDefaultStrategy();
+
+    positionInBlock = 0L;
+  }
+
+  BitSet getLiveBitSet() {
+    return liveBitSet;
+  }
+
+  ByteBuffer allocateBuffer(int length) {
+    return ByteBuffer.allocate(length);
+  }
+
+  ExtendedBlock getBlock(int i) {
+    return StripedBlockUtil.constructInternalBlock(blockGroup, ecPolicy, i);
+  }
+
+  long getBlockLen(int i) {
+    return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
+        ecPolicy, i);
+  }
+
+  boolean hasValidTargets() {
+    return stripedWriter.hasValidTargets();
+  }
+
+  @Override
+  public void run() {
+    datanode.incrementXmitsInProgress();
+    try {
+      stripedReader.init();
+
+      stripedWriter.init();
+
+      reconstructAndTransfer();
+
+      stripedWriter.endTargetBlocks();
+
+      // Currently we don't check the packet acks; this is similar to
+      // block replication.
+    } catch (Throwable e) {
+      LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
+    } finally {
+      datanode.decrementXmitsInProgress();
+
+      stripedReader.close();
+
+      stripedWriter.close();
+    }
+  }
+
+  void reconstructAndTransfer() throws IOException {
+    while (positionInBlock < stripedWriter.getMaxTargetLength()) {
+      long remaining = stripedWriter.getMaxTargetLength() - positionInBlock;
+      final int toReconstructLen =
+          (int) Math.min(stripedReader.getBufferSize(), remaining);
+      // step1: read from the minimum source DNs required for reconstruction.
+      // The returned success list is the source DNs we actually read from.
+      stripedReader.readMinimumSources(toReconstructLen);
+
+      // step2: decode to reconstruct targets
+      reconstructTargets(toReconstructLen);
+
+      // step3: transfer data
+      if (stripedWriter.transferData2Targets() == 0) {
+        String error = "Transfer failed for all targets.";
+        throw new IOException(error);
+      }
+
+      positionInBlock += toReconstructLen;
+
+      clearBuffers();
+    }
+  }
+
+  // Initialize decoder
+  private void initDecoderIfNecessary() {
+    if (decoder == null) {
+      decoder = CodecUtil.createRSRawDecoder(conf, ecPolicy.getNumDataUnits(),
+          ecPolicy.getNumParityUnits());
+    }
+  }
+
+  private void reconstructTargets(int toReconstructLen) {
+    initDecoderIfNecessary();
+
+    ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen);
+
+    int[] erasedIndices = stripedWriter.getRealTargetIndices();
+    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
+
+    decoder.decode(inputs, erasedIndices, outputs);
+
+    stripedWriter.updateRealTargetBuffers(toReconstructLen);
+  }
+
+  long getPositionInBlock() {
+    return positionInBlock;
+  }
+
+  /**
+   * Clear all associated buffers.
+   */
+  private void clearBuffers() {
+    stripedReader.clearBuffers();
+
+    stripedWriter.clearBuffers();
+  }
+
+  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+    return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+        datanode.getDnConf().getConnectToDnViaHostname()));
+  }
+
+  int getBufferSize() {
+    return stripedReader.getBufferSize();
+  }
+
+  DataChecksum getChecksum() {
+    return stripedReader.getChecksum();
+  }
+
+  CachingStrategy getCachingStrategy() {
+    return cachingStrategy;
+  }
+
+  CompletionService<Void> createReadService() {
+    return new ExecutorCompletionService<>(worker.getStripedReadPool());
+  }
+
+  ExtendedBlock getBlockGroup() {
+    return blockGroup;
+  }
+}
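
A minimal standalone sketch of the round-based loop that reconstructAndTransfer()
above implements, assuming a hypothetical ~6 MB target block and a 64 KB buffer;
the real buffer size is configurable, and the read/decode/transfer steps delegate
to StripedReader, the RawErasureDecoder and StripedWriter rather than the stubs
shown here:

public class ReconstructionLoopSketch {
  public static void main(String[] args) {
    long maxTargetLength = 6L * 1024 * 1024 + 512; // assumed length of the longest target block
    int bufferSize = 64 * 1024;                    // assumed buffer size; configurable in the real code
    long positionInBlock = 0L;
    int rounds = 0;

    while (positionInBlock < maxTargetLength) {
      long remaining = maxTargetLength - positionInBlock;
      int toReconstructLen = (int) Math.min(bufferSize, remaining);

      // step1: read toReconstructLen bytes from the minimum number of sources
      // step2: decode the missing internal blocks from those inputs
      // step3: transfer the reconstructed bytes to the target datanodes
      // (stubbed out here; see StripedReader, RawErasureDecoder, StripedWriter)

      positionInBlock += toReconstructLen;
      rounds++;
    }
    System.out.println("rounds=" + rounds + ", bytesReconstructed=" + positionInBlock);
  }
}

With the assumed sizes, the final round handles only the trailing 512 bytes,
mirroring how the last, possibly partial, buffer of a target block is handled.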

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
new file mode 100644
index 0000000..e2052a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+/**
+ * Manages the striped writers that write reconstructed data to the targets.
+ */
+@InterfaceAudience.Private
+class StripedWriter {
+  private static final Logger LOG = DataNode.LOG;
+  private final static int WRITE_PACKET_SIZE = 64 * 1024;
+
+  private final StripedReconstructor reconstructor;
+  private final DataNode datanode;
+  private final Configuration conf;
+
+  private final int dataBlkNum;
+  private final int parityBlkNum;
+
+  private boolean[] targetsStatus;
+
+  // targets
+  private final DatanodeInfo[] targets;
+  private final short[] targetIndices;
+  private boolean hasValidTargets;
+  private final StorageType[] targetStorageTypes;
+  private long maxTargetLength;
+
+  private StripedBlockWriter[] writers;
+
+  private int maxChunksPerPacket;
+  private byte[] packetBuf;
+  private byte[] checksumBuf;
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  StripedWriter(StripedReconstructor reconstructor,
+                DataNode datanode,
+                Configuration conf,
+                BlockECReconstructionInfo reconstructionInfo) {
+    this.reconstructor = reconstructor;
+    this.datanode = datanode;
+    this.conf = conf;
+
+    ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy();
+    dataBlkNum = ecPolicy.getNumDataUnits();
+    parityBlkNum = ecPolicy.getNumParityUnits();
+
+    targets = reconstructionInfo.getTargetDnInfos();
+    targetStorageTypes = reconstructionInfo.getTargetStorageTypes();
+
+    writers = new StripedBlockWriter[targets.length];
+
+    targetIndices = new short[targets.length];
+    Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+        "Too much missed striped blocks.");
+    initTargetIndices();
+
+    maxTargetLength = 0L;
+    for (short targetIndex : targetIndices) {
+      maxTargetLength = Math.max(maxTargetLength,
+          reconstructor.getBlockLen(targetIndex));
+    }
+
+    // targetsStatus stores whether each target has succeeded. Once a target
+    // has failed (invalid DN or transfer failed), no more data will be
+    // transferred to it.
+    targetsStatus = new boolean[targets.length];
+  }
+
+  void init() throws IOException {
+    DataChecksum checksum = reconstructor.getChecksum();
+    checksumSize = checksum.getChecksumSize();
+    bytesPerChecksum = checksum.getBytesPerChecksum();
+    int chunkSize = bytesPerChecksum + checksumSize;
+    maxChunksPerPacket = Math.max(
+        (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1);
+    int maxPacketSize = chunkSize * maxChunksPerPacket
+        + PacketHeader.PKT_MAX_HEADER_LEN;
+
+    packetBuf = new byte[maxPacketSize];
+    int tmpLen = checksumSize *
+        (reconstructor.getBufferSize() / bytesPerChecksum);
+    checksumBuf = new byte[tmpLen];
+
+    if (initTargetStreams() == 0) {
+      String error = "All targets are failed.";
+      throw new IOException(error);
+    }
+  }
+
+  private void initTargetIndices() {
+    BitSet bitset = reconstructor.getLiveBitSet();
+
+    int m = 0;
+    int k = 0;
+    hasValidTargets = false;
+    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+      if (!bitset.get(i)) {
+        if (reconstructor.getBlockLen(i) > 0) {
+          if (m < targets.length) {
+            targetIndices[m++] = (short)i;
+            hasValidTargets = true;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Send reconstructed data to targets.
+   */
+  int transferData2Targets() {
+    int nSuccess = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        boolean success = false;
+        try {
+          writers[i].transferData2Target(packetBuf);
+          nSuccess++;
+          success = true;
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+        targetsStatus[i] = success;
+      }
+    }
+    return nSuccess;
+  }
+
+  /**
+   * Send an empty packet to mark the end of the block.
+   */
+  void endTargetBlocks() {
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        try {
+          writers[i].endTargetBlock(packetBuf);
+        } catch (IOException e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Initialize the output/input streams for transferring data to the
+   * targets and send the create-block requests.
+   */
+  int initTargetStreams() {
+    int nSuccess = 0;
+    for (short i = 0; i < targets.length; i++) {
+      try {
+        writers[i] = createWriter(i);
+        nSuccess++;
+        targetsStatus[i] = true;
+      } catch (Throwable e) {
+        LOG.warn(e.getMessage());
+      }
+    }
+    return nSuccess;
+  }
+
+  private StripedBlockWriter createWriter(short index) throws IOException {
+    return new StripedBlockWriter(this, datanode, conf,
+        reconstructor.getBlock(targetIndices[index]), targets[index],
+        targetStorageTypes[index]);
+  }
+
+  ByteBuffer allocateWriteBuffer() {
+    return reconstructor.allocateBuffer(reconstructor.getBufferSize());
+  }
+
+  int getTargets() {
+    return targets.length;
+  }
+
+  private int getRealTargets() {
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        m++;
+      }
+    }
+    return m;
+  }
+
+  int[] getRealTargetIndices() {
+    int realTargets = getRealTargets();
+    int[] results = new int[realTargets];
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        results[m++] = targetIndices[i];
+      }
+    }
+    return results;
+  }
+
+  ByteBuffer[] getRealTargetBuffers(int toReconstructLen) {
+    int numGood = getRealTargets();
+    ByteBuffer[] outputs = new ByteBuffer[numGood];
+    int m = 0;
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        writers[i].getTargetBuffer().limit(toReconstructLen);
+        outputs[m++] = writers[i].getTargetBuffer();
+      }
+    }
+    return outputs;
+  }
+
+  void updateRealTargetBuffers(int toReconstructLen) {
+    for (int i = 0; i < targets.length; i++) {
+      if (targetsStatus[i]) {
+        long blockLen = reconstructor.getBlockLen(targetIndices[i]);
+        long remaining = blockLen - reconstructor.getPositionInBlock();
+        if (remaining <= 0) {
+          writers[i].getTargetBuffer().limit(0);
+        } else if (remaining < toReconstructLen) {
+          writers[i].getTargetBuffer().limit((int)remaining);
+        }
+      }
+    }
+  }
+
+  long getMaxTargetLength() {
+    return maxTargetLength;
+  }
+
+  byte[] getChecksumBuf() {
+    return checksumBuf;
+  }
+
+  int getBytesPerChecksum() {
+    return bytesPerChecksum;
+  }
+
+  int getChecksumSize() {
+    return checksumSize;
+  }
+
+  DataChecksum getChecksum() {
+    return reconstructor.getChecksum();
+  }
+
+  int getMaxChunksPerPacket() {
+    return maxChunksPerPacket;
+  }
+
+  CachingStrategy getCachingStrategy() {
+    return reconstructor.getCachingStrategy();
+  }
+
+  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo target) {
+    return reconstructor.getSocketAddress4Transfer(target);
+  }
+
+  boolean hasValidTargets() {
+    return hasValidTargets;
+  }
+
+  /**
+   * Clear all buffers.
+   */
+  void clearBuffers() {
+    for (StripedBlockWriter writer : writers) {
+      ByteBuffer targetBuffer = writer.getTargetBuffer();
+      if (targetBuffer != null) {
+        targetBuffer.clear();
+      }
+    }
+  }
+
+  void close() {
+    for (int i = 0; i < targets.length; i++) {
+      writers[i].close();
+    }
+  }
+}
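
A standalone sketch of the packet sizing done in StripedWriter#init() above. The
checksum parameters (512 bytes per checksum chunk, 4-byte checksums) and the
33-byte header length are illustrative stand-ins so the sketch compiles on its
own; the real values come from the DataChecksum supplied by the reconstructor and
from PacketHeader.PKT_MAX_HEADER_LEN:

public class PacketSizingSketch {
  private static final int PKT_MAX_HEADER_LEN = 33;     // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
  private static final int WRITE_PACKET_SIZE = 64 * 1024;

  public static void main(String[] args) {
    int bytesPerChecksum = 512; // assumed DataChecksum chunk size
    int checksumSize = 4;       // assumed CRC32/CRC32C checksum width

    // Each chunk carries its data bytes plus its checksum.
    int chunkSize = bytesPerChecksum + checksumSize;
    // Fit as many whole chunks as possible into a 64 KB packet, but at least one.
    int maxChunksPerPacket =
        Math.max((WRITE_PACKET_SIZE - PKT_MAX_HEADER_LEN) / chunkSize, 1);
    int maxPacketSize = chunkSize * maxChunksPerPacket + PKT_MAX_HEADER_LEN;

    System.out.println("chunks per packet = " + maxChunksPerPacket);
    System.out.println("max packet size   = " + maxPacketSize + " bytes");
  }
}

With these assumed values a packet carries 126 chunks, i.e. roughly 63 KB of
reconstructed data plus checksums, which is what packetBuf is sized to hold.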

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
new file mode 100644
index 0000000..3150fce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Datanode side striping + erasure coding related task processing.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c18a53c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 38ca8ce..7155e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -230,22 +230,23 @@ public class TestReconstructStripedFile {
   private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
       ReconstructionType type)
     throws IOException {
-    int stoppedDN = 0;
-    for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
-      if (stoppedDN == 0 || type != ReconstructionType.DataOnly
+    int stoppedDNs = 0;
+    for (Map.Entry<ExtendedBlock, DataNode> target :
+        corruptTargets.entrySet()) {
+      if (stoppedDNs == 0 || type != ReconstructionType.DataOnly
           || random.nextBoolean()) {
         // stop at least one DN to trigger reconstruction
         LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
             + " with internal block " + target.getKey());
         shutdownDataNode(target.getValue());
-        stoppedDN++;
+        stoppedDNs++;
       } else { // corrupt the data on the DN
         LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
             + " with internal block " + target.getKey());
         cluster.corruptReplica(target.getValue(), target.getKey());
       }
     }
-    return stoppedDN;
+    return stoppedDNs;
   }
 
   /**
