HDFS-9917. IBRs accumulate more objects when the SNN is down for some time.
(Contributed by Brahma Reddy Battula)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/818d6b79
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/818d6b79
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/818d6b79

Branch: refs/heads/HDFS-7240
Commit: 818d6b799eead13a17a0214172df60a269b046fb
Parents: f6b1a81
Author: Vinayakumar B <vinayakum...@apache.org>
Authored: Tue Apr 5 09:49:39 2016 +0800
Committer: Vinayakumar B <vinayakum...@apache.org>
Committed: Tue Apr 5 09:49:39 2016 +0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BPServiceActor.java    |  5 +
 .../datanode/IncrementalBlockReportManager.java |  9 ++
 .../server/datanode/TestBPOfferService.java     | 96 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
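In outline, the change below makes a DataNode's BPServiceActor discard its queued incremental block reports (IBRs) when it has to re-register with a NameNode that is in STANDBY state, so a long standby outage no longer leaves an ever-growing pending-IBR backlog on the DataNode. The following is a minimal, self-contained sketch of that control flow, not the actual Hadoop classes: IbrManager, addIBR(), and the local HAServiceState enum are simplified stand-ins for IncrementalBlockReportManager, its per-storage pending map, and org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; only clearIBRs(), getPendingIBRSize(), and the standby check mirror the patch.

import java.util.HashMap;
import java.util.Map;

public class IbrClearOnReRegisterSketch {

  // Simplified stand-in for org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.
  enum HAServiceState { ACTIVE, STANDBY }

  // Simplified stand-in for IncrementalBlockReportManager: queues IBRs per storage.
  static class IbrManager {
    private final Map<String, String> pendingIBRs = new HashMap<>();

    void addIBR(String storageId, String blockInfo) {
      pendingIBRs.put(storageId, blockInfo);
    }

    // Mirrors the clearIBRs() method added by the patch.
    void clearIBRs() {
      pendingIBRs.clear();
    }

    // Mirrors the @VisibleForTesting getPendingIBRSize() added by the patch.
    int getPendingIBRSize() {
      return pendingIBRs.size();
    }
  }

  // Simplified stand-in for BPServiceActor.reRegister(): after re-registering,
  // a standby peer's queued IBRs are dropped; an active peer keeps its queue.
  static void reRegister(HAServiceState state, IbrManager ibrManager) {
    // ... register(nsInfo) and scheduler.scheduleHeartbeat() happen here ...
    if (state == HAServiceState.STANDBY) {
      ibrManager.clearIBRs();
    }
  }

  public static void main(String[] args) {
    IbrManager ibrManager = new IbrManager();
    ibrManager.addIBR("storage0", "blk_1001 RECEIVED");
    System.out.println("pending before reRegister: " + ibrManager.getPendingIBRSize()); // 1
    reRegister(HAServiceState.STANDBY, ibrManager);
    System.out.println("pending after reRegister:  " + ibrManager.getPendingIBRSize()); // 0
  }
}

In the actual change the standby check sits at the end of BPServiceActor.reRegister(), and the new test below drives it by having the mocked standby return a RegisterCommand in its heartbeat response.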


http://git-wip-us.apache.org/repos/asf/hadoop/blob/818d6b79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 49f64c2..39f8219 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -798,6 +798,11 @@ class BPServiceActor implements Runnable {
       // and re-register
       register(nsInfo);
       scheduler.scheduleHeartbeat();
+      // HDFS-9917: Standby NN IBRs can become very large if the standby
+      // namenode is down for some time.
+      if (state == HAServiceState.STANDBY) {
+        ibrManager.clearIBRs();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/818d6b79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
index b9b348a..e95142d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
@@ -258,4 +258,13 @@ class IncrementalBlockReportManager {
       }
     }
   }
+
+  void clearIBRs() {
+    pendingIBRs.clear();
+  }
+
+  @VisibleForTesting
+  int getPendingIBRSize() {
+    return pendingIBRs.size();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/818d6b79/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 95a103e..29db702 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -48,10 +49,12 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -92,7 +95,10 @@ public class TestBPOfferService {
 
   private DatanodeProtocolClientSideTranslatorPB mockNN1;
   private DatanodeProtocolClientSideTranslatorPB mockNN2;
-  private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
+  private final NNHAStatusHeartbeat[] mockHaStatuses =
+      new NNHAStatusHeartbeat[2];
+  private final DatanodeCommand[][] datanodeCommands =
+      new DatanodeCommand[2][0];
   private final int[] heartbeatCounts = new int[2];
   private DataNode mockDn;
   private FsDatasetSpi<?> mockFSDataset;
@@ -147,6 +153,7 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean());
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
+    datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
   }
   
@@ -165,9 +172,12 @@ public class TestBPOfferService {
     @Override
      public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
       heartbeatCounts[nnIdx]++;
-      return new HeartbeatResponse(new DatanodeCommand[0],
-          mockHaStatuses[nnIdx], null,
+      HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
+          datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
           ThreadLocalRandom.current().nextLong() | 1L);
+      //reset the command
+      datanodeCommands[nnIdx] = new DatanodeCommand[0];
+      return heartbeatResponse;
     }
   }
 
@@ -709,4 +719,84 @@ public class TestBPOfferService {
       bpos.join();
     }
   }
+
+  /*
+   * HDFS-9917 : Standby IBR accumulation when Standby was down.
+   */
+  @Test
+  public void testIBRClearanceForStandbyOnReRegister() throws Exception {
+    final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting as the active NameNode and mockNN2 as the standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      // Return nothing when the active NameNode gets IBRs
+      Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
+          Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
+              .any(StorageReceivedDeletedBlocks[].class));
+
+      final IOException re = new IOException(
+          "Standby NN is currently not able to process IBR");
+
+      final AtomicBoolean ibrReported = new AtomicBoolean(false);
+      // throw exception for standby when the first IBR is received
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          ibrReported.set(true);
+          throw re;
+        }
+      }).when(mockNN2).blockReceivedAndDeleted(
+          Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
+              .any(StorageReceivedDeletedBlocks[].class));
+
+      DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
+      Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
+      // Add IBRs
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
+      // Send heartbeat so that the BpServiceActor can send IBR to
+      // namenode
+      bpos.triggerHeartbeatForTests();
+      // Wait till first IBR is received at standbyNN. Just for confirmation.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ibrReported.get();
+        }
+      }, 100, 1000);
+
+      // Send register command back to Datanode to reRegister().
+      // After reRegister IBRs should be cleared.
+      datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
+      assertEquals(
+          "IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
+              bpos));
+      bpos.triggerHeartbeatForTests();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return getStandbyIBRSize(bpos) == 0;
+        }
+      }, 100, 1000);
+    } finally {
+      bpos.stop();
+      bpos.join();
+    }
+  }
+
+  private int getStandbyIBRSize(BPOfferService bpos) {
+    List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
+    for (BPServiceActor bpServiceActor : bpServiceActors) {
+      if (bpServiceActor.state == HAServiceState.STANDBY) {
+        return bpServiceActor.getIbrManager().getPendingIBRSize();
+      }
+    }
+    return -1;
+  }
 }
