This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 0152eb5  HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in processing reports (#1601)
0152eb5 is described below

commit 0152eb5a728aaccf23cefccdd4f406b5fd45b6ca
Author: Siyao Meng <[email protected]>
AuthorDate: Mon Dec 14 03:28:22 2020 -0800

    HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in processing reports (#1601)
    
    * HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in processing reports.
    
    Change-Id: Idf6394d0bde2fb1c9fe99a96c49fa28677e0e527
    
    * Checkstyle.
    
    Change-Id: I6c4729634905bf6bba5bb6483c393d3c7511a78e
    
    * Add mock-maker-inline so UT can mock final classes.
    
    Change-Id: I63470510ed2089438f08f4d3cea321c1b5f01c3c
    
    * Remove throw exception in addReport.
    
    Change-Id: Id3ff6222ed279568d32876afc0a6a5ee1316c926
    
    * Fix existing UT TestStateContext#testReportAPIs.
    
    Change-Id: I25020b4411f4163a2ec134431793bfe663e5f6bf
    
    * Remove unused imports.
    
    Change-Id: I2dc93c42fc7468b37d41a179ca047ee385fc8896
    
    * Work around a mockito bug in UT TestCreatePipelineCommandHandler, caused by adding mock-maker-inline.
    
    Change-Id: I0f21c508af1f7b36d7632aaa3b9173a17056f211
    
    * Add UT testContainerNodePipelineReportAPIs.
    
    Change-Id: I8d67a1b6e2d9043fb1479e2547d2e690932f7985
    
    * Clean up.
    
    Change-Id: Iaec544ba6fb211d0d71fe03b26b0fc70199dc882
    
    * Add @VisibleForTesting to the container/node/pipeline report getters as they are only used in UT, at least at the moment.
    
    Change-Id: I72070fde8df220ecebf2fb6d1015d19f65cee11e
    
    * - Renamed `reports` to `incrementalReportsQueue` to better reflect the functionality of the map.
    - Added `acceptedIncrementalReportTypeSet` for rigorous report type checking.
    - `StateContext#addReport` now explicitly throws, instead of hiding any possible NPEs. Rationale: on reflection, those reports are all internally generated rather than dependent on direct user input, so it is better to catch any problem early (if any).
    - `StateContext#addReport` also strictly checks the report type now. This forces any future additions of report types to be explicitly allowed. Only 5 report types (names) are expected:
    ```
    hadoop.hdds.NodeReportProto
    hadoop.hdds.ContainerReportsProto
    hadoop.hdds.IncrementalContainerReportProto
    hadoop.hdds.CommandStatusReportsProto
    hadoop.hdds.PipelineReportsProto
    ```
    - `StateContext#putBackReports` now checks ALL reports in the list to be put back. The list is expected to be small, and since the logic runs on each DataNode, it shouldn't become a performance bottleneck. Also added debug logging in case we need to diagnose it in the future.
    - Added new UTs `testPutBackReports` and `testReportQueueWithAddReports` to check that `putBackReports` and `getReports` behave as expected.
    
    Change-Id: I48aca3d4a422d52532b39c9462a30519c9cbe0eb
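    The buffering strategy described in the notes above can be sketched as follows. This is a hypothetical, simplified model (plain strings instead of protobuf GeneratedMessage, one full-report slot instead of three, and invented names such as ReportBuffer), not the actual StateContext API:

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    class ReportBuffer {
      // A full report supersedes earlier ones, so only the latest is kept.
      // A slow SCM/Recon can no longer cause an unbounded backlog of them.
      private final AtomicReference<String> latestFullReport =
          new AtomicReference<>();
      // Incremental reports must all be delivered, so they are queued
      // per endpoint.
      private final Map<String, List<String>> incrementalReportsQueue =
          new HashMap<>();

      void addEndpoint(String endpoint) {
        incrementalReportsQueue.putIfAbsent(endpoint, new ArrayList<>());
      }

      void addFullReport(String report) {
        latestFullReport.set(report);  // replaces any buffered full report
      }

      void addIncrementalReport(String report) {
        synchronized (incrementalReportsQueue) {
          for (List<String> queue : incrementalReportsQueue.values()) {
            queue.add(report);  // every endpoint gets its own copy
          }
        }
      }

      // Drain everything available for one endpoint: the queued incremental
      // reports plus the latest full report, mirroring getReports in the patch.
      List<String> getAllAvailableReports(String endpoint) {
        List<String> result = new ArrayList<>();
        synchronized (incrementalReportsQueue) {
          List<String> queue = incrementalReportsQueue.get(endpoint);
          if (queue != null) {
            result.addAll(queue);
            queue.clear();
          }
        }
        String full = latestFullReport.get();
        if (full != null) {
          result.add(full);
        }
        return result;
      }
    }
    ```

    With this split, memory use is bounded by one full report per type plus the (normally short) incremental queue, rather than growing with every heartbeat the server fails to process.
    
    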
    
    * Make containerReports, nodeReport, pipelineReports atomic and final.
    
    Change-Id: I44aba0ff093fa0ff5f9a35a74ff4126d9c73dd7b
    
    * Checkstyle.
    
    Change-Id: I545ca8c636b8bbd1239a936603b6efcbfbd10b88
    
    * Clean up.
    
    Change-Id: I5cdc38a297cf3d91bad8335eb36f2ba6b2009c10
    
    * Remove assertion in putBackReports as requestBuilder might include Container/Node/Pipeline reports.
    
    Thanks Aravindan.
    
    Change-Id: Ic39f53de8bb91a179bd5dc3301aa89ffecf5c076
    
    * Empty commit to retrigger CI.
    
    Change-Id: I46fb65ce2c81cb607dd10a9ee829d829ec7a8cfc
    
    * Empty commit to retrigger CI.
    
    Change-Id: I9d3b5ce0b81d457344ecd6b4ad6e141169bd89d3
    
    * Retrigger.
    
    Change-Id: I4e9dace08152e21c2bb5da564487d776a84c4ade
    
    * Allow null as input to StateContext#addReport() as this blocked UT TestStorageContainerManager#testBlockDeletionTransactions.
    
    Change-Id: I16c8bfcfbd619a23241ea6dda9b95265714984bb
    
    * Retrigger CI
    
    Change-Id: I64203ebbaeebdb4c598bb8dad578579c82bf604b
    
    * trigger new CI check
    
    Co-authored-by: Doroszlai, Attila <[email protected]>
---
 .../common/statemachine/StateContext.java          | 143 +++++++++--
 .../states/endpoint/HeartbeatEndpointTask.java     |  16 +-
 .../common/statemachine/TestStateContext.java      | 286 ++++++++++++++++++++-
 .../TestCreatePipelineCommandHandler.java          |   6 +-
 .../org.mockito.plugins.MockMaker                  |  16 ++
 5 files changed, 435 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4cd769f..f87561a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,15 +32,23 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
@@ -63,6 +71,27 @@ import org.slf4j.LoggerFactory;
  * Current Context of State Machine.
  */
 public class StateContext {
+
+  @VisibleForTesting
+  final static String CONTAINER_REPORTS_PROTO_NAME =
+      ContainerReportsProto.getDescriptor().getFullName();
+  @VisibleForTesting
+  final static String NODE_REPORT_PROTO_NAME =
+      NodeReportProto.getDescriptor().getFullName();
+  @VisibleForTesting
+  final static String PIPELINE_REPORTS_PROTO_NAME =
+      PipelineReportsProto.getDescriptor().getFullName();
+  @VisibleForTesting
+  final static String COMMAND_STATUS_REPORTS_PROTO_NAME =
+      CommandStatusReportsProto.getDescriptor().getFullName();
+  @VisibleForTesting
+  final static String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME =
+      IncrementalContainerReportProto.getDescriptor().getFullName();
+  // Accepted types of reports that can be queued to incrementalReportsQueue
+  private final static Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
+      Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
+          INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
+
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
   private final Queue<SCMCommand> commandQueue;
@@ -72,7 +101,13 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final ConfigurationSource conf;
   private final Set<InetSocketAddress> endpoints;
-  private final Map<InetSocketAddress, List<GeneratedMessage>> reports;
+  // Only the latest full report of each type is kept
+  private final AtomicReference<GeneratedMessage> containerReports;
+  private final AtomicReference<GeneratedMessage> nodeReport;
+  private final AtomicReference<GeneratedMessage> pipelineReports;
+  // Incremental reports are queued in the map below
+  private final Map<InetSocketAddress, List<GeneratedMessage>>
+      incrementalReportsQueue;
   private final Map<InetSocketAddress, Queue<ContainerAction>> containerActions;
   private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
@@ -102,7 +137,10 @@ public class StateContext {
     this.parent = parent;
     commandQueue = new LinkedList<>();
     cmdStatusMap = new ConcurrentHashMap<>();
-    reports = new HashMap<>();
+    incrementalReportsQueue = new HashMap<>();
+    containerReports = new AtomicReference<>();
+    nodeReport = new AtomicReference<>();
+    pipelineReports = new AtomicReference<>();
     endpoints = new HashSet<>();
     containerActions = new HashMap<>();
     pipelineActions = new HashMap<>();
@@ -190,17 +228,34 @@ public class StateContext {
   public boolean getShutdownOnError() {
     return shutdownOnError;
   }
+
   /**
    * Adds the report to report queue.
    *
    * @param report report to be added
    */
   public void addReport(GeneratedMessage report) {
-    if (report != null) {
-      synchronized (reports) {
-        for (InetSocketAddress endpoint : endpoints) {
-          reports.get(endpoint).add(report);
+    if (report == null) {
+      return;
+    }
+    final Descriptor descriptor = report.getDescriptorForType();
+    Preconditions.checkState(descriptor != null);
+    final String reportType = descriptor.getFullName();
+    Preconditions.checkState(reportType != null);
+    for (InetSocketAddress endpoint : endpoints) {
+      if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
+        containerReports.set(report);
+      } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
+        nodeReport.set(report);
+      } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
+        pipelineReports.set(report);
+      } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
+        synchronized (incrementalReportsQueue) {
+          incrementalReportsQueue.get(endpoint).add(report);
         }
+      } else {
+        throw new IllegalArgumentException(
+            "Unidentified report message type: " + reportType);
       }
     }
   }
@@ -214,9 +269,24 @@ public class StateContext {
    */
   public void putBackReports(List<GeneratedMessage> reportsToPutBack,
                              InetSocketAddress endpoint) {
-    synchronized (reports) {
-      if (reports.containsKey(endpoint)){
-        reports.get(endpoint).addAll(0, reportsToPutBack);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("endpoint: {}, size of reportsToPutBack: {}",
+          endpoint, reportsToPutBack.size());
+    }
+    // We don't expect too many reports to be put back
+    for (GeneratedMessage report : reportsToPutBack) {
+      final Descriptor descriptor = report.getDescriptorForType();
+      Preconditions.checkState(descriptor != null);
+      final String reportType = descriptor.getFullName();
+      Preconditions.checkState(reportType != null);
+      if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
+        throw new IllegalArgumentException(
+            "Unaccepted report message type: " + reportType);
+      }
+    }
+    synchronized (incrementalReportsQueue) {
+      if (incrementalReportsQueue.containsKey(endpoint)){
+        incrementalReportsQueue.get(endpoint).addAll(0, reportsToPutBack);
       }
     }
   }
@@ -232,6 +302,22 @@ public class StateContext {
     return getReports(endpoint, Integer.MAX_VALUE);
   }
 
+  List<GeneratedMessage> getIncrementalReports(
+      InetSocketAddress endpoint, int maxLimit) {
+    List<GeneratedMessage> reportsToReturn = new LinkedList<>();
+    synchronized (incrementalReportsQueue) {
+      List<GeneratedMessage> reportsForEndpoint =
+          incrementalReportsQueue.get(endpoint);
+      if (reportsForEndpoint != null) {
+        List<GeneratedMessage> tempList = reportsForEndpoint.subList(
+            0, min(reportsForEndpoint.size(), maxLimit));
+        reportsToReturn.addAll(tempList);
+        tempList.clear();
+      }
+    }
+    return reportsToReturn;
+  }
+
   /**
    * Returns available reports from the report queue with a max limit on
    * list size, or empty list if the queue is empty.
@@ -240,15 +326,19 @@ public class StateContext {
    */
   public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
                                            int maxLimit) {
-    List<GeneratedMessage> reportsToReturn = new LinkedList<>();
-    synchronized (reports) {
-      List<GeneratedMessage> reportsForEndpoint = reports.get(endpoint);
-      if (reportsForEndpoint != null) {
-        List<GeneratedMessage> tempList = reportsForEndpoint.subList(
-            0, min(reportsForEndpoint.size(), maxLimit));
-        reportsToReturn.addAll(tempList);
-        tempList.clear();
-      }
+    List<GeneratedMessage> reportsToReturn =
+        getIncrementalReports(endpoint, maxLimit);
+    GeneratedMessage report = containerReports.get();
+    if (report != null) {
+      reportsToReturn.add(report);
+    }
+    report = nodeReport.get();
+    if (report != null) {
+      reportsToReturn.add(report);
+    }
+    report = pipelineReports.get();
+    if (report != null) {
+      reportsToReturn.add(report);
     }
     return reportsToReturn;
   }
@@ -580,7 +670,22 @@ public class StateContext {
       this.endpoints.add(endpoint);
       this.containerActions.put(endpoint, new LinkedList<>());
       this.pipelineActions.put(endpoint, new LinkedList<>());
-      this.reports.put(endpoint, new LinkedList<>());
+      this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
     }
   }
+
+  @VisibleForTesting
+  public GeneratedMessage getContainerReports() {
+    return containerReports.get();
+  }
+
+  @VisibleForTesting
+  public GeneratedMessage getNodeReport() {
+    return nodeReport.get();
+  }
+
+  @VisibleForTesting
+  public GeneratedMessage getPipelineReports() {
+    return pipelineReports.get();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index cea4295..4e436c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -142,15 +142,15 @@ public class HeartbeatEndpointTask
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat message :: {}", request.toString());
       }
-      SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
+      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
           .sendHeartbeat(request);
-      processResponse(reponse, datanodeDetailsProto);
+      processResponse(response, datanodeDetailsProto);
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
+      Preconditions.checkState(requestBuilder != null);
       // put back the reports which failed to be sent
       putBackReports(requestBuilder);
-
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -161,12 +161,9 @@ public class HeartbeatEndpointTask
   // TODO: Make it generic.
   private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
     List<GeneratedMessage> reports = new LinkedList<>();
-    if (requestBuilder.hasContainerReport()) {
-      reports.add(requestBuilder.getContainerReport());
-    }
-    if (requestBuilder.hasNodeReport()) {
-      reports.add(requestBuilder.getNodeReport());
-    }
+    // We only put back CommandStatusReports and IncrementalContainerReport
+    // because those are incremental. Container/Node/PipelineReport are
+    // cumulative, so we can keep only the latest of each.
     if (requestBuilder.getCommandStatusReportsCount() != 0) {
       reports.addAll(requestBuilder.getCommandStatusReportsList());
     }
@@ -194,6 +191,7 @@ public class HeartbeatEndpointTask
           } else {
             requestBuilder.setField(descriptor, report);
           }
+          break;
         }
       }
     }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d3032c3..e9c39d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -23,11 +23,19 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
@@ -37,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -53,6 +62,271 @@ import com.google.protobuf.GeneratedMessage;
  */
 public class TestStateContext {
 
+  /**
+   * Only accepted types of reports can be put back to the report queue.
+   */
+  @Test
+  public void testPutBackReports() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachineMock =
+        mock(DatanodeStateMachine.class);
+
+    StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
+        datanodeStateMachineMock);
+    InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+    ctx.addEndpoint(scm1);
+    InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+    ctx.addEndpoint(scm2);
+
+    Map<String, Integer> expectedReportCount = new HashMap<>();
+
+    // Case 1: Put back an incremental report
+
+    ctx.putBackReports(Collections.singletonList(newMockReport(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME)), scm1);
+    // scm2 report queue should be empty
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // Check scm1 queue
+    expectedReportCount.put(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.clear();
+
+    ctx.putBackReports(Collections.singletonList(newMockReport(
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)), scm2);
+    // scm1 report queue should be empty
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    // Check scm2 queue
+    expectedReportCount.put(
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.clear();
+
+    // Case 2: Attempt to put back a full report
+
+    try {
+      ctx.putBackReports(Collections.singletonList(
+          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
+    try {
+      ctx.putBackReports(Collections.singletonList(
+          newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
+    try {
+      ctx.putBackReports(Collections.singletonList(
+          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // Case 3: Put back mixed types of incremental reports
+
+    ctx.putBackReports(Arrays.asList(
+        newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
+        newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+        newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+        newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+        newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME)
+    ), scm1);
+    // scm2 report queue should be empty
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // Check scm1 queue
+    expectedReportCount.put(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 2);
+    expectedReportCount.put(
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 3);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.clear();
+
+    // Case 4: Attempt to put back mixed types of full reports
+
+    try {
+      ctx.putBackReports(Arrays.asList(
+          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
+          newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
+          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)
+      ), scm1);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // Case 5: Attempt to put back mixed full and incremental reports
+
+    try {
+      ctx.putBackReports(Arrays.asList(
+          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
+          newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
+          newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
+      ), scm2);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  @Test
+  public void testReportQueueWithAddReports() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachineMock =
+        mock(DatanodeStateMachine.class);
+
+    StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
+        datanodeStateMachineMock);
+    InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+    ctx.addEndpoint(scm1);
+    InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+    ctx.addEndpoint(scm2);
+    // Check initial state
+    assertEquals(0, ctx.getAllAvailableReports(scm1).size());
+    assertEquals(0, ctx.getAllAvailableReports(scm2).size());
+
+    Map<String, Integer> expectedReportCount = new HashMap<>();
+
+    // Add a bunch of ContainerReports
+    batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+    // Add a bunch of NodeReport
+    batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+    // Add a bunch of PipelineReports
+    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+    // Add a bunch of PipelineReports
+    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+    // Add a bunch of CommandStatusReports
+    batchAddReports(ctx,
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    expectedReportCount.put(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    // Should keep all of them
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.remove(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
+    // Add a bunch of IncrementalContainerReport
+    batchAddReports(ctx,
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
+    // Should keep all of them
+    expectedReportCount.put(
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.remove(
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
+  }
+
+  void batchAddReports(StateContext ctx, String reportName, int count) {
+    for (int i = 0; i < count; i++) {
+      ctx.addReport(newMockReport(reportName));
+    }
+  }
+
+  void checkReportCount(List<GeneratedMessage> reports,
+      Map<String, Integer> expectedReportCount) {
+    Map<String, Integer> reportCount = new HashMap<>();
+    for (GeneratedMessage report : reports) {
+      final String reportName = report.getDescriptorForType().getFullName();
+      reportCount.put(reportName, reportCount.getOrDefault(reportName, 0) + 1);
+    }
+    // Verify
+    assertEquals(expectedReportCount, reportCount);
+  }
+
+  /**
+   * Check if Container, Node and Pipeline report APIs work as expected.
+   */
+  @Test
+  public void testContainerNodePipelineReportAPIs() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachineMock =
+        mock(DatanodeStateMachine.class);
+
+    // ContainerReports
+    StateContext context1 = newStateContext(conf, datanodeStateMachineMock);
+    assertNull(context1.getContainerReports());
+    assertNull(context1.getNodeReport());
+    assertNull(context1.getPipelineReports());
+    GeneratedMessage containerReports =
+        newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+    context1.addReport(containerReports);
+
+    assertNotNull(context1.getContainerReports());
+    assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
+        context1.getContainerReports().getDescriptorForType().getFullName());
+    assertNull(context1.getNodeReport());
+    assertNull(context1.getPipelineReports());
+
+    // NodeReport
+    StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
+    GeneratedMessage nodeReport =
+        newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
+    context2.addReport(nodeReport);
+
+    assertNull(context2.getContainerReports());
+    assertNotNull(context2.getNodeReport());
+    assertEquals(StateContext.NODE_REPORT_PROTO_NAME,
+        context2.getNodeReport().getDescriptorForType().getFullName());
+    assertNull(context2.getPipelineReports());
+
+    // PipelineReports
+    StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
+    GeneratedMessage pipelineReports =
+        newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+    context3.addReport(pipelineReports);
+
+    assertNull(context3.getContainerReports());
+    assertNull(context3.getNodeReport());
+    assertNotNull(context3.getPipelineReports());
+    assertEquals(StateContext.PIPELINE_REPORTS_PROTO_NAME,
+        context3.getPipelineReports().getDescriptorForType().getFullName());
+  }
+
+  private StateContext newStateContext(OzoneConfiguration conf,
+      DatanodeStateMachine datanodeStateMachineMock) {
+    StateContext stateContext = new StateContext(conf,
+        DatanodeStates.getInitState(), datanodeStateMachineMock);
+    InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+    stateContext.addEndpoint(scm1);
+    InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+    stateContext.addEndpoint(scm2);
+    return stateContext;
+  }
+
+  private GeneratedMessage newMockReport(String messageType) {
+    GeneratedMessage pipelineReports = mock(GeneratedMessage.class);
+    when(pipelineReports.getDescriptorForType()).thenReturn(
+        mock(Descriptor.class));
+    when(pipelineReports.getDescriptorForType().getFullName()).thenReturn(
+        messageType);
+    return pipelineReports;
+  }
+
   @Test
   public void testReportAPIs() {
     OzoneConfiguration conf = new OzoneConfiguration();
@@ -64,8 +338,14 @@ public class TestStateContext {
     InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
     InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
 
-    // Try to add report with endpoint. Should not be stored.
-    stateContext.addReport(mock(GeneratedMessage.class));
+    GeneratedMessage generatedMessage = mock(GeneratedMessage.class);
+    when(generatedMessage.getDescriptorForType()).thenReturn(
+        mock(Descriptor.class));
+    when(generatedMessage.getDescriptorForType().getFullName()).thenReturn(
+        "hadoop.hdds.CommandStatusReportsProto");
+
+    // Try to add report with zero endpoints. Should not be stored.
+    stateContext.addReport(generatedMessage);
     assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
 
     // Add 2 scm endpoints.
@@ -73,7 +353,7 @@ public class TestStateContext {
     stateContext.addEndpoint(scm2);
 
     // Add report. Should be added to all endpoints.
-    stateContext.addReport(mock(GeneratedMessage.class));
+    stateContext.addReport(generatedMessage);
     List<GeneratedMessage> allAvailableReports =
         stateContext.getAllAvailableReports(scm1);
     assertEquals(1, allAvailableReports.size());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index febd1c3..d23f1c4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -44,6 +44,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -79,7 +80,10 @@ public class TestCreatePipelineCommandHandler {
     Mockito.when(raftClient.getGroupManagementApi(
         Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
     PowerMockito.mockStatic(RaftClient.class);
-    PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder);
+    // Workaround for a mockito bug:
+    // https://github.com/powermock/powermock/issues/992
+    PowerMockito.when(RaftClient.newBuilder()).thenAnswer(
+        (Answer<RaftClient.Builder>) invocation -> builder);
   }
 
   private RaftClient.Builder mockRaftClientBuilder() {
diff --git a/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..3c9e1c8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+mock-maker-inline
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
