This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 39954ad HDDS-5111. DataNode should not always report full information
in heartbeat (#2182)
39954ad is described below
commit 39954adeef38d4baba0c231d2bf68a4b0873e01f
Author: Jackson Yao <[email protected]>
AuthorDate: Tue Jun 8 02:03:56 2021 +0800
HDDS-5111. DataNode should not always report full information in heartbeat
(#2182)
---
.../container/common/report/ReportPublisher.java | 8 +-
.../common/statemachine/StateContext.java | 131 ++++++++++++-------
.../states/endpoint/HeartbeatEndpointTask.java | 5 +-
.../transport/server/ratis/XceiverServerRatis.java | 3 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 2 +-
.../common/report/TestReportPublisher.java | 2 +-
.../common/statemachine/TestStateContext.java | 140 ++++++++++-----------
.../states/endpoint/TestHeartbeatEndpointTask.java | 14 ++-
.../commandhandler/TestBlockDeletion.java | 3 +-
9 files changed, 179 insertions(+), 129 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index e6b4106..8d4820e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -80,7 +81,12 @@ public abstract class ReportPublisher<T extends
GeneratedMessage>
*/
private void publishReport() {
try {
- context.addReport(getReport());
+ GeneratedMessage report = getReport();
+ if (report instanceof CommandStatusReportsProto) {
+ context.addIncrementalReport(report);
+ } else {
+ context.refreshFullReport(report);
+ }
} catch (IOException e) {
LOG.error("Exception while publishing report.", e);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 5c3aaa9..ca73468 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,7 +41,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
@@ -63,10 +63,11 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import static java.lang.Math.min;
-import org.apache.commons.collections.CollectionUtils;
-
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
+
+import org.apache.commons.collections.CollectionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,10 +94,6 @@ public class StateContext {
@VisibleForTesting
static final String CRL_STATUS_REPORT_PROTO_NAME =
CRLStatusReport.getDescriptor().getFullName();
- // Accepted types of reports that can be queued to incrementalReportsQueue
- private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
- Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
- INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
@@ -121,6 +118,14 @@ public class StateContext {
private boolean shutdownOnError = false;
private boolean shutdownGracefully = false;
private final AtomicLong threadPoolNotAvailableCount;
+ // Endpoint -> ReportType -> Boolean of whether the full report should be
+ // queued in getFullReports call.
+ private final Map<InetSocketAddress,
+ Map<String, AtomicBoolean>> fullReportSendIndicator;
+ // List of supported full report types.
+ private final List<String> fullReportTypeList;
+ // ReportType -> Report.
+ private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;
/**
* term of latest leader SCM, extract from SCMCommand.
@@ -167,6 +172,24 @@ public class StateContext {
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
threadPoolNotAvailableCount = new AtomicLong(0);
+ fullReportSendIndicator = new HashMap<>();
+ fullReportTypeList = new ArrayList<>();
+ type2Reports = new HashMap<>();
+ initReportTypeCollection();
+ }
+
+ /**
+ * init related ReportType Collections.
+ */
+ private void initReportTypeCollection(){
+ fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME);
+ type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports);
+ fullReportTypeList.add(NODE_REPORT_PROTO_NAME);
+ type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport);
+ fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME);
+ type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
+ fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
+ type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
}
/**
@@ -254,7 +277,7 @@ public class StateContext {
*
* @param report report to be added
*/
- public void addReport(GeneratedMessage report) {
+ public void addIncrementalReport(GeneratedMessage report) {
if (report == null) {
return;
}
@@ -262,23 +285,38 @@ public class StateContext {
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
- if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
- containerReports.set(report);
- } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
- nodeReport.set(report);
- } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
- pipelineReports.set(report);
- } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
- synchronized (incrementalReportsQueue) {
- for (InetSocketAddress endpoint : endpoints) {
- incrementalReportsQueue.get(endpoint).add(report);
- }
+ // in some case, we want to add a fullReportType message
+ // as an incremental message.
+ // see XceiverServerRatis#sendPipelineReport
+ synchronized (incrementalReportsQueue) {
+ for (InetSocketAddress endpoint : endpoints) {
+ incrementalReportsQueue.get(endpoint).add(report);
}
- } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
- crlStatusReport.set(report);
- } else {
+ }
+ }
+
+ /**
+ * refresh Full report.
+ *
+ * @param report report to be refreshed
+ */
+ public void refreshFullReport(GeneratedMessage report) {
+ if (report == null) {
+ return;
+ }
+ final Descriptor descriptor = report.getDescriptorForType();
+ Preconditions.checkState(descriptor != null);
+ final String reportType = descriptor.getFullName();
+ Preconditions.checkState(reportType != null);
+ if (!fullReportTypeList.contains(reportType)) {
throw new IllegalArgumentException(
- "Unidentified report message type: " + reportType);
+ "not full report message type: " + reportType);
+ }
+ type2Reports.get(reportType).set(report);
+ if (fullReportSendIndicator != null) {
+ for (Map<String, AtomicBoolean> mp : fullReportSendIndicator.values()) {
+ mp.get(reportType).set(true);
+ }
}
}
@@ -301,10 +339,6 @@ public class StateContext {
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
- if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
- throw new IllegalArgumentException(
- "Unaccepted report message type: " + reportType);
- }
}
synchronized (incrementalReportsQueue) {
if (incrementalReportsQueue.containsKey(endpoint)){
@@ -340,23 +374,27 @@ public class StateContext {
return reportsToReturn;
}
- List<GeneratedMessage> getNonIncrementalReports() {
+ List<GeneratedMessage> getFullReports(
+ InetSocketAddress endpoint) {
+ Map<String, AtomicBoolean> mp = fullReportSendIndicator.get(endpoint);
List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
- GeneratedMessage report = containerReports.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = nodeReport.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = pipelineReports.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = crlStatusReport.get();
- if (report != null) {
- nonIncrementalReports.add(report);
+ if (null != mp){
+ for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
+ if (kv.getValue().get()) {
+ String reportType = kv.getKey();
+ final AtomicReference<GeneratedMessage> ref =
+ type2Reports.get(reportType);
+ if (ref == null) {
+ throw new RuntimeException(reportType + " is not a valid full "
+ + "report type!");
+ }
+ final GeneratedMessage msg = ref.get();
+ if (msg != null) {
+ nonIncrementalReports.add(msg);
+ mp.get(reportType).set(false);
+ }
+ }
+ }
}
return nonIncrementalReports;
}
@@ -372,7 +410,7 @@ public class StateContext {
if (maxLimit < 0) {
throw new IllegalArgumentException("Illegal maxLimit value: " +
maxLimit);
}
- List<GeneratedMessage> reports = getNonIncrementalReports();
+ List<GeneratedMessage> reports = getFullReports(endpoint);
if (maxLimit <= reports.size()) {
return reports.subList(0, maxLimit);
} else {
@@ -800,6 +838,11 @@ public class StateContext {
this.containerActions.put(endpoint, new LinkedList<>());
this.pipelineActions.put(endpoint, new LinkedList<>());
this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
+ Map<String, AtomicBoolean> mp = new HashMap<>();
+ fullReportTypeList.forEach(e -> {
+ mp.putIfAbsent(e, new AtomicBoolean(true));
+ });
+ this.fullReportSendIndicator.putIfAbsent(endpoint, mp);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 60b7978..cb65e37 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -150,7 +150,7 @@ public class HeartbeatEndpointTask
} catch (IOException ex) {
Preconditions.checkState(requestBuilder != null);
// put back the reports which failed to be sent
- putBackReports(requestBuilder);
+ putBackIncrementalReports(requestBuilder);
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
@@ -159,7 +159,8 @@ public class HeartbeatEndpointTask
}
// TODO: Make it generic.
- private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder)
{
+ private void putBackIncrementalReports(
+ SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
// We only put back CommandStatusReports and IncrementalContainerReport
// because those are incremental. Container/Node/PipelineReport are
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6fd2706..3a2aec9 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -917,7 +917,8 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
private void sendPipelineReport() {
if (context != null) {
// TODO: Send IncrementalPipelineReport instead of full PipelineReport
-
context.addReport(context.getParent().getContainer().getPipelineReport());
+ context.addIncrementalReport(
+ context.getParent().getContainer().getPipelineReport());
context.getParent().triggerHeartbeat();
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 736e3d6..2312c0b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -130,7 +130,7 @@ public class OzoneContainer {
.newBuilder()
.addReport(containerReplicaProto)
.build();
- context.addReport(icr);
+ context.addIncrementalReport(icr);
context.getParent().triggerHeartbeat();
};
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index c97d703..83e44d3 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -133,7 +133,7 @@ public class TestReportPublisher {
Thread.sleep(150);
executorService.shutdown();
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
- verify(dummyContext, times(1)).addReport(null);
+ verify(dummyContext, times(1)).refreshFullReport(null);
// After executor shutdown, no new reports should be published
Thread.sleep(100);
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d152f4d..6d0ad16 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +36,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
@@ -106,35 +106,7 @@ public class TestStateContext {
// getReports dequeues incremental reports
expectedReportCount.clear();
- // Case 2: Attempt to put back a full report
-
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
-
- // Case 3: Put back mixed types of incremental reports
-
+ // Case 2: Put back mixed types of incremental reports
ctx.putBackReports(Arrays.asList(
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
@@ -152,31 +124,6 @@ public class TestStateContext {
checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
// getReports dequeues incremental reports
expectedReportCount.clear();
-
- // Case 4: Attempt to put back mixed types of full reports
-
- try {
- ctx.putBackReports(Arrays.asList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
- newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
- newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
- newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
- ), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
-
- // Case 5: Attempt to put back mixed full and incremental reports
-
- try {
- ctx.putBackReports(Arrays.asList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
- newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
- newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
- ), scm2);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
}
@Test
@@ -197,8 +144,48 @@ public class TestStateContext {
Map<String, Integer> expectedReportCount = new HashMap<>();
+ // Add a bunch of ContainerReports
+ batchRefreshfullReports(ctx,
+ StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // every time getAllAvailableReports is called , if we want to get a full
+ // report of a certain type, we must call "batchRefreshfullReports" for
+ // this type to refresh.
+ expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+
+ // Add a bunch of NodeReport
+ batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME);
+
+ // Add a bunch of PipelineReports
+ batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME,
128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+
+ // Add a bunch of CommandStatusReports
+ batchAddIncrementalReport(ctx,
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ expectedReportCount.put(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ // Should keep all of them
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.remove(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
// Add a bunch of IncrementalContainerReport
- batchAddReports(ctx,
+ batchAddIncrementalReport(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
// Should keep all of them
expectedReportCount.put(
@@ -210,9 +197,16 @@ public class TestStateContext {
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
}
- void batchAddReports(StateContext ctx, String reportName, int count) {
+ void batchRefreshfullReports(StateContext ctx, String reportName, int count)
{
+ for (int i = 0; i < count; i++) {
+ ctx.refreshFullReport(newMockReport(reportName));
+ }
+ }
+
+ void batchAddIncrementalReport(StateContext ctx,
+ String reportName, int count) {
for (int i = 0; i < count; i++) {
- ctx.addReport(newMockReport(reportName));
+ ctx.addIncrementalReport(newMockReport(reportName));
}
}
@@ -243,7 +237,7 @@ public class TestStateContext {
assertNull(context1.getPipelineReports());
GeneratedMessage containerReports =
newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
- context1.addReport(containerReports);
+ context1.refreshFullReport(containerReports);
assertNotNull(context1.getContainerReports());
assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
@@ -255,7 +249,7 @@ public class TestStateContext {
StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
GeneratedMessage nodeReport =
newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
- context2.addReport(nodeReport);
+ context2.refreshFullReport(nodeReport);
assertNull(context2.getContainerReports());
assertNotNull(context2.getNodeReport());
@@ -267,7 +261,7 @@ public class TestStateContext {
StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
GeneratedMessage pipelineReports =
newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
- context3.addReport(pipelineReports);
+ context3.refreshFullReport(pipelineReports);
assertNull(context3.getContainerReports());
assertNull(context3.getNodeReport());
@@ -311,7 +305,7 @@ public class TestStateContext {
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
// Try to add report with zero endpoint. Should not be stored.
- stateContext.addReport(generatedMessage);
+ stateContext.addIncrementalReport(generatedMessage);
assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
// Add 2 scm endpoints.
@@ -319,7 +313,7 @@ public class TestStateContext {
stateContext.addEndpoint(scm2);
// Add report. Should be added to all endpoints.
- stateContext.addReport(generatedMessage);
+ stateContext.addIncrementalReport(generatedMessage);
List<GeneratedMessage> allAvailableReports =
stateContext.getAllAvailableReports(scm1);
assertEquals(1, allAvailableReports.size());
@@ -462,9 +456,9 @@ public class TestStateContext {
// task num greater than pool size
for (int i = 0; i < threadPoolSize; i++) {
- executorService.submit(() -> futureOne.get());
+ executorService.submit((Callable<String>) futureOne::get);
}
- executorService.submit(() -> futureTwo.get());
+ executorService.submit((Callable<String>) futureTwo::get);
Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
@@ -483,8 +477,8 @@ public class TestStateContext {
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<String> future = new CompletableFuture<>();
- executorService.submit(() -> future.get());
- executorService.submit(() -> future.get());
+ executorService.submit((Callable<String>) future::get);
+ executorService.submit((Callable<String>) future::get);
StateContext subject = new StateContext(new OzoneConfiguration(),
DatanodeStates.INIT, mock(DatanodeStateMachine.class)) {
@@ -549,11 +543,13 @@ public class TestStateContext {
Map<String, Integer> expectedReportCount = new HashMap<>();
// Add a bunch of ContainerReports
- batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
- batchAddReports(ctx,
+ batchRefreshfullReports(ctx,
+ StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+ batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+ batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME,
128);
+ batchRefreshfullReports(ctx,
+ StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
+ batchAddIncrementalReport(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
// Should only keep the latest one
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 9b238a1..0f215f4 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -96,7 +96,7 @@ public class TestHeartbeatEndpointTask {
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(NodeReportProto.getDefaultInstance());
+ context.refreshFullReport(NodeReportProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -128,7 +128,7 @@ public class TestHeartbeatEndpointTask {
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(ContainerReportsProto.getDefaultInstance());
+ context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -160,7 +160,8 @@ public class TestHeartbeatEndpointTask {
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.addIncrementalReport(
+ CommandStatusReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -224,9 +225,10 @@ public class TestHeartbeatEndpointTask {
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(NodeReportProto.getDefaultInstance());
- context.addReport(ContainerReportsProto.getDefaultInstance());
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.refreshFullReport(NodeReportProto.getDefaultInstance());
+ context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
+ context.addIncrementalReport(
+ CommandStatusReportsProto.getDefaultInstance());
context.addContainerAction(getContainerAction());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 5ff95ce..addd15c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -393,7 +393,8 @@ public class TestBlockDeletion {
logCapturer.clearOutput();
cluster.getHddsDatanodes().get(0)
- .getDatanodeStateMachine().getContext().addReport(dummyReport);
+ .getDatanodeStateMachine().getContext().
+ addIncrementalReport(dummyReport);
cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine().triggerHeartbeat();
// wait for event to be handled by event handler
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]