This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 39954ad  HDDS-5111. DataNode should not always report full information 
in heartbeat (#2182)
39954ad is described below

commit 39954adeef38d4baba0c231d2bf68a4b0873e01f
Author: Jackson Yao <[email protected]>
AuthorDate: Tue Jun 8 02:03:56 2021 +0800

    HDDS-5111. DataNode should not always report full information in heartbeat 
(#2182)
---
 .../container/common/report/ReportPublisher.java   |   8 +-
 .../common/statemachine/StateContext.java          | 131 ++++++++++++-------
 .../states/endpoint/HeartbeatEndpointTask.java     |   5 +-
 .../transport/server/ratis/XceiverServerRatis.java |   3 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   2 +-
 .../common/report/TestReportPublisher.java         |   2 +-
 .../common/statemachine/TestStateContext.java      | 140 ++++++++++-----------
 .../states/endpoint/TestHeartbeatEndpointTask.java |  14 ++-
 .../commandhandler/TestBlockDeletion.java          |   3 +-
 9 files changed, 179 insertions(+), 129 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index e6b4106..8d4820e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 
@@ -80,7 +81,12 @@ public abstract class ReportPublisher<T extends 
GeneratedMessage>
    */
   private void publishReport() {
     try {
-      context.addReport(getReport());
+      GeneratedMessage report = getReport();
+      if (report instanceof CommandStatusReportsProto) {
+        context.addIncrementalReport(report);
+      } else {
+        context.refreshFullReport(report);
+      }
     } catch (IOException e) {
       LOG.error("Exception while publishing report.", e);
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 5c3aaa9..ca73468 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -40,7 +41,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
@@ -63,10 +63,11 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import static java.lang.Math.min;
-import org.apache.commons.collections.CollectionUtils;
-
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
 import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
+
+import org.apache.commons.collections.CollectionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,10 +94,6 @@ public class StateContext {
   @VisibleForTesting
   static final String CRL_STATUS_REPORT_PROTO_NAME =
       CRLStatusReport.getDescriptor().getFullName();
-  // Accepted types of reports that can be queued to incrementalReportsQueue
-  private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
-      Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
-          INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
 
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
@@ -121,6 +118,14 @@ public class StateContext {
   private boolean shutdownOnError = false;
   private boolean shutdownGracefully = false;
   private final AtomicLong threadPoolNotAvailableCount;
+  // Endpoint -> ReportType -> Boolean of whether the full report should be
+  //  queued in getFullReports call.
+  private final Map<InetSocketAddress,
+      Map<String, AtomicBoolean>> fullReportSendIndicator;
+  // List of supported full report types.
+  private final List<String> fullReportTypeList;
+  // ReportType -> Report.
+  private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;
 
   /**
    * term of latest leader SCM, extract from SCMCommand.
@@ -167,6 +172,24 @@ public class StateContext {
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
     threadPoolNotAvailableCount = new AtomicLong(0);
+    fullReportSendIndicator = new HashMap<>();
+    fullReportTypeList = new ArrayList<>();
+    type2Reports = new HashMap<>();
+    initReportTypeCollection();
+  }
+
+  /**
+   * Initializes the report-type collections used to track full reports.
+   */
+  private void initReportTypeCollection(){
+    fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME);
+    type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports);
+    fullReportTypeList.add(NODE_REPORT_PROTO_NAME);
+    type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport);
+    fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME);
+    type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
+    fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
+    type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
   }
 
   /**
@@ -254,7 +277,7 @@ public class StateContext {
    *
    * @param report report to be added
    */
-  public void addReport(GeneratedMessage report) {
+  public void addIncrementalReport(GeneratedMessage report) {
     if (report == null) {
       return;
     }
@@ -262,23 +285,38 @@ public class StateContext {
     Preconditions.checkState(descriptor != null);
     final String reportType = descriptor.getFullName();
     Preconditions.checkState(reportType != null);
-    if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
-      containerReports.set(report);
-    } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
-      nodeReport.set(report);
-    } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
-      pipelineReports.set(report);
-    } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
-      synchronized (incrementalReportsQueue) {
-        for (InetSocketAddress endpoint : endpoints) {
-          incrementalReportsQueue.get(endpoint).add(report);
-        }
+    // in some cases, we want to add a fullReportType message
+    // as an incremental message.
+    // see XceiverServerRatis#sendPipelineReport
+    synchronized (incrementalReportsQueue) {
+      for (InetSocketAddress endpoint : endpoints) {
+        incrementalReportsQueue.get(endpoint).add(report);
       }
-    } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
-      crlStatusReport.set(report);
-    } else {
+    }
+  }
+
+  /**
+   * Refreshes the stored full report of the given type.
+   *
+   * @param report report to be refreshed
+   */
+  public void refreshFullReport(GeneratedMessage report) {
+    if (report == null) {
+      return;
+    }
+    final Descriptor descriptor = report.getDescriptorForType();
+    Preconditions.checkState(descriptor != null);
+    final String reportType = descriptor.getFullName();
+    Preconditions.checkState(reportType != null);
+    if (!fullReportTypeList.contains(reportType)) {
       throw new IllegalArgumentException(
-          "Unidentified report message type: " + reportType);
+          "not full report message type: " + reportType);
+    }
+    type2Reports.get(reportType).set(report);
+    if (fullReportSendIndicator != null) {
+      for (Map<String, AtomicBoolean> mp : fullReportSendIndicator.values()) {
+        mp.get(reportType).set(true);
+      }
     }
   }
 
@@ -301,10 +339,6 @@ public class StateContext {
       Preconditions.checkState(descriptor != null);
       final String reportType = descriptor.getFullName();
       Preconditions.checkState(reportType != null);
-      if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
-        throw new IllegalArgumentException(
-            "Unaccepted report message type: " + reportType);
-      }
     }
     synchronized (incrementalReportsQueue) {
       if (incrementalReportsQueue.containsKey(endpoint)){
@@ -340,23 +374,27 @@ public class StateContext {
     return reportsToReturn;
   }
 
-  List<GeneratedMessage> getNonIncrementalReports() {
+  List<GeneratedMessage> getFullReports(
+      InetSocketAddress endpoint) {
+    Map<String, AtomicBoolean> mp = fullReportSendIndicator.get(endpoint);
     List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
-    GeneratedMessage report = containerReports.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = nodeReport.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = pipelineReports.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = crlStatusReport.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
+    if (null != mp){
+      for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
+        if (kv.getValue().get()) {
+          String reportType = kv.getKey();
+          final AtomicReference<GeneratedMessage> ref =
+              type2Reports.get(reportType);
+          if (ref == null) {
+            throw new RuntimeException(reportType + " is not a valid full "
+                + "report type!");
+          }
+          final GeneratedMessage msg = ref.get();
+          if (msg != null) {
+            nonIncrementalReports.add(msg);
+            mp.get(reportType).set(false);
+          }
+        }
+      }
     }
     return nonIncrementalReports;
   }
@@ -372,7 +410,7 @@ public class StateContext {
     if (maxLimit < 0) {
       throw new IllegalArgumentException("Illegal maxLimit value: " + 
maxLimit);
     }
-    List<GeneratedMessage> reports = getNonIncrementalReports();
+    List<GeneratedMessage> reports = getFullReports(endpoint);
     if (maxLimit <= reports.size()) {
       return reports.subList(0, maxLimit);
     } else {
@@ -800,6 +838,11 @@ public class StateContext {
       this.containerActions.put(endpoint, new LinkedList<>());
       this.pipelineActions.put(endpoint, new LinkedList<>());
       this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
+      Map<String, AtomicBoolean> mp = new HashMap<>();
+      fullReportTypeList.forEach(e -> {
+        mp.putIfAbsent(e, new AtomicBoolean(true));
+      });
+      this.fullReportSendIndicator.putIfAbsent(endpoint, mp);
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 60b7978..cb65e37 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -150,7 +150,7 @@ public class HeartbeatEndpointTask
     } catch (IOException ex) {
       Preconditions.checkState(requestBuilder != null);
       // put back the reports which failed to be sent
-      putBackReports(requestBuilder);
+      putBackIncrementalReports(requestBuilder);
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -159,7 +159,8 @@ public class HeartbeatEndpointTask
   }
 
   // TODO: Make it generic.
-  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) 
{
+  private void putBackIncrementalReports(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
     List<GeneratedMessage> reports = new LinkedList<>();
     // We only put back CommandStatusReports and IncrementalContainerReport
     // because those are incremental. Container/Node/PipelineReport are
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6fd2706..3a2aec9 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -917,7 +917,8 @@ public final class XceiverServerRatis implements 
XceiverServerSpi {
   private void sendPipelineReport() {
     if (context !=  null) {
       // TODO: Send IncrementalPipelineReport instead of full PipelineReport
-      
context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.addIncrementalReport(
+          context.getParent().getContainer().getPipelineReport());
       context.getParent().triggerHeartbeat();
     }
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 736e3d6..2312c0b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -130,7 +130,7 @@ public class OzoneContainer {
           .newBuilder()
           .addReport(containerReplicaProto)
           .build();
-      context.addReport(icr);
+      context.addIncrementalReport(icr);
       context.getParent().triggerHeartbeat();
     };
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index c97d703..83e44d3 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -133,7 +133,7 @@ public class TestReportPublisher {
     Thread.sleep(150);
     executorService.shutdown();
     Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
-    verify(dummyContext, times(1)).addReport(null);
+    verify(dummyContext, times(1)).refreshFullReport(null);
     // After executor shutdown, no new reports should be published
     Thread.sleep(100);
     Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d152f4d..6d0ad16 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -37,6 +36,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
@@ -106,35 +106,7 @@ public class TestStateContext {
     // getReports dequeues incremental reports
     expectedReportCount.clear();
 
-    // Case 2: Attempt to put back a full report
-
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-
-    // Case 3: Put back mixed types of incremental reports
-
+    // Case 2: Put back mixed types of incremental reports
     ctx.putBackReports(Arrays.asList(
         newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
         newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
@@ -152,31 +124,6 @@ public class TestStateContext {
     checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
     // getReports dequeues incremental reports
     expectedReportCount.clear();
-
-    // Case 4: Attempt to put back mixed types of full reports
-
-    try {
-      ctx.putBackReports(Arrays.asList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
-          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
-      ), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-
-    // Case 5: Attempt to put back mixed full and incremental reports
-
-    try {
-      ctx.putBackReports(Arrays.asList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
-      ), scm2);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
   }
 
   @Test
@@ -197,8 +144,48 @@ public class TestStateContext {
 
     Map<String, Integer> expectedReportCount = new HashMap<>();
 
+    // Add a bunch of ContainerReports
+    batchRefreshfullReports(ctx,
+        StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // every time getAllAvailableReports is called, if we want to get a full
+    // report of a certain type, we must call "batchRefreshfullReports" for
+    // this type to refresh.
+    expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+
+    // Add a bunch of NodeReport
+    batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME);
+
+    // Add a bunch of PipelineReports
+    batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 
128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+
+    // Add a bunch of CommandStatusReports
+    batchAddIncrementalReport(ctx,
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    expectedReportCount.put(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    // Should keep all of them
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.remove(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
     // Add a bunch of IncrementalContainerReport
-    batchAddReports(ctx,
+    batchAddIncrementalReport(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
     // Should keep all of them
     expectedReportCount.put(
@@ -210,9 +197,16 @@ public class TestStateContext {
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
   }
 
-  void batchAddReports(StateContext ctx, String reportName, int count) {
+  void batchRefreshfullReports(StateContext ctx, String reportName, int count) 
{
+    for (int i = 0; i < count; i++) {
+      ctx.refreshFullReport(newMockReport(reportName));
+    }
+  }
+
+  void batchAddIncrementalReport(StateContext ctx,
+                                 String reportName, int count) {
     for (int i = 0; i < count; i++) {
-      ctx.addReport(newMockReport(reportName));
+      ctx.addIncrementalReport(newMockReport(reportName));
     }
   }
 
@@ -243,7 +237,7 @@ public class TestStateContext {
     assertNull(context1.getPipelineReports());
     GeneratedMessage containerReports =
         newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
-    context1.addReport(containerReports);
+    context1.refreshFullReport(containerReports);
 
     assertNotNull(context1.getContainerReports());
     assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
@@ -255,7 +249,7 @@ public class TestStateContext {
     StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
     GeneratedMessage nodeReport =
         newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
-    context2.addReport(nodeReport);
+    context2.refreshFullReport(nodeReport);
 
     assertNull(context2.getContainerReports());
     assertNotNull(context2.getNodeReport());
@@ -267,7 +261,7 @@ public class TestStateContext {
     StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
     GeneratedMessage pipelineReports =
         newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
-    context3.addReport(pipelineReports);
+    context3.refreshFullReport(pipelineReports);
 
     assertNull(context3.getContainerReports());
     assertNull(context3.getNodeReport());
@@ -311,7 +305,7 @@ public class TestStateContext {
         newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
 
     // Try to add report with zero endpoint. Should not be stored.
-    stateContext.addReport(generatedMessage);
+    stateContext.addIncrementalReport(generatedMessage);
     assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
 
     // Add 2 scm endpoints.
@@ -319,7 +313,7 @@ public class TestStateContext {
     stateContext.addEndpoint(scm2);
 
     // Add report. Should be added to all endpoints.
-    stateContext.addReport(generatedMessage);
+    stateContext.addIncrementalReport(generatedMessage);
     List<GeneratedMessage> allAvailableReports =
         stateContext.getAllAvailableReports(scm1);
     assertEquals(1, allAvailableReports.size());
@@ -462,9 +456,9 @@ public class TestStateContext {
 
     // task num greater than pool size
     for (int i = 0; i < threadPoolSize; i++) {
-      executorService.submit(() -> futureOne.get());
+      executorService.submit((Callable<String>) futureOne::get);
     }
-    executorService.submit(() -> futureTwo.get());
+    executorService.submit((Callable<String>) futureTwo::get);
 
     Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
 
@@ -483,8 +477,8 @@ public class TestStateContext {
 
     ExecutorService executorService = Executors.newFixedThreadPool(1);
     CompletableFuture<String> future = new CompletableFuture<>();
-    executorService.submit(() -> future.get());
-    executorService.submit(() -> future.get());
+    executorService.submit((Callable<String>) future::get);
+    executorService.submit((Callable<String>) future::get);
 
     StateContext subject = new StateContext(new OzoneConfiguration(),
         DatanodeStates.INIT, mock(DatanodeStateMachine.class)) {
@@ -549,11 +543,13 @@ public class TestStateContext {
     Map<String, Integer> expectedReportCount = new HashMap<>();
 
     // Add a bunch of ContainerReports
-    batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
-    batchAddReports(ctx,
+    batchRefreshfullReports(ctx,
+        StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+    batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+    batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 
128);
+    batchRefreshfullReports(ctx,
+        StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
+    batchAddIncrementalReport(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
 
     // Should only keep the latest one
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 9b238a1..0f215f4 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -96,7 +96,7 @@ public class TestHeartbeatEndpointTask {
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(NodeReportProto.getDefaultInstance());
+    context.refreshFullReport(NodeReportProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -128,7 +128,7 @@ public class TestHeartbeatEndpointTask {
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(ContainerReportsProto.getDefaultInstance());
+    context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -160,7 +160,8 @@ public class TestHeartbeatEndpointTask {
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.addIncrementalReport(
+        CommandStatusReportsProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -224,9 +225,10 @@ public class TestHeartbeatEndpointTask {
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(NodeReportProto.getDefaultInstance());
-    context.addReport(ContainerReportsProto.getDefaultInstance());
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.refreshFullReport(NodeReportProto.getDefaultInstance());
+    context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
+    context.addIncrementalReport(
+        CommandStatusReportsProto.getDefaultInstance());
     context.addContainerAction(getContainerAction());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 5ff95ce..addd15c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -393,7 +393,8 @@ public class TestBlockDeletion {
 
     logCapturer.clearOutput();
     cluster.getHddsDatanodes().get(0)
-        .getDatanodeStateMachine().getContext().addReport(dummyReport);
+        .getDatanodeStateMachine().getContext().
+        addIncrementalReport(dummyReport);
     cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().triggerHeartbeat();
     // wait for event to be handled by event handler

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to