[ 
https://issues.apache.org/jira/browse/HDFS-16070?focusedWorklogId=611465&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-611465
 ]

ASF GitHub Bot logged work on HDFS-16070:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/21 17:09
            Start Date: 15/Jun/21 17:09
    Worklog Time Spent: 10m 
      Work Description: goiri commented on a change in pull request #3105:
URL: https://github.com/apache/hadoop/pull/3105#discussion_r651985111



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,

Review comment:
       protected static void writeFile() and the same for the other methods.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
##########
@@ -2646,6 +2655,7 @@ public void run() {
       } catch (Throwable t) {
         LOG.error("Failed to transfer block {}", b, t);
       } finally {
+        transferringBlock.remove(b);

Review comment:
       Are we sure we are cleaning this up and won't leave garbage?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");

Review comment:
       Logger format {}

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;

Review comment:
       You never capture this when using writeFile() so it will be GC for this 
current test.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Wait till node is fully decommissioned.
+   */
+  private void waitBlockMeetReplication(BlockInfo blockInfo, Short repl) {
+    boolean done = repl == blockInfo.numNodes();
+    while (!done) {
+      LOG.info("Waiting for repl change to " + repl + " current repl: "
+          + blockInfo.numNodes());
+      try {
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = repl == blockInfo.numNodes();
+    }
+    LOG.info("block " + blockInfo + " meet the replication " + repl);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testIOBusyNode() throws Exception {
+
+    FileSystem fileSys = cluster.getFileSystem(0);
+    // 1. create file
+    final Path file = new Path(dir, "testFile");
+    int repl = 1;
+    writeFile(fileSys, file, repl);
+
+    // 2. find the datanode which store this block
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(file.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    NumberReplicas replicas = bm.countNodes(firstBlock);
+    Assert.assertEquals(1, replicas.liveReplicas());
+    Assert.assertEquals(1, firstBlock.numNodes());
+
+    // 3. make datanode io busy. we delay remove operation so that we could
+    //   simulate that the datanode's io is busy.
+    DatanodeDescriptor datanode = firstBlock.getDatanode(0);
+    Logger log = mock(Logger.class);
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (datanode.getXferPort() != dn.getXferPort()) {
+        continue;
+      }
+      Set<ExtendedBlock> set = Collections
+          .synchronizedSet(new HashSet<ExtendedBlock>() {
+            @Override
+            public boolean add(ExtendedBlock block) {
+              boolean ret = super.add(block);
+              try {
+                Thread.sleep(30000);

Review comment:
       Comment saying why waiting 30 seconds

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Wait till node is fully decommissioned.
+   */
+  private void waitBlockMeetReplication(BlockInfo blockInfo, Short repl) {
+    boolean done = repl == blockInfo.numNodes();
+    while (!done) {
+      LOG.info("Waiting for repl change to " + repl + " current repl: "
+          + blockInfo.numNodes());
+      try {
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = repl == blockInfo.numNodes();
+    }
+    LOG.info("block " + blockInfo + " meet the replication " + repl);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testIOBusyNode() throws Exception {
+
+    FileSystem fileSys = cluster.getFileSystem(0);
+    // 1. create file
+    final Path file = new Path(dir, "testFile");
+    int repl = 1;
+    writeFile(fileSys, file, repl);
+
+    // 2. find the datanode which store this block
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(file.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    NumberReplicas replicas = bm.countNodes(firstBlock);
+    Assert.assertEquals(1, replicas.liveReplicas());
+    Assert.assertEquals(1, firstBlock.numNodes());
+
+    // 3. make datanode io busy. we delay remove operation so that we could
+    //   simulate that the datanode's io is busy.
+    DatanodeDescriptor datanode = firstBlock.getDatanode(0);
+    Logger log = mock(Logger.class);
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (datanode.getXferPort() != dn.getXferPort()) {
+        continue;
+      }
+      Set<ExtendedBlock> set = Collections
+          .synchronizedSet(new HashSet<ExtendedBlock>() {
+            @Override
+            public boolean add(ExtendedBlock block) {
+              boolean ret = super.add(block);
+              try {
+                Thread.sleep(30000);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+              }
+              return ret;
+            }
+          });
+      Field transferringBlock = DataNode.class

Review comment:
       You are doing some reflection here, can you make it into its own block 
explaining why?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Wait till node is fully decommissioned.
+   */
+  private void waitBlockMeetReplication(BlockInfo blockInfo, Short repl) {

Review comment:
       This could be done with LamdaTestsUtils.waitFor()

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Wait till node is fully decommissioned.
+   */
+  private void waitBlockMeetReplication(BlockInfo blockInfo, Short repl) {
+    boolean done = repl == blockInfo.numNodes();
+    while (!done) {
+      LOG.info("Waiting for repl change to " + repl + " current repl: "
+          + blockInfo.numNodes());
+      try {
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = repl == blockInfo.numNodes();
+    }
+    LOG.info("block " + blockInfo + " meet the replication " + repl);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testIOBusyNode() throws Exception {
+
+    FileSystem fileSys = cluster.getFileSystem(0);
+    // 1. create file
+    final Path file = new Path(dir, "testFile");
+    int repl = 1;
+    writeFile(fileSys, file, repl);
+
+    // 2. find the datanode which store this block
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(file.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    NumberReplicas replicas = bm.countNodes(firstBlock);
+    Assert.assertEquals(1, replicas.liveReplicas());
+    Assert.assertEquals(1, firstBlock.numNodes());
+
+    // 3. make datanode io busy. we delay remove operation so that we could
+    //   simulate that the datanode's io is busy.
+    DatanodeDescriptor datanode = firstBlock.getDatanode(0);
+    Logger log = mock(Logger.class);
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (datanode.getXferPort() != dn.getXferPort()) {
+        continue;
+      }
+      Set<ExtendedBlock> set = Collections
+          .synchronizedSet(new HashSet<ExtendedBlock>() {
+            @Override
+            public boolean add(ExtendedBlock block) {
+              boolean ret = super.add(block);
+              try {
+                Thread.sleep(30000);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+              }
+              return ret;
+            }
+          });
+      Field transferringBlock = DataNode.class
+          .getDeclaredField("transferringBlock");
+      transferringBlock.setAccessible(true);
+      Field modifiers = Field.class.getDeclaredField("modifiers");
+      modifiers.setAccessible(true);
+      modifiers.setInt(transferringBlock,
+          transferringBlock.getModifiers() & ~Modifier.FINAL);
+      transferringBlock.set(dn, set);
+
+      Field logger = DataNode.class.getDeclaredField("LOG");
+      logger.setAccessible(true);
+      modifiers = Field.class.getDeclaredField("modifiers");
+      modifiers.setAccessible(true);
+      modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
+      logger.set(null, log);
+    }
+
+    // 4. add block's replication to 2
+    bm.setReplication((short) 1, (short) 2, firstBlock);
+    waitBlockMeetReplication(firstBlock, (short) 2);
+    replicas = bm.countNodes(firstBlock);
+    Assert.assertEquals(replicas.liveReplicas(), 2);
+
+    // 5. sleep and verfiy the unnecessary transfer.
+    Thread.sleep(3000);

Review comment:
       Can this be made to wait for something? Otherwise, explain why 3 seconds?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBusyIODataNode.java
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBusyIODataNode {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestBusyIODataNode
+      .class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  static final long SEED = 0xDEADBEEFL;
+  static final int BLOCK_SIZE = 8192;
+  private static final int HEARTBEAT_INTERVAL = 1;
+
+  private final Path dir = new Path("/" + this.getClass().getSimpleName());
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    fsn = cluster.getNamesystem();
+    bm = fsn.getBlockManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    writeFile(fileSys, name, repl, 2);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
+      int numOfBlocks) throws IOException {
+    writeFile(fileSys, name, repl, numOfBlocks, true);
+  }
+
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+      int repl, int numOfBlocks, boolean completeFile)
+      throws IOException {
+    // create and write a file that contains two blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+            .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) repl, BLOCK_SIZE);
+    byte[] buffer = new byte[BLOCK_SIZE * numOfBlocks];
+    Random rand = new Random(SEED);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    LOG.info("Created file " + name + " with " + repl + " replicas.");
+    if (completeFile) {
+      stm.close();
+      return null;
+    } else {
+      stm.flush();
+      // Do not close stream, return it
+      // so that it is not garbage collected
+      return stm;
+    }
+  }
+
+  /*
+   * Wait till node is fully decommissioned.
+   */
+  private void waitBlockMeetReplication(BlockInfo blockInfo, Short repl) {
+    boolean done = repl == blockInfo.numNodes();
+    while (!done) {
+      LOG.info("Waiting for repl change to " + repl + " current repl: "

Review comment:
       logger

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
##########
@@ -2394,16 +2396,22 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] 
xferTargets,
     
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      final String xferTargetsString =
-          StringUtils.join(" ", Arrays.asList(xferTargets));
-      LOG.info("{} Starting thread to transfer {} to {}", bpReg, block,
-          xferTargetsString);
+      if (transferringBlock.contains(block)) {
+        LOG.warn(

Review comment:
       Can you explain why we are avoiding the block storm?
   How big can this set get?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 611465)
    Time Spent: 1h 40m  (was: 1.5h)

> DataTransfer block storm when datanode's io is busy.
> ----------------------------------------------------
>
>                 Key: HDFS-16070
>                 URL: https://issues.apache.org/jira/browse/HDFS-16070
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>    Affects Versions: 3.3.0, 3.2.1
>            Reporter: zhengchenyu
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> When I speed up the decommission, I found that some datanode's io is busy, 
> then I found host's load is very high, and ten thousands data transfer thread 
> are running. 
> Then I find log like below.
> {code}
> # setup datatranfer log
> 2021-06-08 13:42:37,620 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
> DatanodeRegistration(10.201.4.49:9866, 
> datanodeUuid=6c55b7cb-f8ef-445b-9cca-d82b5b077ed1, infoPort=9864, 
> infoSecurePort=0, ipcPort=9867, 
> storageInfo=lv=-57;cid=CID-37e80bd5-733a-4d7b-ba3d-b46269573c72;nsid=215490653;c=1584525570797)
>  Starting thread to transfer 
> BP-852924019-10.201.1.32-1584525570797:blk_-9223372036449848858_30963611 to 
> 10.201.7.52:9866
> 2021-06-08 13:52:36,345 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
> DatanodeRegistration(10.201.4.49:9866, 
> datanodeUuid=6c55b7cb-f8ef-445b-9cca-d82b5b077ed1, infoPort=9864, 
> infoSecurePort=0, ipcPort=9867, 
> storageInfo=lv=-57;cid=CID-37e80bd5-733a-4d7b-ba3d-b46269573c72;nsid=215490653;c=1584525570797)
>  Starting thread to transfer 
> BP-852924019-10.201.1.32-1584525570797:blk_-9223372036449848858_30963611 to 
> 10.201.7.31:9866
> 2021-06-08 14:02:37,197 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
> DatanodeRegistration(10.201.4.49:9866, 
> datanodeUuid=6c55b7cb-f8ef-445b-9cca-d82b5b077ed1, infoPort=9864, 
> infoSecurePort=0, ipcPort=9867, 
> storageInfo=lv=-57;cid=CID-37e80bd5-733a-4d7b-ba3d-b46269573c72;nsid=215490653;c=1584525570797)
>  Starting thread to transfer 
> BP-852924019-10.201.1.32-1584525570797:blk_-9223372036449848858_30963611 to 
> 10.201.16.50:9866
> # datatranfer done log
> 2021-06-08 13:54:08,134 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
> DataTransfer, at bd-tz1-hadoop-004049.zeus.lianjia.com:9866: Transmitted 
> BP-852924019-10.201.1.32-1584525570797:blk_-9223372036449848858_30963611 
> (numBytes=7457424) to /10.201.7.52:9866
> 2021-06-08 14:10:47,170 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
> DataTransfer, at bd-tz1-hadoop-004049.zeus.lianjia.com:9866: Transmitted 
> BP-852924019-10.201.1.32-1584525570797:blk_-9223372036449848858_30963611 
> (numBytes=7457424) to /10.201.16.50:9866
> {code}
> You will see last datatranfser thread was done on 13:54:08, but next 
> datatranfser was start at 13:52:36. 
> If datatranfser was not done in 10min(pending timeout + check interval), then 
> next datatranfser for same block will be running. Then disk and network are 
> heavy.
> Note: decommission ec block will trigger this problem easily, becuase every 
> ec internal block are unique. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to